Tarantool development patches archive
 help / color / mirror / Atom feed
* [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance
@ 2020-02-26 10:00 sergepetrenko
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 1/4] box: expose box_is_orphan method sergepetrenko
                   ` (4 more replies)
  0 siblings, 5 replies; 25+ messages in thread
From: sergepetrenko @ 2020-02-26 10:00 UTC (permalink / raw)
  To: kirichenkoga, kostja.osipov, v.shpilevoy, alexander.turenko
  Cc: tarantool-patches

https://github.com/tarantool/tarantool/issues/4739
https://github.com/tarantool/tarantool/tree/sp/gh-4739-vclock-assert-v4

Changes in v4:
  - move row skipping logic from recovery to relay
  - encode a list of instances whose rows to skip
    in SUBSCRIBE request insead of encoding
    is_orhpan status

Changes in v3:
  - review fixes as per review from Vlad
  - instead of skipping rows on replica side,
    do it on master side, by patching recovery
    to silently follow rows coming from a certain
    instance.

Changes in v2:
 - review fixes as per review from Kostja

Serge Petrenko (4):
  box: expose box_is_orphan method
  wal: warn when trying to write a record with a broken lsn
  replication: implement an instance id filter for relay
  replication: do not relay rows coming from a remote instance back to
    it

 src/box/applier.cc         |  3 ++-
 src/box/box.cc             | 13 +++++++++---
 src/box/box.h              |  3 +++
 src/box/iproto_constants.h |  1 +
 src/box/relay.cc           |  9 ++++++++-
 src/box/relay.h            |  3 ++-
 src/box/wal.c              | 17 +++++++++++++---
 src/box/xrow.c             | 41 +++++++++++++++++++++++++++++++++++---
 src/box/xrow.h             | 34 +++++++++++++++++++++----------
 9 files changed, 101 insertions(+), 23 deletions(-)

-- 
2.20.1 (Apple Git-117)

^ permalink raw reply	[flat|nested] 25+ messages in thread

* [Tarantool-patches] [PATCH v4 1/4] box: expose box_is_orphan method
  2020-02-26 10:00 [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance sergepetrenko
@ 2020-02-26 10:00 ` sergepetrenko
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 2/4] wal: warn when trying to write a record with a broken lsn sergepetrenko
                   ` (3 subsequent siblings)
  4 siblings, 0 replies; 25+ messages in thread
From: sergepetrenko @ 2020-02-26 10:00 UTC (permalink / raw)
  To: kirichenkoga, kostja.osipov, v.shpilevoy, alexander.turenko
  Cc: tarantool-patches

From: Serge Petrenko <sergepetrenko@tarantool.org>

is_orphan status check is needed by applier in order to tell relay
whether to send the instance's own rows back or not.

Prerequisite #4739
---
 src/box/box.cc | 6 ++++++
 src/box/box.h  | 3 +++
 2 files changed, 9 insertions(+)

diff --git a/src/box/box.cc b/src/box/box.cc
index 9e8311d1e..5850894de 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -247,6 +247,12 @@ box_is_ro(void)
 	return is_ro || is_orphan;
 }
 
+bool
+box_is_orphan(void)
+{
+	return is_orphan;
+}
+
 int
 box_wait_ro(bool ro, double timeout)
 {
diff --git a/src/box/box.h b/src/box/box.h
index a212e6510..f37a945eb 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -105,6 +105,9 @@ box_set_ro();
 bool
 box_is_ro(void);
 
+bool
+box_is_orphan(void);
+
 /**
  * Wait until the instance switches to a desired mode.
  * \param ro wait read-only if set or read-write if unset
-- 
2.20.1 (Apple Git-117)

^ permalink raw reply	[flat|nested] 25+ messages in thread

* [Tarantool-patches] [PATCH v4 2/4] wal: warn when trying to write a record with a broken lsn
  2020-02-26 10:00 [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance sergepetrenko
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 1/4] box: expose box_is_orphan method sergepetrenko
@ 2020-02-26 10:00 ` sergepetrenko
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay sergepetrenko
                   ` (2 subsequent siblings)
  4 siblings, 0 replies; 25+ messages in thread
From: sergepetrenko @ 2020-02-26 10:00 UTC (permalink / raw)
  To: kirichenkoga, kostja.osipov, v.shpilevoy, alexander.turenko
  Cc: tarantool-patches

From: Serge Petrenko <sergepetrenko@tarantool.org>

There is an assertion in vclock_follow `lsn > prev_lsn`, which doesn't
fire in release builds, of course. Let's at least warn the user on an
attemt to write a record with a duplicate or otherwise broken lsn, and
not follow such an lsn.

Follow-up #4739
---
 src/box/wal.c | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index ac977c16e..27bff662a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -951,9 +951,20 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 			(*row)->tsn = tsn;
 			(*row)->is_commit = row == end - 1;
 		} else {
-			vclock_follow(vclock_diff, (*row)->replica_id,
-				      (*row)->lsn - vclock_get(base,
-							       (*row)->replica_id));
+			int64_t diff = (*row)->lsn - vclock_get(base, (*row)->replica_id);
+			if (diff <= vclock_get(vclock_diff,
+					       (*row)->replica_id)) {
+				say_crit("Attempt to write a broken LSN to WAL:"
+					 " replica id: %d, confirmed lsn: %d,"
+					 " new lsn %d", (*row)->replica_id,
+					 vclock_get(base, (*row)->replica_id) +
+					 vclock_get(vclock_diff,
+						    (*row)->replica_id),
+						    (*row)->lsn);
+				assert(false);
+			} else {
+				vclock_follow(vclock_diff, (*row)->replica_id, diff);
+			}
 		}
 	}
 }
-- 
2.20.1 (Apple Git-117)

^ permalink raw reply	[flat|nested] 25+ messages in thread

* [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay
  2020-02-26 10:00 [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance sergepetrenko
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 1/4] box: expose box_is_orphan method sergepetrenko
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 2/4] wal: warn when trying to write a record with a broken lsn sergepetrenko
@ 2020-02-26 10:00 ` sergepetrenko
  2020-02-26 10:18   ` Konstantin Osipov
  2020-02-26 23:54   ` Vladislav Shpilevoy
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it sergepetrenko
  2020-02-26 23:54 ` [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance Vladislav Shpilevoy
  4 siblings, 2 replies; 25+ messages in thread
From: sergepetrenko @ 2020-02-26 10:00 UTC (permalink / raw)
  To: kirichenkoga, kostja.osipov, v.shpilevoy, alexander.turenko
  Cc: tarantool-patches

From: Serge Petrenko <sergepetrenko@tarantool.org>

Add a filter for relay to skip rows coming from unwanted instances.
A list of instance ids whose rows replica doesn't want to fetch is encoded
together with SUBSCRIBE request after a freshly introduced flag IPROTO_ID_MASK.

Filtering rows is needed to prevent an instance from fetching its own
rows from a remote master, which is useful on initial configuration and
harmful on resubscribe.

Prerequisite #4739, #3294

@TarantoolBot document

Title: document new binary protocol key and subscribe request changes

Add key `IPROTO_ID_MASK = 0x51` to the internals reference.
This is an optional key used in SUBSCRIBE request followed by an array
of ids of instances whose rows won't be relayed to the replica.

SUBSCRIBE request is supplemented with an optional field of the
following structure:
```
+====================+
|      ID_MASK       |
|   0x51 : ID LIST   |
| MP_INT : MP_ARRRAY |
|                    |
+====================+
```
The field is encoded only when the id list is not empty.
---
 src/box/applier.cc         |  2 +-
 src/box/box.cc             |  7 ++++---
 src/box/iproto_constants.h |  1 +
 src/box/relay.cc           |  9 ++++++++-
 src/box/relay.h            |  3 ++-
 src/box/xrow.c             | 41 +++++++++++++++++++++++++++++++++++---
 src/box/xrow.h             | 34 +++++++++++++++++++++----------
 7 files changed, 77 insertions(+), 20 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index ae3d281a5..911353425 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -867,7 +867,7 @@ applier_subscribe(struct applier *applier)
 	vclock_create(&vclock);
 	vclock_copy(&vclock, &replicaset.vclock);
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
-				 &vclock, replication_anon);
+				 &vclock, replication_anon, 0);
 	coio_write_xrow(coio, &row);
 
 	/* Read SUBSCRIBE response */
diff --git a/src/box/box.cc b/src/box/box.cc
index 5850894de..232d7861b 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,8 +1787,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	uint32_t replica_version_id;
 	vclock_create(&replica_clock);
 	bool anon;
-	xrow_decode_subscribe_xc(header, NULL, &replica_uuid,
-				 &replica_clock, &replica_version_id, &anon);
+	unsigned int id_mask;
+	xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
+				 &replica_version_id, &anon, &id_mask);
 
 	/* Forbid connection to itself */
 	if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
@@ -1871,7 +1872,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	 * indefinitely).
 	 */
 	relay_subscribe(replica, io->fd, header->sync, &replica_clock,
-			replica_version_id);
+			replica_version_id, id_mask);
 }
 
 void
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index b66c05c06..814ada303 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -125,6 +125,7 @@ enum iproto_key {
 	IPROTO_STMT_ID = 0x43,
 	/* Leave a gap between SQL keys and additional request keys */
 	IPROTO_REPLICA_ANON = 0x50,
+	IPROTO_ID_MASK = 0x51,
 	IPROTO_KEY_MAX
 };
 
diff --git a/src/box/relay.cc b/src/box/relay.cc
index b89632273..87930a006 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,6 +109,7 @@ struct relay {
 	struct vclock recv_vclock;
 	/** Replicatoin slave version. */
 	uint32_t version_id;
+	unsigned int id_mask;
 	/**
 	 * Local vclock at the moment of subscribe, used to check
 	 * dataset on the other side and send missing data rows if any.
@@ -676,7 +677,8 @@ relay_subscribe_f(va_list ap)
 /** Replication acceptor fiber handler. */
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
-		struct vclock *replica_clock, uint32_t replica_version_id)
+		struct vclock *replica_clock, uint32_t replica_version_id,
+		unsigned int replica_id_mask)
 {
 	assert(replica->anon || replica->id != REPLICA_ID_NIL);
 	struct relay *relay = replica->relay;
@@ -705,6 +707,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
+	relay->id_mask = replica_id_mask;
+
 	int rc = cord_costart(&relay->cord, "subscribe",
 			      relay_subscribe_f, relay);
 	if (rc == 0)
@@ -763,6 +767,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
 	}
+	/* Check if the rows from the instance are filtered. */
+	if (1 << packet->replica_id & relay->id_mask)
+		return;
 	/*
 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
 	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
diff --git a/src/box/relay.h b/src/box/relay.h
index e1782d78f..255d29d90 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -124,6 +124,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
  */
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
-		struct vclock *replica_vclock, uint32_t replica_version_id);
+		struct vclock *replica_vclock, uint32_t replica_version_id,
+		unsigned int replica_id_mask);
 
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 968c3a202..ee78ed24d 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1194,17 +1194,23 @@ int
 xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
-		      const struct vclock *vclock, bool anon)
+		      const struct vclock *vclock, bool anon,
+		      unsigned int id_mask)
 {
 	memset(row, 0, sizeof(*row));
 	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
+	unsigned int map_size = __builtin_popcount(id_mask);
+	if (map_size) {
+		size += mp_sizeof_array(map_size) + map_size *
+			mp_sizeof_uint(VCLOCK_MAX);
+	}
 	char *buf = (char *) region_alloc(&fiber()->gc, size);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc", "buf");
 		return -1;
 	}
 	char *data = buf;
-	data = mp_encode_map(data, 5);
+	data = mp_encode_map(data, map_size ? 6 : 5);
 	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
 	data = xrow_encode_uuid(data, replicaset_uuid);
 	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1215,6 +1221,17 @@ xrow_encode_subscribe(struct xrow_header *row,
 	data = mp_encode_uint(data, tarantool_version_id());
 	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
 	data = mp_encode_bool(data, anon);
+	if (map_size) {
+		data = mp_encode_uint(data, IPROTO_ID_MASK);
+		data = mp_encode_array(data, map_size);
+		struct bit_iterator it;
+		bit_iterator_init(&it, &id_mask, sizeof(id_mask),
+				  true);
+		for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
+		     id = bit_iterator_next(&it)) {
+			data = mp_encode_uint(data, id);
+		}
+	}
 	assert(data <= buf + size);
 	row->body[0].iov_base = buf;
 	row->body[0].iov_len = (data - buf);
@@ -1226,7 +1243,8 @@ xrow_encode_subscribe(struct xrow_header *row,
 int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
-		      uint32_t *version_id, bool *anon)
+		      uint32_t *version_id, bool *anon,
+		      unsigned int *id_mask)
 {
 	if (row->bodycnt == 0) {
 		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1244,6 +1262,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 
 	if (anon)
 		*anon = false;
+	if (id_mask)
+		*id_mask = 0;
 	d = data;
 	uint32_t map_size = mp_decode_map(&d);
 	for (uint32_t i = 0; i < map_size; i++) {
@@ -1301,6 +1321,21 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 			}
 			*anon = mp_decode_bool(&d);
 			break;
+		case IPROTO_ID_MASK:
+			if (id_mask == NULL)
+				goto skip;
+			if (mp_typeof(*d) != MP_ARRAY) {
+decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+						   "invalid id_mask");
+				return -1;
+			}
+			uint32_t len = mp_decode_array(&d);
+			for(uint32_t i = 0; i < len; ++i) {
+				if (mp_typeof(*d) != MP_UINT)
+					goto decode_err;
+				*id_mask |= 1 << mp_decode_uint(&d);
+			}
+			break;
 		default: skip:
 			mp_next(&d); /* value */
 		}
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 0973c497d..ab5fc944a 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row,
  * @param instance_uuid Instance uuid.
  * @param vclock Replication clock.
  * @param anon Whether it is an anonymous subscribe request or not.
+ * @param id_mask A List of replica ids to skip rows from
+ *                      when feeding a replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
@@ -330,7 +332,8 @@ int
 xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
-		      const struct vclock *vclock, bool anon);
+		      const struct vclock *vclock, bool anon,
+		      unsigned int id_mask);
 
 /**
  * Decode SUBSCRIBE command.
@@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row,
  * @param[out] vclock.
  * @param[out] version_id.
  * @param[out] anon Whether it is an anonymous subscribe.
+ * @param[out] id_mask A list of ids to skip rows from when
+ *             feeding replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory or format error.
@@ -347,7 +352,8 @@ xrow_encode_subscribe(struct xrow_header *row,
 int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
-		      uint32_t *version_id, bool *anon);
+		      uint32_t *version_id, bool *anon,
+		      unsigned int *id_mask);
 
 /**
  * Encode JOIN command.
@@ -371,7 +377,8 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid);
 static inline int
 xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid)
 {
-	return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL);
+	return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL,
+				     NULL);
 }
 
 /**
@@ -386,7 +393,8 @@ static inline int
 xrow_decode_register(struct xrow_header *row, struct tt_uuid *instance_uuid,
 		     struct vclock *vclock)
 {
-	return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL, NULL);
+	return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL,
+				     NULL, NULL);
 }
 
 /**
@@ -411,7 +419,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock);
 static inline int
 xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock)
 {
-	return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL);
+	return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL, NULL);
 }
 
 /**
@@ -442,7 +450,8 @@ xrow_decode_subscribe_response(struct xrow_header *row,
 			       struct tt_uuid *replicaset_uuid,
 			       struct vclock *vclock)
 {
-	return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL, NULL);
+	return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL,
+				     NULL, NULL);
 }
 
 /**
@@ -817,10 +826,11 @@ static inline void
 xrow_encode_subscribe_xc(struct xrow_header *row,
 			 const struct tt_uuid *replicaset_uuid,
 			 const struct tt_uuid *instance_uuid,
-			 const struct vclock *vclock, bool anon)
+			 const struct vclock *vclock, bool anon,
+			 unsigned int id_mask)
 {
 	if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
-				  vclock, anon) != 0)
+				  vclock, anon, id_mask) != 0)
 		diag_raise();
 }
 
@@ -828,11 +838,13 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
 static inline void
 xrow_decode_subscribe_xc(struct xrow_header *row,
 			 struct tt_uuid *replicaset_uuid,
-		         struct tt_uuid *instance_uuid, struct vclock *vclock,
-			 uint32_t *replica_version_id, bool *anon)
+			 struct tt_uuid *instance_uuid, struct vclock *vclock,
+			 uint32_t *replica_version_id, bool *anon,
+			 unsigned int *id_mask)
 {
 	if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
-				  vclock, replica_version_id, anon) != 0)
+				  vclock, replica_version_id, anon,
+				  id_mask) != 0)
 		diag_raise();
 }
 
-- 
2.20.1 (Apple Git-117)

^ permalink raw reply	[flat|nested] 25+ messages in thread

* [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-26 10:00 [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance sergepetrenko
                   ` (2 preceding siblings ...)
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay sergepetrenko
@ 2020-02-26 10:00 ` sergepetrenko
  2020-02-26 10:23   ` Konstantin Osipov
  2020-02-26 23:54   ` Vladislav Shpilevoy
  2020-02-26 23:54 ` [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance Vladislav Shpilevoy
  4 siblings, 2 replies; 25+ messages in thread
From: sergepetrenko @ 2020-02-26 10:00 UTC (permalink / raw)
  To: kirichenkoga, kostja.osipov, v.shpilevoy, alexander.turenko
  Cc: tarantool-patches

From: Serge Petrenko <sergepetrenko@tarantool.org>

We have a mechanism for restoring rows originating from an instance that
suffered a sudden power loss: remote masters resend the isntance's rows
received before a certain point in time, defined by remote master vclock
at the moment of subscribe.
However, this is useful only on initial replication configuraiton, when
an instance has just recovered, so that it can receive what it has
relayed but haven't synced to disk.
In other cases, when an instance is operating normally and master-master
replication is configured, the mechanism described above may lead to
instance re-applying instance's own rows, coming from a master it has just
subscribed to.
To fix the problem do not relay rows coming from a remote instance, if
the instance has already recovered.

Closes #4739
---
 src/box/applier.cc | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 911353425..1a07d71a9 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -866,8 +866,9 @@ applier_subscribe(struct applier *applier)
 	struct vclock vclock;
 	vclock_create(&vclock);
 	vclock_copy(&vclock, &replicaset.vclock);
+	unsigned int id_mask = box_is_orphan() ? 0 : 1 << instance_id;
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
-				 &vclock, replication_anon, 0);
+				 &vclock, replication_anon, id_mask);
 	coio_write_xrow(coio, &row);
 
 	/* Read SUBSCRIBE response */
-- 
2.20.1 (Apple Git-117)

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay sergepetrenko
@ 2020-02-26 10:18   ` Konstantin Osipov
  2020-02-26 11:16     ` Serge Petrenko
  2020-02-26 23:54   ` Vladislav Shpilevoy
  1 sibling, 1 reply; 25+ messages in thread
From: Konstantin Osipov @ 2020-02-26 10:18 UTC (permalink / raw)
  To: sergepetrenko; +Cc: kirichenkoga, tarantool-patches, v.shpilevoy


Generally I think you're on track with these series. 

I believe it will also be possible to use this filter to not send
records twice in a full mesh. E.g. you could add a follow up patch
which excludes from the subscription peers to which we have a
direct connection, this will easily solve the full-mesh
replication traffic explosion issue, the replica will only get the
changes directly from the peers.

Please solicit a detailed review.

* sergepetrenko <sergepetrenko@tarantool.org> [20/02/26 13:00]:

> +	unsigned int map_size = __builtin_popcount(id_mask);
> +	if (map_size) {
> +		size += mp_sizeof_array(map_size) + map_size *
> +			mp_sizeof_uint(VCLOCK_MAX);
> +	}

Better:mask_size.

Besides, this is perhaps not id_mask, but id_filter. The
difference is that mask is something inclusive, i.e. if you're in
the mask, you get the records, while filter is
something exclusive, if you're not in the mask, you don't get the
records.


-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it sergepetrenko
@ 2020-02-26 10:23   ` Konstantin Osipov
  2020-02-26 11:21     ` Serge Petrenko
  2020-02-26 23:54   ` Vladislav Shpilevoy
  1 sibling, 1 reply; 25+ messages in thread
From: Konstantin Osipov @ 2020-02-26 10:23 UTC (permalink / raw)
  To: sergepetrenko; +Cc: kirichenkoga, tarantool-patches, v.shpilevoy

* sergepetrenko <sergepetrenko@tarantool.org> [20/02/26 13:00]:
> From: Serge Petrenko <sergepetrenko@tarantool.org>
> 
> We have a mechanism for restoring rows originating from an instance that
> suffered a sudden power loss: remote masters resend the isntance's rows
> received before a certain point in time, defined by remote master vclock
> at the moment of subscribe.
> However, this is useful only on initial replication configuraiton, when
> an instance has just recovered, so that it can receive what it has
> relayed but haven't synced to disk.
> In other cases, when an instance is operating normally and master-master
> replication is configured, the mechanism described above may lead to
> instance re-applying instance's own rows, coming from a master it has just
> subscribed to.
> To fix the problem do not relay rows coming from a remote instance, if
> the instance has already recovered.
> 

A comment like this also belongs to the code. Usually the patch 
that fixes a bug comes along with a test case for a bug, are you
sure you can't submit one?

>  	vclock_copy(&vclock, &replicaset.vclock);
> +	unsigned int id_mask = box_is_orphan() ? 0 : 1 << instance_id;

box_is_orphan() fits the bill, so it's good enough.

I would explain, however, that what we are really looking for
here is whether or not the local WAL accepts writes. As soon as we
started allowing writes to the local WAL, we don't want to get
these writes from elsewhere. 

-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay
  2020-02-26 10:18   ` Konstantin Osipov
@ 2020-02-26 11:16     ` Serge Petrenko
  0 siblings, 0 replies; 25+ messages in thread
From: Serge Petrenko @ 2020-02-26 11:16 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: kirichenkoga, tarantool-patches, v.shpilevoy

[-- Attachment #1: Type: text/plain, Size: 10129 bytes --]




  
>Среда, 26 февраля 2020, 13:18 +03:00 от Konstantin Osipov <kostja.osipov@gmail.com>:
> 
>
>Generally I think you're on track with these series.
>
>I believe it will also be possible to use this filter to not send
>records twice in a full mesh. E.g. you could add a follow up patch
>which excludes from the subscription peers to which we have a
>direct connection, this will easily solve the full-mesh
>replication traffic explosion issue, the replica will only get the
>changes directly from the peers.
 
Okay, will do.
 
>
>Please solicit a detailed review.
>
>* sergepetrenko < sergepetrenko@tarantool.org > [20/02/26 13:00]:
> 
>> + unsigned int map_size = __builtin_popcount(id_mask);
>> + if (map_size) {
>> + size += mp_sizeof_array(map_size) + map_size *
>> + mp_sizeof_uint(VCLOCK_MAX);
>> + }
>Better:mask_size.
>
>Besides, this is perhaps not id_mask, but id_filter. The
>difference is that mask is something inclusive, i.e. if you're in
>the mask, you get the records, while filter is
>something exclusive, if you're not in the mask, you don't get the
>records.
 
Okay,

s/id_mask/id_filter
s/map_size/mask_size
 
diff --git a/src/box/box.cc b/src/box/box.cc
index 232d7861b..94267c74e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,9 +1787,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
  uint32_t replica_version_id;
  vclock_create(&replica_clock);
  bool anon;
- unsigned int id_mask;
+ unsigned int id_filter;
  xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
- &replica_version_id, &anon, &id_mask);
+ &replica_version_id, &anon, &id_filter);
 
  /* Forbid connection to itself */
  if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
@@ -1872,7 +1872,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
  * indefinitely).
  */
  relay_subscribe(replica, io->fd, header->sync, &replica_clock,
- replica_version_id, id_mask);
+ replica_version_id, id_filter);
}
 
void
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 814ada303..f9d413a31 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -125,7 +125,7 @@ enum iproto_key {
  IPROTO_STMT_ID = 0x43,
  /* Leave a gap between SQL keys and additional request keys */
  IPROTO_REPLICA_ANON = 0x50,
- IPROTO_ID_MASK = 0x51,
+ IPROTO_ID_FILTER = 0x51,
  IPROTO_KEY_MAX
};
 
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 87930a006..dc89b90e2 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,7 +109,7 @@ struct relay {
  struct vclock recv_vclock;
  /** Replicatoin slave version. */
  uint32_t version_id;
- unsigned int id_mask;
+ unsigned int id_filter;
  /**
  * Local vclock at the moment of subscribe, used to check
  * dataset on the other side and send missing data rows if any.
@@ -678,7 +678,7 @@ relay_subscribe_f(va_list ap)
void
relay_subscribe(struct replica *replica, int fd, uint64_t sync,
  struct vclock *replica_clock, uint32_t replica_version_id,
- unsigned int replica_id_mask)
+ unsigned int replica_id_filter)
{
  assert(replica->anon || replica->id != REPLICA_ID_NIL);
  struct relay *relay = replica->relay;
@@ -707,7 +707,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
  vclock_copy(&relay->tx.vclock, replica_clock);
  relay->version_id = replica_version_id;
 
- relay->id_mask = replica_id_mask;
+ relay->id_filter = replica_id_filter;
 
  int rc = cord_costart(&relay->cord, "subscribe",
        relay_subscribe_f, relay);
@@ -768,7 +768,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
  packet->bodycnt = 0;
  }
  /* Check if the rows from the instance are filtered. */
- if (1 << packet->replica_id & relay->id_mask)
+ if (1 << packet->replica_id & relay->id_filter)
  return;
  /*
  * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
diff --git a/src/box/relay.h b/src/box/relay.h
index 255d29d90..6e7eebab1 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -125,6 +125,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
void
relay_subscribe(struct replica *replica, int fd, uint64_t sync,
  struct vclock *replica_vclock, uint32_t replica_version_id,
- unsigned int replica_id_mask);
+ unsigned int replica_id_filter);
 
#endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index ee78ed24d..10edbf6a8 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1195,13 +1195,13 @@ xrow_encode_subscribe(struct xrow_header *row,
        const struct tt_uuid *replicaset_uuid,
        const struct tt_uuid *instance_uuid,
        const struct vclock *vclock, bool anon,
-       unsigned int id_mask)
+       unsigned int id_filter)
{
  memset(row, 0, sizeof(*row));
  size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
- unsigned int map_size = __builtin_popcount(id_mask);
- if (map_size) {
- size += mp_sizeof_array(map_size) + map_size *
+ unsigned int filter_size = __builtin_popcount(id_filter);
+ if (filter_size) {
+ size += mp_sizeof_array(filter_size) + filter_size *
  mp_sizeof_uint(VCLOCK_MAX);
  }
  char *buf = (char *) region_alloc(&fiber()->gc, size);
@@ -1210,7 +1210,7 @@ xrow_encode_subscribe(struct xrow_header *row,
  return -1;
  }
  char *data = buf;
- data = mp_encode_map(data, map_size ? 6 : 5);
+ data = mp_encode_map(data, filter_size ? 6 : 5);
  data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
  data = xrow_encode_uuid(data, replicaset_uuid);
  data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1221,11 +1221,11 @@ xrow_encode_subscribe(struct xrow_header *row,
  data = mp_encode_uint(data, tarantool_version_id());
  data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
  data = mp_encode_bool(data, anon);
- if (map_size) {
- data = mp_encode_uint(data, IPROTO_ID_MASK);
- data = mp_encode_array(data, map_size);
+ if (filter_size) {
+ data = mp_encode_uint(data, IPROTO_ID_FILTER);
+ data = mp_encode_array(data, filter_size);
  struct bit_iterator it;
- bit_iterator_init(&it, &id_mask, sizeof(id_mask),
+ bit_iterator_init(&it, &id_filter, sizeof(id_filter),
    true);
  for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
      id = bit_iterator_next(&it)) {
@@ -1244,7 +1244,7 @@ int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
        struct tt_uuid *instance_uuid, struct vclock *vclock,
        uint32_t *version_id, bool *anon,
-       unsigned int *id_mask)
+       unsigned int *id_filter)
{
  if (row->bodycnt == 0) {
  diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1262,8 +1262,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 
  if (anon)
  *anon = false;
- if (id_mask)
- *id_mask = 0;
+ if (id_filter)
+ *id_filter = 0;
  d = data;
  uint32_t map_size = mp_decode_map(&d);
  for (uint32_t i = 0; i < map_size; i++) {
@@ -1321,19 +1321,19 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
  }
  *anon = mp_decode_bool(&d);
  break;
- case IPROTO_ID_MASK:
- if (id_mask == NULL)
+ case IPROTO_ID_FILTER:
+ if (id_filter == NULL)
  goto skip;
  if (mp_typeof(*d) != MP_ARRAY) {
decode_err: xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
-   "invalid id_mask");
+   "invalid id_filter");
  return -1;
  }
  uint32_t len = mp_decode_array(&d);
  for(uint32_t i = 0; i < len; ++i) {
  if (mp_typeof(*d) != MP_UINT)
  goto decode_err;
- *id_mask |= 1 << mp_decode_uint(&d);
+ *id_filter |= 1 << mp_decode_uint(&d);
  }
  break;
  default: skip:
diff --git a/src/box/xrow.h b/src/box/xrow.h
index ab5fc944a..8e5716b30 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -322,7 +322,7 @@ xrow_encode_register(struct xrow_header *row,
  * @param instance_uuid Instance uuid.
  * @param vclock Replication clock.
  * @param anon Whether it is an anonymous subscribe request or not.
- * @param id_mask A List of replica ids to skip rows from
+ * @param id_filter A List of replica ids to skip rows from
  *                      when feeding a replica.
  *
  * @retval  0 Success.
@@ -333,7 +333,7 @@ xrow_encode_subscribe(struct xrow_header *row,
        const struct tt_uuid *replicaset_uuid,
        const struct tt_uuid *instance_uuid,
        const struct vclock *vclock, bool anon,
-       unsigned int id_mask);
+       unsigned int id_filter);
 
/**
  * Decode SUBSCRIBE command.
@@ -343,7 +343,7 @@ xrow_encode_subscribe(struct xrow_header *row,
  * @param[out] vclock.
  * @param[out] version_id.
  * @param[out] anon Whether it is an anonymous subscribe.
- * @param[out] id_mask A list of ids to skip rows from when
+ * @param[out] id_filter A list of ids to skip rows from when
  *             feeding replica.
  *
  * @retval  0 Success.
@@ -353,7 +353,7 @@ int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
        struct tt_uuid *instance_uuid, struct vclock *vclock,
        uint32_t *version_id, bool *anon,
-       unsigned int *id_mask);
+       unsigned int *id_filter);
 
/**
  * Encode JOIN command.
@@ -827,10 +827,10 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
  const struct tt_uuid *replicaset_uuid,
  const struct tt_uuid *instance_uuid,
  const struct vclock *vclock, bool anon,
- unsigned int id_mask)
+ unsigned int id_filter)
{
  if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
-   vclock, anon, id_mask) != 0)
+   vclock, anon, id_filter) != 0)
  diag_raise();
}
 
@@ -840,11 +840,11 @@ xrow_decode_subscribe_xc(struct xrow_header *row,
  struct tt_uuid *replicaset_uuid,
  struct tt_uuid *instance_uuid, struct vclock *vclock,
  uint32_t *replica_version_id, bool *anon,
- unsigned int *id_mask)
+ unsigned int *id_filter)
{
  if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
    vclock, replica_version_id, anon,
-   id_mask) != 0)
+   id_filter) != 0)
  diag_raise();
}
>
>
>--
>Konstantin Osipov, Moscow, Russia 
 
 
--
Serge Petrenko
 

[-- Attachment #2: Type: text/html, Size: 13667 bytes --]

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-26 10:23   ` Konstantin Osipov
@ 2020-02-26 11:21     ` Serge Petrenko
  2020-02-26 11:58       ` Konstantin Osipov
  0 siblings, 1 reply; 25+ messages in thread
From: Serge Petrenko @ 2020-02-26 11:21 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: kirichenkoga, tarantool-patches, v.shpilevoy

[-- Attachment #1: Type: text/plain, Size: 2890 bytes --]


  
>Среда, 26 февраля 2020, 13:23 +03:00 от Konstantin Osipov <kostja.osipov@gmail.com>:
> 
>* sergepetrenko < sergepetrenko@tarantool.org > [20/02/26 13:00]:
>> From: Serge Petrenko < sergepetrenko@tarantool.org >
>>
>> We have a mechanism for restoring rows originating from an instance that
>> suffered a sudden power loss: remote masters resend the isntance's rows
>> received before a certain point in time, defined by remote master vclock
>> at the moment of subscribe.
>> However, this is useful only on initial replication configuraiton, when
>> an instance has just recovered, so that it can receive what it has
>> relayed but haven't synced to disk.
>> In other cases, when an instance is operating normally and master-master
>> replication is configured, the mechanism described above may lead to
>> instance re-applying instance's own rows, coming from a master it has just
>> subscribed to.
>> To fix the problem do not relay rows coming from a remote instance, if
>> the instance has already recovered.
>>
>
>A comment like this also belongs to the code. Usually the patch
>that fixes a bug comes along with a test case for a bug, are you
>sure you can't submit one?
 
I don’t think I can. The test that comes with an issue is a stress test,
relying on running it with multiple workers simultaneously.
It reproduces the problem when ran with 4 workers on one of my PCs,
and with 20 workers on the other.
I think we don’t have the appropriate testing infrastructure to run the same
test with multiple workers at the same time, and I couldn’t come up with a
single test which would reproduce the same problem.
 
>
>> vclock_copy(&vclock, &replicaset.vclock);
>> + unsigned int id_mask = box_is_orphan() ? 0 : 1 << instance_id;
>
>box_is_orphan() fits the bill, so it's good enough.
>
>I would explain, however, that what we are really looking for
>here is whether or not the local WAL accepts writes. As soon as we
>started allowing writes to the local WAL, we don't want to get
>these writes from elsewhere.
 
Ok.
 
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 1a07d71a9..73ffc0d68 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -866,9 +866,13 @@ applier_subscribe(struct applier *applier)
  struct vclock vclock;
  vclock_create(&vclock);
  vclock_copy(&vclock, &replicaset.vclock);
- unsigned int id_mask = box_is_orphan() ? 0 : 1 << instance_id;
+ /*
+ * Stop accepting local rows coming from a remote
+ * instance as soon as local WAL starts accepting writes.
+ */
+ unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
  xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
- &vclock, replication_anon, id_mask);
+ &vclock, replication_anon, id_filter);
  coio_write_xrow(coio, &row);
 
  /* Read SUBSCRIBE response */
 
>
>--
>Konstantin Osipov, Moscow, Russia
 
--
Serge Petrenko
 

[-- Attachment #2: Type: text/html, Size: 4179 bytes --]

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-26 11:21     ` Serge Petrenko
@ 2020-02-26 11:58       ` Konstantin Osipov
  2020-02-26 15:58         ` Serge Petrenko
  0 siblings, 1 reply; 25+ messages in thread
From: Konstantin Osipov @ 2020-02-26 11:58 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: kirichenkoga, tarantool-patches, v.shpilevoy

* Serge Petrenko <sergepetrenko@tarantool.org> [20/02/26 14:22]:
> I don’t think I can. The test that comes with an issue is a stress test,
> relying on running it with multiple workers simultaneously.
> It reproduces the problem when ran with 4 workers on one of my PCs,
> and with 20 workers on the other.
> I think we don’t have the appropriate testing infrastructure to run the same
> test with multiple workers at the same time, and I couldn’t come up with a
> single test which would reproduce the same problem.

Is there a place in which you can inject a sleep to make the
problem much easier to reproduce? 

What about injecting a sleep in wal code on replica, the place
which increments local replicaset vclock ?

Then you will be much more likely to receive a record from the
peer before you incremented the record vclock locally, and the bug
will be reproducible with a single master.

-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-26 11:58       ` Konstantin Osipov
@ 2020-02-26 15:58         ` Serge Petrenko
  2020-02-26 16:40           ` Konstantin Osipov
  0 siblings, 1 reply; 25+ messages in thread
From: Serge Petrenko @ 2020-02-26 15:58 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: kirichenkoga, tarantool-patches, v.shpilevoy

[-- Attachment #1: Type: text/plain, Size: 7435 bytes --]


  
>Среда, 26 февраля 2020, 14:58 +03:00 от Konstantin Osipov <kostja.osipov@gmail.com>:
> 
>* Serge Petrenko < sergepetrenko@tarantool.org > [20/02/26 14:22]:
>> I don’t think I can. The test that comes with an issue is a stress test,
>> relying on running it with multiple workers simultaneously.
>> It reproduces the problem when ran with 4 workers on one of my PCs,
>> and with 20 workers on the other.
>> I think we don’t have the appropriate testing infrastructure to run the same
>> test with multiple workers at the same time, and I couldn’t come up with a
>> single test which would reproduce the same problem.
>Is there a place in which you can inject a sleep to make the
>problem much easier to reproduce?
>
>What about injecting a sleep in wal code on replica, the place
>which increments local replicaset vclock ?
 
Thanks for the suggestion! Haven’t thought about it for some reason.
I made a test. The diff’s below.
>
>Then you will be much more likely to receive a record from the
>peer before you incremented the record vclock locally, and the bug
>will be reproducible with a single master.
>
>--
>Konstantin Osipov, Moscow, Russia
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 27bff662a..35ba7b072 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -278,8 +278,13 @@ tx_schedule_commit(struct cmsg *msg)
  /* Closes the input valve. */
  stailq_concat(&writer->rollback, &batch->rollback);
  }
+
+ ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
  /* Update the tx vclock to the latest written by wal. */
  vclock_copy(&replicaset.vclock, &batch->vclock);
+#ifndef NDEBUG
+skip_update:
+#endif
  tx_schedule_queue(&batch->commit);
  mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
}
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index ed0cba903..58fe158fd 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -136,7 +136,8 @@ struct errinj {
  _(ERRINJ_SWIM_FD_ONLY, ERRINJ_BOOL, {.bparam = false}) \
  _(ERRINJ_DYN_MODULE_COUNT, ERRINJ_INT, {.iparam = 0}) \
  _(ERRINJ_FIBER_MADVISE, ERRINJ_BOOL, {.bparam = false}) \
- _(ERRINJ_FIBER_MPROTECT, ERRINJ_INT, {.iparam = -1})
+ _(ERRINJ_FIBER_MPROTECT, ERRINJ_INT, {.iparam = -1}) \
+ _(ERRINJ_REPLICASET_VCLOCK_UPDATE, ERRINJ_BOOL, {.bparam = false}) \
 
ENUM0(errinj_id, ERRINJ_LIST);
extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index daa27ed24..eb0905238 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -64,6 +64,7 @@ evals
   - ERRINJ_RELAY_REPORT_INTERVAL: 0
   - ERRINJ_RELAY_SEND_DELAY: false
   - ERRINJ_RELAY_TIMEOUT: 0
+  - ERRINJ_REPLICASET_VCLOCK_UPDATE: false
   - ERRINJ_REPLICA_JOIN_DELAY: false
   - ERRINJ_SIO_READ_MAX: -1
   - ERRINJ_SNAP_COMMIT_DELAY: false
diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
new file mode 100644
index 000000000..7dc2f7118
--- /dev/null
+++ b/test/replication/gh-4739-vclock-assert.result
@@ -0,0 +1,82 @@
+-- test-run result file version 2
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+
+SERVERS = {'rebootstrap1', 'rebootstrap2'}
+ | ---
+ | ...
+test_run:create_cluster(SERVERS, "replication")
+ | ---
+ | ...
+test_run:wait_fullmesh(SERVERS)
+ | ---
+ | ...
+
+test_run:cmd('switch rebootstrap1')
+ | ---
+ | - true
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+-- Stop updating replicaset vclock to simulate a situation, when
+-- a row is already relayed to the remote master, but the local
+-- vclock update hasn't happened yet.
+box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
+ | ---
+ | - ok
+ | ...
+lsn = box.info.lsn
+ | ---
+ | ...
+box.space._schema:replace{'something'}
+ | ---
+ | - ['something']
+ | ...
+-- Vclock isn't updated.
+box.info.lsn == lsn
+ | ---
+ | - true
+ | ...
+
+-- Wait until the remote instance gets the row.
+while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
+    fiber.sleep(0.01)\
+end
+ | ---
+ | ...
+
+-- Restart the remote instance. This will make the first instance
+-- resubscribe without entering orphan mode.
+test_run:cmd('restart server rebootstrap2')
+ | ---
+ | - true
+ | ...
+test_run:cmd('switch rebootstrap1')
+ | ---
+ | - true
+ | ...
+-- Wait until resubscribe is sent
+fiber.sleep(2 * box.cfg.replication_timeout)
+ | ---
+ | ...
+box.info.replication[2].upstream.status
+ | ---
+ | - sync
+ | ...
+
+box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
+ | ---
+ | - ok
+ | ...
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(SERVERS)
+ | ---
+ | ...
diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
new file mode 100644
index 000000000..26dc781e2
--- /dev/null
+++ b/test/replication/gh-4739-vclock-assert.test.lua
@@ -0,0 +1,34 @@
+env = require('test_run')
+test_run = env.new()
+
+SERVERS = {'rebootstrap1', 'rebootstrap2'}
+test_run:create_cluster(SERVERS, "replication")
+test_run:wait_fullmesh(SERVERS)
+
+test_run:cmd('switch rebootstrap1')
+fiber = require('fiber')
+-- Stop updating replicaset vclock to simulate a situation, when
+-- a row is already relayed to the remote master, but the local
+-- vclock update hasn't happened yet.
+box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
+lsn = box.info.lsn
+box.space._schema:replace{'something'}
+-- Vclock isn't updated.
+box.info.lsn == lsn
+
+-- Wait until the remote instance gets the row.
+while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
+    fiber.sleep(0.01)\
+end
+
+-- Restart the remote instance. This will make the first instance
+-- resubscribe without entering orphan mode.
+test_run:cmd('restart server rebootstrap2')
+test_run:cmd('switch rebootstrap1')
+-- Wait until resubscribe is sent
+fiber.sleep(2 * box.cfg.replication_timeout)
+box.info.replication[2].upstream.status
+
+box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
+test_run:cmd('switch default')
+test_run:drop_cluster(SERVERS)
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 429c64df3..90fd53ca6 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -15,6 +15,7 @@
     "gh-4402-info-errno.test.lua": {},
     "gh-4605-empty-password.test.lua": {},
     "gh-4606-admin-creds.test.lua": {},
+    "gh-4739-vclock-assert.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index ed1de3140..b4e09744a 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
script =  master.lua
description = tarantool/box, replication
disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua
config = suite.cfg
lua_libs = lua/fast_replica.lua lua/rlimit.lua
use_unix_sockets = True
 
--
Serge Petrenko

[-- Attachment #2: Type: text/html, Size: 9577 bytes --]

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-26 15:58         ` Serge Petrenko
@ 2020-02-26 16:40           ` Konstantin Osipov
  0 siblings, 0 replies; 25+ messages in thread
From: Konstantin Osipov @ 2020-02-26 16:40 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: kirichenkoga, tarantool-patches, v.shpilevoy

* Serge Petrenko <sergepetrenko@tarantool.org> [20/02/26 19:01]:

OK, you decided to skip it altogether, that is not very realistic
though. Let's see whether anyone else will like this test case (I'm not
very fond of it).

What was wrong with adding fiber_sleep()?

-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay sergepetrenko
  2020-02-26 10:18   ` Konstantin Osipov
@ 2020-02-26 23:54   ` Vladislav Shpilevoy
  2020-02-27  6:48     ` Konstantin Osipov
  2020-02-27 13:15     ` Serge Petrenko
  1 sibling, 2 replies; 25+ messages in thread
From: Vladislav Shpilevoy @ 2020-02-26 23:54 UTC (permalink / raw)
  To: sergepetrenko, kirichenkoga, kostja.osipov, alexander.turenko
  Cc: tarantool-patches

Thanks for the patch!

See 8 comments below.

>     replication: implement an instance id filter for relay
>     
>     Add a filter for relay to skip rows coming from unwanted instances.
>     A list of instance ids whose rows replica doesn't want to fetch is encoded
>     together with SUBSCRIBE request after a freshly introduced flag IPROTO_ID_FILTER.
>     
>     Filtering rows is needed to prevent an instance from fetching its own
>     rows from a remote master, which is useful on initial configuration and
>     harmful on resubscribe.
>     
>     Prerequisite #4739, #3294
>     
>     @TarantoolBot document
>     
>     Title: document new binary protocol key and subscribe request changes
>     
>     Add key `IPROTO_ID_FILTER = 0x51` to the internals reference.
>     This is an optional key used in SUBSCRIBE request followed by an array
>     of ids of instances whose rows won't be relayed to the replica.
>     
>     SUBSCRIBE request is supplemented with an optional field of the
>     following structure:
>     ```
>     +====================+
>     |      ID_FILTER     |
>     |   0x51 : ID LIST   |
>     | MP_INT : MP_ARRRAY |
>     |                    |
>     +====================+
>     ```
>     The field is encoded only when the id list is not empty.
> 
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index b89632273..dc89b90e2 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -109,6 +109,7 @@ struct relay {
>  	struct vclock recv_vclock;
>  	/** Replicatoin slave version. */
>  	uint32_t version_id;
> +	unsigned int id_filter;

1. Please, add a comment here explaining what is it,
and why is needed.

>  	/**
>  	 * Local vclock at the moment of subscribe, used to check
>  	 * dataset on the other side and send missing data rows if any.
> @@ -763,6 +767,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>  		packet->group_id = GROUP_DEFAULT;
>  		packet->bodycnt = 0;
>  	}
> +	/* Check if the rows from the instance are filtered. */
> +	if (1 << packet->replica_id & relay->id_filter)

2. Please use explicit != 0 here and below when check 'filter_size'
variable.

> +		return;
>  	/*
>  	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>  	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index 968c3a202..10edbf6a8 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -1194,17 +1194,23 @@ int
>  xrow_encode_subscribe(struct xrow_header *row,
>  		      const struct tt_uuid *replicaset_uuid,
>  		      const struct tt_uuid *instance_uuid,
> -		      const struct vclock *vclock, bool anon)
> +		      const struct vclock *vclock, bool anon,
> +		      unsigned int id_filter)

3. Nit - it would be better to have it as uint32_t explicitly.
Becuase max id count is 32. Unsigned int does not have size
guarantees, formally speaking. It is at least 16 bits, no more
info.

>  {
>  	memset(row, 0, sizeof(*row));
>  	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
> +	unsigned int filter_size = __builtin_popcount(id_filter);
> +	if (filter_size) {
> +		size += mp_sizeof_array(filter_size) + filter_size *
> +			mp_sizeof_uint(VCLOCK_MAX);

4. You didn't add mp_sizeof_uint(IPROTO_ID_FILTER) here, but you
didn't update XROW_BODY_LEN_MAX either. Does it fit in the current
value? Also seems like it can't be called XROW_BODY_LEN_MAX anymore.
Because clearly it is not enough to fit the filter array, if you
needed to patch the allocation here.

You could also update XROW_BODY_LEN_MAX so as it includes the biggest
possible filter. Max filter size is just 34 bytes. This is the
simplest option, I think.

> +	}
>  	char *buf = (char *) region_alloc(&fiber()->gc, size);
>  	if (buf == NULL) {
>  		diag_set(OutOfMemory, size, "region_alloc", "buf");
>  		return -1;
>  	}
>  	char *data = buf;
> -	data = mp_encode_map(data, 5);
> +	data = mp_encode_map(data, filter_size ? 6 : 5);
>  	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
>  	data = xrow_encode_uuid(data, replicaset_uuid);
>  	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
> @@ -1215,6 +1221,17 @@ xrow_encode_subscribe(struct xrow_header *row,
>  	data = mp_encode_uint(data, tarantool_version_id());
>  	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
>  	data = mp_encode_bool(data, anon);
> +	if (filter_size) {

5. I wouldn't make it optional in encoding, since this is sent
really rare, but could simplify the code a bit. However, up to
you.

Also, won't it break replication from an old version? You
will send subscribe with this new key, and the old instance
should ignore it. Does it? I don't remember.

If the old instance would ignore it, it means that the bug
still can explode when replicating from an old version, right?
I don't know how to fix that, but if it is true, we need to
document that, at least.

> +		data = mp_encode_uint(data, IPROTO_ID_FILTER);
> +		data = mp_encode_array(data, filter_size);
> +		struct bit_iterator it;
> +		bit_iterator_init(&it, &id_filter, sizeof(id_filter),
> +				  true);
> +		for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
> +		     id = bit_iterator_next(&it)) {
> +			data = mp_encode_uint(data, id);
> +		}
> +	}
>  	assert(data <= buf + size);
>  	row->body[0].iov_base = buf;
>  	row->body[0].iov_len = (data - buf);
> @@ -1301,6 +1321,21 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
>  			}
>  			*anon = mp_decode_bool(&d);
>  			break;
> +		case IPROTO_ID_FILTER:
> +			if (id_filter == NULL)
> +				goto skip;
> +			if (mp_typeof(*d) != MP_ARRAY) {
> +decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
> +						   "invalid id_filter");

6. Lets say it in caps: ID_FILTER, just like with other decode
error messages. Also I would name this label 'id_filter_decode_err',
becuase other keys can't jump here.

> +				return -1;
> +			}
> +			uint32_t len = mp_decode_array(&d);
> +			for(uint32_t i = 0; i < len; ++i) {
> +				if (mp_typeof(*d) != MP_UINT)
> +					goto decode_err;
> +				*id_filter |= 1 << mp_decode_uint(&d);

7. If someone would send a big ID (a program, pretending to be a Tarantool
instance), it would cause unsigned bit shift overflow, which is undefined
behaviour. Lets check that it is not bigger than 31.

However this won't really help much. This code will crash even if I will
just send a truncated packet. From what I see.

Up to you whether you want to fix the bit shift.

> +			}
> +			break;
>  		default: skip:
>  			mp_next(&d); /* value */
>  		}
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 0973c497d..8e5716b30 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row,
>   * @param instance_uuid Instance uuid.
>   * @param vclock Replication clock.
>   * @param anon Whether it is an anonymous subscribe request or not.
> + * @param id_filter A List of replica ids to skip rows from
> + *                      when feeding a replica.

8. Looks like a huge indentation. Keep if you want. I noticed
just in case it was an accident. In the next @param description
you aligned it differently.

>   *
>   * @retval  0 Success.
>   * @retval -1 Memory error.
> @@ -330,7 +332,8 @@ int
> @@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row,
>   * @param[out] vclock.
>   * @param[out] version_id.
>   * @param[out] anon Whether it is an anonymous subscribe.
> + * @param[out] id_filter A list of ids to skip rows from when
> + *             feeding replica.
>   *
>   * @retval  0 Success.
>   * @retval -1 Memory or format error.

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance
  2020-02-26 10:00 [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance sergepetrenko
                   ` (3 preceding siblings ...)
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it sergepetrenko
@ 2020-02-26 23:54 ` Vladislav Shpilevoy
  2020-02-27 21:24   ` Serge Petrenko
  4 siblings, 1 reply; 25+ messages in thread
From: Vladislav Shpilevoy @ 2020-02-26 23:54 UTC (permalink / raw)
  To: sergepetrenko, kirichenkoga, kostja.osipov, alexander.turenko
  Cc: tarantool-patches

Hi! Thanks for the patch!

Please, add a @ChangeLog record.

On 26/02/2020 11:00, sergepetrenko wrote:
> https://github.com/tarantool/tarantool/issues/4739
> https://github.com/tarantool/tarantool/tree/sp/gh-4739-vclock-assert-v4
> 
> Changes in v4:
>   - move row skipping logic from recovery to relay
>   - encode a list of instances whose rows to skip
>     in SUBSCRIBE request insead of encoding
>     is_orhpan status
> 
> Changes in v3:
>   - review fixes as per review from Vlad
>   - instead of skipping rows on replica side,
>     do it on master side, by patching recovery
>     to silently follow rows coming from a certain
>     instance.
> 
> Changes in v2:
>  - review fixes as per review from Kostja
> 
> Serge Petrenko (4):
>   box: expose box_is_orphan method
>   wal: warn when trying to write a record with a broken lsn
>   replication: implement an instance id filter for relay
>   replication: do not relay rows coming from a remote instance back to
>     it
> 
>  src/box/applier.cc         |  3 ++-
>  src/box/box.cc             | 13 +++++++++---
>  src/box/box.h              |  3 +++
>  src/box/iproto_constants.h |  1 +
>  src/box/relay.cc           |  9 ++++++++-
>  src/box/relay.h            |  3 ++-
>  src/box/wal.c              | 17 +++++++++++++---
>  src/box/xrow.c             | 41 +++++++++++++++++++++++++++++++++++---
>  src/box/xrow.h             | 34 +++++++++++++++++++++----------
>  9 files changed, 101 insertions(+), 23 deletions(-)
> 

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it sergepetrenko
  2020-02-26 10:23   ` Konstantin Osipov
@ 2020-02-26 23:54   ` Vladislav Shpilevoy
  2020-02-27  6:52     ` Konstantin Osipov
  2020-02-27 14:13     ` Serge Petrenko
  1 sibling, 2 replies; 25+ messages in thread
From: Vladislav Shpilevoy @ 2020-02-26 23:54 UTC (permalink / raw)
  To: sergepetrenko, kirichenkoga, kostja.osipov, alexander.turenko
  Cc: tarantool-patches

Thanks for the patch!

See 4 comments below.

>     replication: do not relay rows coming from a remote instance back to it
>     
>     We have a mechanism for restoring rows originating from an instance that
>     suffered a sudden power loss: remote masters resend the isntance's rows
>     received before a certain point in time, defined by remote master vclock
>     at the moment of subscribe.
>     However, this is useful only on initial replication configuraiton, when
>     an instance has just recovered, so that it can receive what it has
>     relayed but haven't synced to disk.
>     In other cases, when an instance is operating normally and master-master
>     replication is configured, the mechanism described above may lead to
>     instance re-applying instance's own rows, coming from a master it has just
>     subscribed to.
>     To fix the problem do not relay rows coming from a remote instance, if
>     the instance has already recovered.
>     
>     Closes #4739
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 911353425..73ffc0d68 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -866,8 +866,13 @@ applier_subscribe(struct applier *applier)
>  	struct vclock vclock;
>  	vclock_create(&vclock);
>  	vclock_copy(&vclock, &replicaset.vclock);
> +	/*
> +	 * Stop accepting local rows coming from a remote
> +	 * instance as soon as local WAL starts accepting writes.
> +	 */
> +	unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;

1. I was always wondering, what if the instance got orphaned after it
started accepting writes? WAL is fully functional, it syncs whatever is
needed, and then a resubscribe happens. Can this break anything?

>  	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
> -				 &vclock, replication_anon, 0);
> +				 &vclock, replication_anon, id_filter);
>  	coio_write_xrow(coio, &row);
>  
>  	/* Read SUBSCRIBE response */
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 27bff662a..35ba7b072 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -278,8 +278,13 @@ tx_schedule_commit(struct cmsg *msg)
>  		/* Closes the input valve. */
>  		stailq_concat(&writer->rollback, &batch->rollback);
>  	}
> +
> +	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
>  	/* Update the tx vclock to the latest written by wal. */
>  	vclock_copy(&replicaset.vclock, &batch->vclock);
> +#ifndef NDEBUG
> +skip_update:
> +#endif

2. Consider this hack which I just invented. In that way you won't
depend on ERRINJ and NDEBUG interconnection.

====================
@@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
 	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
 	/* Update the tx vclock to the latest written by wal. */
 	vclock_copy(&replicaset.vclock, &batch->vclock);
-#ifndef NDEBUG
-skip_update:
-#endif
+	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
 	tx_schedule_queue(&batch->commit);
 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
 }
====================

Talking of the injection itself - don't know really. Perhaps
it would be better to add a delay to the wal_write_to_disk()
function, to its very end, after wal_notify_watchers(). In
that case relay will wake up, send whatever it wants, and TX
won't update the vclock until you let wal_write_to_disk()
finish. Seems more natural this way.

>  	tx_schedule_queue(&batch->commit);
>  	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
>  }
> diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
> new file mode 100644
> index 000000000..7dc2f7118
> --- /dev/null
> +++ b/test/replication/gh-4739-vclock-assert.result
> @@ -0,0 +1,82 @@
> +-- test-run result file version 2
> +env = require('test_run')
> + | ---
> + | ...
> +test_run = env.new()
> + | ---
> + | ...
> +
> +SERVERS = {'rebootstrap1', 'rebootstrap2'}
> + | ---
> + | ...
> +test_run:create_cluster(SERVERS, "replication")
> + | ---
> + | ...
> +test_run:wait_fullmesh(SERVERS)
> + | ---
> + | ...
> +
> +test_run:cmd('switch rebootstrap1')
> + | ---
> + | - true
> + | ...
> +fiber = require('fiber')
> + | ---
> + | ...
> +-- Stop updating replicaset vclock to simulate a situation, when
> +-- a row is already relayed to the remote master, but the local
> +-- vclock update hasn't happened yet.
> +box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
> + | ---
> + | - ok
> + | ...
> +lsn = box.info.lsn
> + | ---
> + | ...
> +box.space._schema:replace{'something'}
> + | ---
> + | - ['something']
> + | ...
> +-- Vclock isn't updated.
> +box.info.lsn == lsn
> + | ---
> + | - true
> + | ...
> +
> +-- Wait until the remote instance gets the row.
> +while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
> +    fiber.sleep(0.01)\
> +end

3. There is a cool thing which I discovered relatively recently:
test_run:wait_cond(). It does fiber sleep and while cycle, and
has a finite timeout, so such a test won't hang for 10 minutes
in Travis in case of a problem.

> + | ---
> + | ...
> +
> +-- Restart the remote instance. This will make the first instance
> +-- resubscribe without entering orphan mode.
> +test_run:cmd('restart server rebootstrap2')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('switch rebootstrap1')
> + | ---
> + | - true
> + | ...
> +-- Wait until resubscribe is sent
> +fiber.sleep(2 * box.cfg.replication_timeout)

4. Don't we collect any statistics on replication requests, just
like we do in box.stat()? Perhaps box.stat.net() can help? To
wait properly. Maybe just do test_run:wait_cond() for status 'sync'?

> + | ---
> + | ...
> +box.info.replication[2].upstream.status
> + | ---
> + | - sync
> + | ...
> +
> +box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
> + | ---
> + | - ok
> + | ...
> +test_run:cmd('switch default')
> + | ---
> + | - true
> + | ...
> +test_run:drop_cluster(SERVERS)
> + | ---
> + | ...

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay
  2020-02-26 23:54   ` Vladislav Shpilevoy
@ 2020-02-27  6:48     ` Konstantin Osipov
  2020-02-27 13:15     ` Serge Petrenko
  1 sibling, 0 replies; 25+ messages in thread
From: Konstantin Osipov @ 2020-02-27  6:48 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: kirichenkoga, tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [20/02/27 09:42]:
> > +		      unsigned int id_filter)
> 
> 3. Nit - it would be better to have it as uint32_t explicitly.
> Becuase max id count is 32. Unsigned int does not have size
> guarantees, formally speaking. It is at least 16 bits, no more
> info.

We use unsigned int in struct vclock, I think vclock.h also needs
a follow up then.

> > +	if (filter_size) {
> 
> 5. I wouldn't make it optional in encoding, since this is sent
> really rare, but could simplify the code a bit. However, up to
> you.

> 
> Also, won't it break replication from an old version? You
> will send subscribe with this new key, and the old instance
> should ignore it. Does it? I don't remember.
> 
> If the old instance would ignore it, it means that the bug
> still can explode when replicating from an old version, right?
> I don't know how to fix that, but if it is true, we need to
> document that, at least.

This is true, however, it only happens at reconfiguration, 
you're not supposed to actively use a heterogeneous cluster, only
upgrade it (bootstrap).

> 
> > +				*id_filter |= 1 << mp_decode_uint(&d);
> 
> 7. If someone would send a big ID (a program, pretending to be a Tarantool
> instance), it would cause unsigned bit shift overflow, which is undefined
> behaviour. Lets check that it is not bigger than 31.
> 
> However this won't really help much. This code will crash even if I will
> just send a truncated packet. From what I see.
> 
> Up to you whether you want to fix the bit shift.

Good catch, this would be a CVE then and a huge publicity issue.
There should be no crashes when parsing malformed requests.
> 

-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-26 23:54   ` Vladislav Shpilevoy
@ 2020-02-27  6:52     ` Konstantin Osipov
  2020-02-27 14:13     ` Serge Petrenko
  1 sibling, 0 replies; 25+ messages in thread
From: Konstantin Osipov @ 2020-02-27  6:52 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: kirichenkoga, tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [20/02/27 09:42]:
> > +	/*
> > +	 * Stop accepting local rows coming from a remote
> > +	 * instance as soon as local WAL starts accepting writes.
> > +	 */
> > +	unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
> 
> 1. I was always wondering, what if the instance got orphaned after it
> started accepting writes? WAL is fully functional, it syncs whatever is
> needed, and then a resubscribe happens. Can this break anything?

Good catch. I wanted to make this comment too, but checked the
code and it seems we're safe, since we also switch engine vtab to
readonly. 

It is important not to spoil this invariant by future commits
though, so a comment on significance of orphan mode to replication
correctness in box_set_orphan would be nice...

Or extending the comment for is_orphan variable and its difference
from is_ro variable. Basically, is_ro is a user-level setting,
which doesn't prevent writes to temporary tables and some internal
writes e.g. to _cluster table, while is_orphan is server-wide
internal setting which is expected to freeze *all* writes except
from a remote, otherwise there will be correctness issues.

> 2. Consider this hack which I just invented. In that way you won't
> depend on ERRINJ and NDEBUG interconnection.
> 
> ====================
> @@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
>  	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
>  	/* Update the tx vclock to the latest written by wal. */
>  	vclock_copy(&replicaset.vclock, &batch->vclock);
> -#ifndef NDEBUG
> -skip_update:
> -#endif
> +	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
>  	tx_schedule_queue(&batch->commit);
>  	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
>  }
> ====================
> 
> Talking of the injection itself - don't know really. Perhaps
> it would be better to add a delay to the wal_write_to_disk()
> function, to its very end, after wal_notify_watchers(). In
> that case relay will wake up, send whatever it wants, and TX
> won't update the vclock until you let wal_write_to_disk()
> finish. Seems more natural this way.

I agree.


-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay
  2020-02-26 23:54   ` Vladislav Shpilevoy
  2020-02-27  6:48     ` Konstantin Osipov
@ 2020-02-27 13:15     ` Serge Petrenko
  2020-02-27 23:33       ` Vladislav Shpilevoy
  1 sibling, 1 reply; 25+ messages in thread
From: Serge Petrenko @ 2020-02-27 13:15 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: kirichenkoga, tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 16968 bytes --]



> 27 февр. 2020 г., в 02:54, Vladislav Shpilevoy <v.shpilevoy@tarantool.org> написал(а):
> 
> Thanks for the patch!
> 
> See 8 comments below.

Hi! Thanks for the review!
Please find my comments inline and the diff below.

> 
>>    replication: implement an instance id filter for relay
>> 
>>    Add a filter for relay to skip rows coming from unwanted instances.
>>    A list of instance ids whose rows replica doesn't want to fetch is encoded
>>    together with SUBSCRIBE request after a freshly introduced flag IPROTO_ID_FILTER.
>> 
>>    Filtering rows is needed to prevent an instance from fetching its own
>>    rows from a remote master, which is useful on initial configuration and
>>    harmful on resubscribe.
>> 
>>    Prerequisite #4739, #3294
>> 
>>    @TarantoolBot document
>> 
>>    Title: document new binary protocol key and subscribe request changes
>> 
>>    Add key `IPROTO_ID_FILTER = 0x51` to the internals reference.
>>    This is an optional key used in SUBSCRIBE request followed by an array
>>    of ids of instances whose rows won't be relayed to the replica.
>> 
>>    SUBSCRIBE request is supplemented with an optional field of the
>>    following structure:
>>    ```
>>    +====================+
>>    |      ID_FILTER     |
>>    |   0x51 : ID LIST   |
>>    | MP_INT : MP_ARRRAY |
>>    |                    |
>>    +====================+
>>    ```
>>    The field is encoded only when the id list is not empty.
>> 
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index b89632273..dc89b90e2 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -109,6 +109,7 @@ struct relay {
>> 	struct vclock recv_vclock;
>> 	/** Replicatoin slave version. */
>> 	uint32_t version_id;
>> +	unsigned int id_filter;
> 
> 1. Please, add a comment here explaining what is it,
> and why is needed.

Ok.

> 
>> 	/**
>> 	 * Local vclock at the moment of subscribe, used to check
>> 	 * dataset on the other side and send missing data rows if any.
>> @@ -763,6 +767,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>> 		packet->group_id = GROUP_DEFAULT;
>> 		packet->bodycnt = 0;
>> 	}
>> +	/* Check if the rows from the instance are filtered. */
>> +	if (1 << packet->replica_id & relay->id_filter)
> 
> 2. Please use explicit != 0 here and below when check 'filter_size'
> variable.

No problem.

> 
>> +		return;
>> 	/*
>> 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>> 	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
>> diff --git a/src/box/xrow.c b/src/box/xrow.c
>> index 968c3a202..10edbf6a8 100644
>> --- a/src/box/xrow.c
>> +++ b/src/box/xrow.c
>> @@ -1194,17 +1194,23 @@ int
>> xrow_encode_subscribe(struct xrow_header *row,
>> 		      const struct tt_uuid *replicaset_uuid,
>> 		      const struct tt_uuid *instance_uuid,
>> -		      const struct vclock *vclock, bool anon)
>> +		      const struct vclock *vclock, bool anon,
>> +		      unsigned int id_filter)
> 
> 3. Nit - it would be better to have it as uint32_t explicitly.
> Becuase max id count is 32. Unsigned int does not have size
> guarantees, formally speaking. It is at least 16 bits, no more
> info.

Done, and I’ll send a followup to vclock.h later, as Kostja suggests.

> 
>> {
>> 	memset(row, 0, sizeof(*row));
>> 	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
>> +	unsigned int filter_size = __builtin_popcount(id_filter);
>> +	if (filter_size) {
>> +		size += mp_sizeof_array(filter_size) + filter_size *
>> +			mp_sizeof_uint(VCLOCK_MAX);
> 
> 4. You didn't add mp_sizeof_uint(IPROTO_ID_FILTER) here, but you
> didn't update XROW_BODY_LEN_MAX either. Does it fit in the current
> value? Also seems like it can't be called XROW_BODY_LEN_MAX anymore.
> Because clearly it is not enough to fit the filter array, if you
> needed to patch the allocation here.

Let’s then just double XROW_BODY_LEN_MAX up.

> 
> You could also update XROW_BODY_LEN_MAX so as it includes the biggest
> possible filter. Max filter size is just 34 bytes. This is the
> simplest option, I think.
> 
>> +	}
>> 	char *buf = (char *) region_alloc(&fiber()->gc, size);
>> 	if (buf == NULL) {
>> 		diag_set(OutOfMemory, size, "region_alloc", "buf");
>> 		return -1;
>> 	}
>> 	char *data = buf;
>> -	data = mp_encode_map(data, 5);
>> +	data = mp_encode_map(data, filter_size ? 6 : 5);
>> 	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
>> 	data = xrow_encode_uuid(data, replicaset_uuid);
>> 	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
>> @@ -1215,6 +1221,17 @@ xrow_encode_subscribe(struct xrow_header *row,
>> 	data = mp_encode_uint(data, tarantool_version_id());
>> 	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
>> 	data = mp_encode_bool(data, anon);
>> +	if (filter_size) {
> 
> 5. I wouldn't make it optional in encoding, since this is sent
> really rare, but could simplify the code a bit. However, up to
> you.

I don’t like the idea to pass zero length arrays in case there is no filter,
Lets just not encode this part. The filter is initially empty on decoding side
anyway.

> 
> Also, won't it break replication from an old version? You
> will send subscribe with this new key, and the old instance
> should ignore it. Does it? I don't remember.
> 

Yes, the old instance ignores all unknown keys.

> If the old instance would ignore it, it means that the bug
> still can explode when replicating from an old version, right?
> I don't know how to fix that, but if it is true, we need to
> document that, at least.

Yes, the bug’s still here when replicating from an old instance.
To trigger it, you have to set up master-master replication between
the old and the new instances and fit some additional conditions.
I don’t think it’s usual to set up master-master when upgrading, as
Kostja has pointed out.

> 
>> +		data = mp_encode_uint(data, IPROTO_ID_FILTER);
>> +		data = mp_encode_array(data, filter_size);
>> +		struct bit_iterator it;
>> +		bit_iterator_init(&it, &id_filter, sizeof(id_filter),
>> +				  true);
>> +		for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
>> +		     id = bit_iterator_next(&it)) {
>> +			data = mp_encode_uint(data, id);
>> +		}
>> +	}
>> 	assert(data <= buf + size);
>> 	row->body[0].iov_base = buf;
>> 	row->body[0].iov_len = (data - buf);
>> @@ -1301,6 +1321,21 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
>> 			}
>> 			*anon = mp_decode_bool(&d);
>> 			break;
>> +		case IPROTO_ID_FILTER:
>> +			if (id_filter == NULL)
>> +				goto skip;
>> +			if (mp_typeof(*d) != MP_ARRAY) {
>> +decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
>> +						   "invalid id_filter");
> 
> 6. Lets say it in caps: ID_FILTER, just like with other decode
> error messages. Also I would name this label 'id_filter_decode_err',
> becuase other keys can't jump here.

Okay.

> 
>> +				return -1;
>> +			}
>> +			uint32_t len = mp_decode_array(&d);
>> +			for(uint32_t i = 0; i < len; ++i) {
>> +				if (mp_typeof(*d) != MP_UINT)
>> +					goto decode_err;
>> +				*id_filter |= 1 << mp_decode_uint(&d);
> 
> 7. If someone would send a big ID (a program, pretending to be a Tarantool
> instance), it would cause unsigned bit shift overflow, which is undefined
> behaviour. Lets check that it is not bigger than 31.
> 
> However this won't really help much. This code will crash even if I will
> just send a truncated packet. From what I see.

I’m not sure I understand what you’re speaking about. This piece of code is
similar to the one we have in mp_decode_vclock. The situation didn’t get worse,
at least.

> 
> Up to you whether you want to fix the bit shift.

Fixed.

> 
>> +			}
>> +			break;
>> 		default: skip:
>> 			mp_next(&d); /* value */
>> 		}
>> diff --git a/src/box/xrow.h b/src/box/xrow.h
>> index 0973c497d..8e5716b30 100644
>> --- a/src/box/xrow.h
>> +++ b/src/box/xrow.h
>> @@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row,
>>  * @param instance_uuid Instance uuid.
>>  * @param vclock Replication clock.
>>  * @param anon Whether it is an anonymous subscribe request or not.
>> + * @param id_filter A List of replica ids to skip rows from
>> + *                      when feeding a replica.
> 
> 8. Looks like a huge indentation. Keep if you want. I noticed
> just in case it was an accident. In the next @param description
> you aligned it differently.

Thanks! I fixed both pieces.

> 
>>  *
>>  * @retval  0 Success.
>>  * @retval -1 Memory error.
>> @@ -330,7 +332,8 @@ int
>> @@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row,
>>  * @param[out] vclock.
>>  * @param[out] version_id.
>>  * @param[out] anon Whether it is an anonymous subscribe.
>> + * @param[out] id_filter A list of ids to skip rows from when
>> + *             feeding replica.
>>  *
>>  * @retval  0 Success.
>>  * @retval -1 Memory or format error.


diff --git a/src/box/box.cc b/src/box/box.cc
index 94267c74e..09dd67ab4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,7 +1787,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	uint32_t replica_version_id;
 	vclock_create(&replica_clock);
 	bool anon;
-	unsigned int id_filter;
+	uint32_t id_filter;
 	xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
 				 &replica_version_id, &anon, &id_filter);
 
diff --git a/src/box/relay.cc b/src/box/relay.cc
index dc89b90e2..95245a3cf 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,7 +109,13 @@ struct relay {
 	struct vclock recv_vclock;
 	/** Replicatoin slave version. */
 	uint32_t version_id;
-	unsigned int id_filter;
+	/**
+	 * A filter of replica ids whose rows should be ignored.
+	 * Each set filter bit corresponds to a replica id whose
+	 * rows shouldn't be relayed. The list of ids to ignore
+	 * is passed by the replica on subscribe.
+	 */
+	uint32_t id_filter;
 	/**
 	 * Local vclock at the moment of subscribe, used to check
 	 * dataset on the other side and send missing data rows if any.
@@ -678,7 +684,7 @@ relay_subscribe_f(va_list ap)
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_clock, uint32_t replica_version_id,
-		unsigned int replica_id_filter)
+		uint32_t replica_id_filter)
 {
 	assert(replica->anon || replica->id != REPLICA_ID_NIL);
 	struct relay *relay = replica->relay;
@@ -768,7 +774,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->bodycnt = 0;
 	}
 	/* Check if the rows from the instance are filtered. */
-	if (1 << packet->replica_id & relay->id_filter)
+	if ((1 << packet->replica_id & relay->id_filter) != 0)
 		return;
 	/*
 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
diff --git a/src/box/relay.h b/src/box/relay.h
index 6e7eebab1..0632fa912 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -125,6 +125,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_vclock, uint32_t replica_version_id,
-		unsigned int replica_id_filter);
+		uint32_t replica_id_filter);
 
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 10edbf6a8..602049004 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1195,22 +1195,18 @@ xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
 		      const struct vclock *vclock, bool anon,
-		      unsigned int id_filter)
+		      uint32_t id_filter)
 {
 	memset(row, 0, sizeof(*row));
 	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
-	unsigned int filter_size = __builtin_popcount(id_filter);
-	if (filter_size) {
-		size += mp_sizeof_array(filter_size) + filter_size *
-			mp_sizeof_uint(VCLOCK_MAX);
-	}
 	char *buf = (char *) region_alloc(&fiber()->gc, size);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc", "buf");
 		return -1;
 	}
 	char *data = buf;
-	data = mp_encode_map(data, filter_size ? 6 : 5);
+	int filter_size = __builtin_popcount(id_filter);
+	data = mp_encode_map(data, filter_size != 0 ? 6 : 5);
 	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
 	data = xrow_encode_uuid(data, replicaset_uuid);
 	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1221,7 +1217,7 @@ xrow_encode_subscribe(struct xrow_header *row,
 	data = mp_encode_uint(data, tarantool_version_id());
 	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
 	data = mp_encode_bool(data, anon);
-	if (filter_size) {
+	if (filter_size != 0) {
 		data = mp_encode_uint(data, IPROTO_ID_FILTER);
 		data = mp_encode_array(data, filter_size);
 		struct bit_iterator it;
@@ -1244,7 +1240,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
 		      uint32_t *version_id, bool *anon,
-		      unsigned int *id_filter)
+		      uint32_t *id_filter)
 {
 	if (row->bodycnt == 0) {
 		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1325,15 +1321,18 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 			if (id_filter == NULL)
 				goto skip;
 			if (mp_typeof(*d) != MP_ARRAY) {
-decode_err:			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
-						   "invalid id_filter");
+id_filter_decode_err:		xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+						   "invalid ID_FILTER");
 				return -1;
 			}
 			uint32_t len = mp_decode_array(&d);
-			for(uint32_t i = 0; i < len; ++i) {
+			for (uint32_t i = 0; i < len; ++i) {
 				if (mp_typeof(*d) != MP_UINT)
-					goto decode_err;
-				*id_filter |= 1 << mp_decode_uint(&d);
+					goto id_filter_decode_err;
+				uint64_t val = mp_decode_uint(&d);
+				if (val >= VCLOCK_MAX)
+					goto id_filter_decode_err;
+				*id_filter |= 1 << val;
 			}
 			break;
 		default: skip:
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 8e5716b30..2a0a9c852 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -48,7 +48,7 @@ enum {
 	XROW_BODY_IOVMAX = 2,
 	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
 	XROW_HEADER_LEN_MAX = 52,
-	XROW_BODY_LEN_MAX = 128,
+	XROW_BODY_LEN_MAX = 256,
 	IPROTO_HEADER_LEN = 28,
 	/** 7 = sizeof(iproto_body_bin). */
 	IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
@@ -323,7 +323,7 @@ xrow_encode_register(struct xrow_header *row,
  * @param vclock Replication clock.
  * @param anon Whether it is an anonymous subscribe request or not.
  * @param id_filter A List of replica ids to skip rows from
- *                      when feeding a replica.
+ *		    when feeding a replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
@@ -333,7 +333,7 @@ xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
 		      const struct vclock *vclock, bool anon,
-		      unsigned int id_filter);
+		      uint32_t id_filter);
 
 /**
  * Decode SUBSCRIBE command.
@@ -344,7 +344,7 @@ xrow_encode_subscribe(struct xrow_header *row,
  * @param[out] version_id.
  * @param[out] anon Whether it is an anonymous subscribe.
  * @param[out] id_filter A list of ids to skip rows from when
- *             feeding replica.
+ *			 feeding a replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory or format error.
@@ -353,7 +353,7 @@ int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
 		      uint32_t *version_id, bool *anon,
-		      unsigned int *id_filter);
+		      uint32_t *id_filter);
 
 /**
  * Encode JOIN command.
@@ -827,7 +827,7 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
 			 const struct tt_uuid *replicaset_uuid,
 			 const struct tt_uuid *instance_uuid,
 			 const struct vclock *vclock, bool anon,
-			 unsigned int id_filter)
+			 uint32_t id_filter)
 {
 	if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
 				  vclock, anon, id_filter) != 0)
@@ -840,7 +840,7 @@ xrow_decode_subscribe_xc(struct xrow_header *row,
 			 struct tt_uuid *replicaset_uuid,
 			 struct tt_uuid *instance_uuid, struct vclock *vclock,
 			 uint32_t *replica_version_id, bool *anon,
-			 unsigned int *id_filter)
+			 uint32_t *id_filter)
 {
 	if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
 				  vclock, replica_version_id, anon,

--
Serge Petrenko
sergepetrenko@tarantool.org <mailto:sergepetrenko@tarantool.org>



[-- Attachment #2: Type: text/html, Size: 43466 bytes --]

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-26 23:54   ` Vladislav Shpilevoy
  2020-02-27  6:52     ` Konstantin Osipov
@ 2020-02-27 14:13     ` Serge Petrenko
  2020-02-27 21:17       ` Serge Petrenko
  1 sibling, 1 reply; 25+ messages in thread
From: Serge Petrenko @ 2020-02-27 14:13 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: kirichenkoga, tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 10627 bytes --]


> 27 февр. 2020 г., в 02:54, Vladislav Shpilevoy <v.shpilevoy@tarantool.org> написал(а):
> 
> Thanks for the patch!
> 

Hi! Thanks for the review!

Please find my comments and the new diff below.

> See 4 comments below.
> 
>>    replication: do not relay rows coming from a remote instance back to it
>> 
>>    We have a mechanism for restoring rows originating from an instance that
>>    suffered a sudden power loss: remote masters resend the isntance's rows
>>    received before a certain point in time, defined by remote master vclock
>>    at the moment of subscribe.
>>    However, this is useful only on initial replication configuraiton, when
>>    an instance has just recovered, so that it can receive what it has
>>    relayed but haven't synced to disk.
>>    In other cases, when an instance is operating normally and master-master
>>    replication is configured, the mechanism described above may lead to
>>    instance re-applying instance's own rows, coming from a master it has just
>>    subscribed to.
>>    To fix the problem do not relay rows coming from a remote instance, if
>>    the instance has already recovered.
>> 
>>    Closes #4739
>> 
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 911353425..73ffc0d68 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -866,8 +866,13 @@ applier_subscribe(struct applier *applier)
>> 	struct vclock vclock;
>> 	vclock_create(&vclock);
>> 	vclock_copy(&vclock, &replicaset.vclock);
>> +	/*
>> +	 * Stop accepting local rows coming from a remote
>> +	 * instance as soon as local WAL starts accepting writes.
>> +	 */
>> +	unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
> 
> 1. I was always wondering, what if the instance got orphaned after it
> started accepting writes? WAL is fully functional, it syncs whatever is
> needed, and then a resubscribe happens. Can this break anything?
> 
>> 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>> -				 &vclock, replication_anon, 0);
>> +				 &vclock, replication_anon, id_filter);
>> 	coio_write_xrow(coio, &row);
>> 
>> 	/* Read SUBSCRIBE response */
>> diff --git a/src/box/wal.c b/src/box/wal.c
>> index 27bff662a..35ba7b072 100644
>> --- a/src/box/wal.c
>> +++ b/src/box/wal.c
>> @@ -278,8 +278,13 @@ tx_schedule_commit(struct cmsg *msg)
>> 		/* Closes the input valve. */
>> 		stailq_concat(&writer->rollback, &batch->rollback);
>> 	}
>> +
>> +	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
>> 	/* Update the tx vclock to the latest written by wal. */
>> 	vclock_copy(&replicaset.vclock, &batch->vclock);
>> +#ifndef NDEBUG
>> +skip_update:
>> +#endif
> 
> 2. Consider this hack which I just invented. In that way you won't
> depend on ERRINJ and NDEBUG interconnection.
> 
> ====================
> @@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
> 	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
> 	/* Update the tx vclock to the latest written by wal. */
> 	vclock_copy(&replicaset.vclock, &batch->vclock);
> -#ifndef NDEBUG
> -skip_update:
> -#endif
> +	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
> 	tx_schedule_queue(&batch->commit);
> 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
> }
> ====================

Good one, applied.

> 
> Talking of the injection itself - don't know really. Perhaps
> it would be better to add a delay to the wal_write_to_disk()
> function, to its very end, after wal_notify_watchers(). In
> that case relay will wake up, send whatever it wants, and TX
> won't update the vclock until you let wal_write_to_disk()
> finish. Seems more natural this way.

I tried to add a sleep first. It’s impossible to sleep in tx_schedule_commit(),
since it’s processed in tx_prio endpoint, where yielding is impossible.
I also tried to add a sleep at the end of wal_write_to_disk(), just like you
suggest. This didn’t work out either. I’ll give you more details in the evening,
when I give it another try. I’ll send a follow-up if I succeed with adding a sleep.

> 
>> 	tx_schedule_queue(&batch->commit);
>> 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
>> }
>> diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
>> new file mode 100644
>> index 000000000..7dc2f7118
>> --- /dev/null
>> +++ b/test/replication/gh-4739-vclock-assert.result
>> @@ -0,0 +1,82 @@
>> +-- test-run result file version 2
>> +env = require('test_run')
>> + | ---
>> + | ...
>> +test_run = env.new()
>> + | ---
>> + | ...
>> +
>> +SERVERS = {'rebootstrap1', 'rebootstrap2'}
>> + | ---
>> + | ...
>> +test_run:create_cluster(SERVERS, "replication")
>> + | ---
>> + | ...
>> +test_run:wait_fullmesh(SERVERS)
>> + | ---
>> + | ...
>> +
>> +test_run:cmd('switch rebootstrap1')
>> + | ---
>> + | - true
>> + | ...
>> +fiber = require('fiber')
>> + | ---
>> + | ...
>> +-- Stop updating replicaset vclock to simulate a situation, when
>> +-- a row is already relayed to the remote master, but the local
>> +-- vclock update hasn't happened yet.
>> +box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
>> + | ---
>> + | - ok
>> + | ...
>> +lsn = box.info.lsn
>> + | ---
>> + | ...
>> +box.space._schema:replace{'something'}
>> + | ---
>> + | - ['something']
>> + | ...
>> +-- Vclock isn't updated.
>> +box.info.lsn == lsn
>> + | ---
>> + | - true
>> + | ...
>> +
>> +-- Wait until the remote instance gets the row.
>> +while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
>> +    fiber.sleep(0.01)\
>> +end
> 
> 3. There is a cool thing which I discovered relatively recently:
> test_run:wait_cond(). It does fiber sleep and while cycle, and
> has a finite timeout, so such a test won't hang for 10 minutes
> in Travis in case of a problem.

Thanks!

> 
>> + | ---
>> + | ...
>> +
>> +-- Restart the remote instance. This will make the first instance
>> +-- resubscribe without entering orphan mode.
>> +test_run:cmd('restart server rebootstrap2')
>> + | ---
>> + | - true
>> + | ...
>> +test_run:cmd('switch rebootstrap1')
>> + | ---
>> + | - true
>> + | ...
>> +-- Wait until resubscribe is sent
>> +fiber.sleep(2 * box.cfg.replication_timeout)
> 
> 4. Don't we collect any statistics on replication requests, just
> like we do in box.stat()? Perhaps box.stat.net() can help? To
> wait properly. Maybe just do test_run:wait_cond() for status 'sync'?

wait_cond for ’sync’ is enough. Applied.

> 
>> + | ---
>> + | ...
>> +box.info.replication[2].upstream.status
>> + | ---
>> + | - sync
>> + | ...
>> +
>> +box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
>> + | ---
>> + | - ok
>> + | ...
>> +test_run:cmd('switch default')
>> + | ---
>> + | - true
>> + | ...
>> +test_run:drop_cluster(SERVERS)
>> + | ---
>> + | …

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 73ffc0d68..78f3d8a73 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -870,7 +870,7 @@ applier_subscribe(struct applier *applier)
 	 * Stop accepting local rows coming from a remote
 	 * instance as soon as local WAL starts accepting writes.
 	 */
-	unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
+	uint32_t id_filter = box_is_orphan() ? 0 : 1 << instance_id;
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &vclock, replication_anon, id_filter);
 	coio_write_xrow(coio, &row);
diff --git a/src/box/wal.c b/src/box/wal.c
index 35ba7b072..bf127b259 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
 	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
 	/* Update the tx vclock to the latest written by wal. */
 	vclock_copy(&replicaset.vclock, &batch->vclock);
-#ifndef NDEBUG
-skip_update:
-#endif
+	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
 	tx_schedule_queue(&batch->commit);
 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
 }
diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
index 7dc2f7118..a612826a0 100644
--- a/test/replication/gh-4739-vclock-assert.result
+++ b/test/replication/gh-4739-vclock-assert.result
@@ -44,10 +44,11 @@ box.info.lsn == lsn
  | ...
 
 -- Wait until the remote instance gets the row.
-while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
-    fiber.sleep(0.01)\
-end
+test_run:wait_cond(function()\
+    return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
+end, 10)
  | ---
+ | - true
  | ...
 
 -- Restart the remote instance. This will make the first instance
@@ -61,14 +62,12 @@ test_run:cmd('switch rebootstrap1')
  | - true
  | ...
 -- Wait until resubscribe is sent
-fiber.sleep(2 * box.cfg.replication_timeout)
- | ---
- | ...
-box.info.replication[2].upstream.status
+test_run:wait_cond(function()\
+    return box.info.replication[2].upstream.status == 'sync'\
+end, 10)
  | ---
- | - sync
+ | - true
  | ...
-
 box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
  | ---
  | - ok
diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
index 26dc781e2..b6a7caf3b 100644
--- a/test/replication/gh-4739-vclock-assert.test.lua
+++ b/test/replication/gh-4739-vclock-assert.test.lua
@@ -17,18 +17,18 @@ box.space._schema:replace{'something'}
 box.info.lsn == lsn
 
 -- Wait until the remote instance gets the row.
-while test_run:get_vclock('rebootstrap2')[box.info.id] == lsn do\
-    fiber.sleep(0.01)\
-end
+test_run:wait_cond(function()\
+    return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
+end, 10)
 
 -- Restart the remote instance. This will make the first instance
 -- resubscribe without entering orphan mode.
 test_run:cmd('restart server rebootstrap2')
 test_run:cmd('switch rebootstrap1')
 -- Wait until resubscribe is sent
-fiber.sleep(2 * box.cfg.replication_timeout)
-box.info.replication[2].upstream.status
-
+test_run:wait_cond(function()\
+    return box.info.replication[2].upstream.status == 'sync'\
+end, 10)
 box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
 test_run:cmd('switch default')
 test_run:drop_cluster(SERVERS)

--
Serge Petrenko
sergepetrenko@tarantool.org



[-- Attachment #2: Type: text/html, Size: 19192 bytes --]

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-27 14:13     ` Serge Petrenko
@ 2020-02-27 21:17       ` Serge Petrenko
  2020-02-27 23:22         ` Vladislav Shpilevoy
  0 siblings, 1 reply; 25+ messages in thread
From: Serge Petrenko @ 2020-02-27 21:17 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: kirichenkoga, tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 11747 bytes --]


  
>Четверг, 27 февраля 2020, 17:13 +03:00 от Serge Petrenko <sergepetrenko@tarantool.org>:
> 
> 
>>27 февр. 2020 г., в 02:54, Vladislav Shpilevoy < v.shpilevoy@tarantool.org > написал(а):  
>>Thanks for the patch!
>> 
> 
>Hi! Thanks for the review!
> 
>Please find my comments and the new diff below.  
>>See 4 comments below.
>>  
>>>   replication: do not relay rows coming from a remote instance back to it
>>>
>>>   We have a mechanism for restoring rows originating from an instance that
>>>   suffered a sudden power loss: remote masters resend the isntance's rows
>>>   received before a certain point in time, defined by remote master vclock
>>>   at the moment of subscribe.
>>>   However, this is useful only on initial replication configuraiton, when
>>>   an instance has just recovered, so that it can receive what it has
>>>   relayed but haven't synced to disk.
>>>   In other cases, when an instance is operating normally and master-master
>>>   replication is configured, the mechanism described above may lead to
>>>   instance re-applying instance's own rows, coming from a master it has just
>>>   subscribed to.
>>>   To fix the problem do not relay rows coming from a remote instance, if
>>>   the instance has already recovered.
>>>
>>>   Closes #4739
>>>
>>>diff --git a/src/box/ applier.cc b/src/box/ applier.cc
>>>index 911353425..73ffc0d68 100644
>>>--- a/src/box/ applier.cc
>>>+++ b/src/box/ applier.cc
>>>@@ -866,8 +866,13 @@ applier_subscribe(struct applier *applier)
>>>struct vclock vclock;
>>>vclock_create(&vclock);
>>>vclock_copy(&vclock, &replicaset.vclock);
>>>+ /*
>>>+ * Stop accepting local rows coming from a remote
>>>+ * instance as soon as local WAL starts accepting writes.
>>>+ */
>>>+ unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
>>1. I was always wondering, what if the instance got orphaned after it
>>started accepting writes? WAL is fully functional, it syncs whatever is
>>needed, and then a resubscribe happens. Can this break anything?
>>  
>>>xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>>>- &vclock, replication_anon, 0);
>>>+ &vclock, replication_anon, id_filter);
>>>coio_write_xrow(coio, &row);
>>>
>>>/* Read SUBSCRIBE response */
>>>diff --git a/src/box/wal.c b/src/box/wal.c
>>>index 27bff662a..35ba7b072 100644
>>>--- a/src/box/wal.c
>>>+++ b/src/box/wal.c
>>>@@ -278,8 +278,13 @@ tx_schedule_commit(struct cmsg *msg)
>>>/* Closes the input valve. */
>>>stailq_concat(&writer->rollback, &batch->rollback);
>>>}
>>>+
>>>+ ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
>>>/* Update the tx vclock to the latest written by wal. */
>>>vclock_copy(&replicaset.vclock, &batch->vclock);
>>>+#ifndef NDEBUG
>>>+skip_update:
>>>+#endif
>>2. Consider this hack which I just invented. In that way you won't
>>depend on ERRINJ and NDEBUG interconnection.
>>
>>====================
>>@@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
>>ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
>>/* Update the tx vclock to the latest written by wal. */
>>vclock_copy(&replicaset.vclock, &batch->vclock);
>>-#ifndef NDEBUG
>>-skip_update:
>>-#endif
>>+ ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
>>tx_schedule_queue(&batch->commit);
>>mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
>>}
>>====================
> 
>Good one, applied.  
>>
>>Talking of the injection itself - don't know really. Perhaps
>>it would be better to add a delay to the wal_write_to_disk()
>>function, to its very end, after wal_notify_watchers(). In
>>that case relay will wake up, send whatever it wants, and TX
>>won't update the vclock until you let wal_write_to_disk()
>>finish. Seems more natural this way.
> 
>I tried to add a sleep first. It’s impossible to sleep in tx_schedule_commit(),
>since it’s processed in tx_prio endpoint, where yielding is impossible.
>I also tried to add a sleep at the end of wal_write_to_disk(), just like you
>suggest. This didn’t work out either. I’ll give you more details in the evening,
>when I give it another try. I’ll send a follow-up if I succeed with adding a sleep.
 
Ok. I tried to add either ERROR_INJECT_YIELD or ERROR_INJECT_SLEEP at the end
of wal_write_to_disk().
It looks like you cannot yield in wal_write_to_disk(). (is it possible to yield in WAL thread at all?)
Firing this injection with ERROR_INJECT_YIELD and then resetting it leads to wal thread stopping
processing messages. This leads to tarantool hanging infinitely on shutdown when tx waits for wal
thread to exit, but wal never gets the shutdown signal.
 
Using ERROR_INJECT_SLEEP leads to wal watchers not being notified until the injection is reset. This
probably happens because of  wal_notify_watchers’ use of cpipe_flush_input(), which doesn’t flush the input until
the end of event loop iteration, if there are not enough messages (only one message in our case).
The event loop iteration never ends, because we sleep right after wal_notify_watchers() call.
 
So, I see skipping vclock assignment in tx_schedule_commit() as the only possible alternative.
Hope my explanation was clear enough and, more imortantly, correct. If not, lest discuss.
>>  
>>>tx_schedule_queue(&batch->commit);
>>>mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
>>>}
>>>diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
>>>new file mode 100644
>>>index 000000000..7dc2f7118
>>>--- /dev/null
>>>+++ b/test/replication/gh-4739-vclock-assert.result
>>>@@ -0,0 +1,82 @@
>>>+-- test-run result file version 2
>>>+env = require('test_run')
>>>+ | ---
>>>+ | ...
>>>+test_run = env.new()
>>>+ | ---
>>>+ | ...
>>>+
>>>+SERVERS = {'rebootstrap1', 'rebootstrap2'}
>>>+ | ---
>>>+ | ...
>>>+test_run:create_cluster(SERVERS, "replication")
>>>+ | ---
>>>+ | ...
>>>+test_run:wait_fullmesh(SERVERS)
>>>+ | ---
>>>+ | ...
>>>+
>>>+test_run:cmd('switch rebootstrap1')
>>>+ | ---
>>>+ | - true
>>>+ | ...
>>>+fiber = require('fiber')
>>>+ | ---
>>>+ | ...
>>>+-- Stop updating replicaset vclock to simulate a situation, when
>>>+-- a row is already relayed to the remote master, but the local
>>>+-- vclock update hasn't happened yet.
>>>+box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
>>>+ | ---
>>>+ | - ok
>>>+ | ...
>>>+lsn = box.info.lsn
>>>+ | ---
>>>+ | ...
>>>+box.space._schema:replace{'something'}
>>>+ | ---
>>>+ | - ['something']
>>>+ | ...
>>>+-- Vclock isn't updated.
>>>+box.info.lsn == lsn
>>>+ | ---
>>>+ | - true
>>>+ | ...
>>>+
>>>+-- Wait until the remote instance gets the row.
>>>+while test_run:get_vclock('rebootstrap2')[ box.info.id ] == lsn do\
>>>+    fiber.sleep(0.01)\
>>>+end
>>3. There is a cool thing which I discovered relatively recently:
>>test_run:wait_cond(). It does fiber sleep and while cycle, and
>>has a finite timeout, so such a test won't hang for 10 minutes
>>in Travis in case of a problem.
> 
>Thanks!  
>>  
>>>+ | ---
>>>+ | ...
>>>+
>>>+-- Restart the remote instance. This will make the first instance
>>>+-- resubscribe without entering orphan mode.
>>>+test_run:cmd('restart server rebootstrap2')
>>>+ | ---
>>>+ | - true
>>>+ | ...
>>>+test_run:cmd('switch rebootstrap1')
>>>+ | ---
>>>+ | - true
>>>+ | ...
>>>+-- Wait until resubscribe is sent
>>>+fiber.sleep(2 * box.cfg.replication_timeout)
>>4. Don't we collect any statistics on replication requests, just
>>like we do in box.stat()? Perhaps  box.stat.net () can help? To
>>wait properly. Maybe just do test_run:wait_cond() for status 'sync'?
> 
>wait_cond for ’sync’ is enough. Applied.  
>>  
>>>+ | ---
>>>+ | ...
>>>+box.info.replication[2].upstream.status
>>>+ | ---
>>>+ | - sync
>>>+ | ...
>>>+
>>>+box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
>>>+ | ---
>>>+ | - ok
>>>+ | ...
>>>+test_run:cmd('switch default')
>>>+ | ---
>>>+ | - true
>>>+ | ...
>>>+test_run:drop_cluster(SERVERS)
>>>+ | ---
>>>+ | … 
>diff --git a/src/box/ applier.cc b/src/box/ applier.cc
>index 73ffc0d68..78f3d8a73 100644
>--- a/src/box/ applier.cc
>+++ b/src/box/ applier.cc
>@@ -870,7 +870,7 @@ applier_subscribe(struct applier *applier)
>   * Stop accepting local rows coming from a remote
>   * instance as soon as local WAL starts accepting writes.
>   */
>- unsigned int id_filter = box_is_orphan() ? 0 : 1 << instance_id;
>+ uint32_t id_filter = box_is_orphan() ? 0 : 1 << instance_id;
>  xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>   &vclock, replication_anon, id_filter);
>  coio_write_xrow(coio, &row);
>diff --git a/src/box/wal.c b/src/box/wal.c
>index 35ba7b072..bf127b259 100644
>--- a/src/box/wal.c
>+++ b/src/box/wal.c
>@@ -282,9 +282,7 @@ tx_schedule_commit(struct cmsg *msg)
>  ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
>  /* Update the tx vclock to the latest written by wal. */
>  vclock_copy(&replicaset.vclock, &batch->vclock);
>-#ifndef NDEBUG
>-skip_update:
>-#endif
>+ ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
>  tx_schedule_queue(&batch->commit);
>  mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
> }
>diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
>index 7dc2f7118..a612826a0 100644
>--- a/test/replication/gh-4739-vclock-assert.result
>+++ b/test/replication/gh-4739-vclock-assert.result
>@@ -44,10 +44,11 @@ box.info.lsn == lsn
>  | ...
> 
> -- Wait until the remote instance gets the row.
>-while test_run:get_vclock('rebootstrap2')[ box.info.id ] == lsn do\
>-    fiber.sleep(0.01)\
>-end
>+test_run:wait_cond(function()\
>+    return test_run:get_vclock('rebootstrap2')[ box.info.id ] > lsn\
>+end, 10)
>  | ---
>+ | - true
>  | ...
> 
> -- Restart the remote instance. This will make the first instance
>@@ -61,14 +62,12 @@ test_run:cmd('switch rebootstrap1')
>  | - true
>  | ...
> -- Wait until resubscribe is sent
>-fiber.sleep(2 * box.cfg.replication_timeout)
>- | ---
>- | ...
>-box.info.replication[2].upstream.status
>+test_run:wait_cond(function()\
>+    return box.info.replication[2].upstream.status == 'sync'\
>+end, 10)
>  | ---
>- | - sync
>+ | - true
>  | ...
>-
> box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
>  | ---
>  | - ok
>diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
>index 26dc781e2..b6a7caf3b 100644
>--- a/test/replication/gh-4739-vclock-assert.test.lua
>+++ b/test/replication/gh-4739-vclock-assert.test.lua
>@@ -17,18 +17,18 @@ box.space._schema:replace{'something'}
> box.info.lsn == lsn
> 
> -- Wait until the remote instance gets the row.
>-while test_run:get_vclock('rebootstrap2')[ box.info.id ] == lsn do\
>-    fiber.sleep(0.01)\
>-end
>+test_run:wait_cond(function()\
>+    return test_run:get_vclock('rebootstrap2')[ box.info.id ] > lsn\
>+end, 10)
> 
> -- Restart the remote instance. This will make the first instance
> -- resubscribe without entering orphan mode.
> test_run:cmd('restart server rebootstrap2')
> test_run:cmd('switch rebootstrap1')
> -- Wait until resubscribe is sent
>-fiber.sleep(2 * box.cfg.replication_timeout)
>-box.info.replication[2].upstream.status
>-
>+test_run:wait_cond(function()\
>+    return box.info.replication[2].upstream.status == 'sync'\
>+end, 10)
> box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
> test_run:cmd('switch default')
> test_run:drop_cluster(SERVERS)
> 
>--
>Serge Petrenko
>sergepetrenko@tarantool.org
> 
 

[-- Attachment #2: Type: text/html, Size: 18126 bytes --]

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance
  2020-02-26 23:54 ` [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance Vladislav Shpilevoy
@ 2020-02-27 21:24   ` Serge Petrenko
  2020-02-27 23:24     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 25+ messages in thread
From: Serge Petrenko @ 2020-02-27 21:24 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: kirichenkoga, tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 1650 bytes --]


  
>Четверг, 27 февраля 2020, 2:54 +03:00 от Vladislav Shpilevoy <v.shpilevoy@tarantool.org>:
> 
>Hi! Thanks for the patch!
>
>Please, add a @ChangeLog record.
Hi!
What kind of a changelog should I write?
This is a bug fix with no visible behaviour change (speaking from the users point of view)
--
Serge Petrenko
 
>
>On 26/02/2020 11:00, sergepetrenko wrote:
>>  https://github.com/tarantool/tarantool/issues/4739
>>  https://github.com/tarantool/tarantool/tree/sp/gh-4739-vclock-assert-v4
>>
>> Changes in v4:
>> - move row skipping logic from recovery to relay
>> - encode a list of instances whose rows to skip
>> in SUBSCRIBE request insead of encoding
>> is_orhpan status
>>
>> Changes in v3:
>> - review fixes as per review from Vlad
>> - instead of skipping rows on replica side,
>> do it on master side, by patching recovery
>> to silently follow rows coming from a certain
>> instance.
>>
>> Changes in v2:
>> - review fixes as per review from Kostja
>>
>> Serge Petrenko (4):
>> box: expose box_is_orphan method
>> wal: warn when trying to write a record with a broken lsn
>> replication: implement an instance id filter for relay
>> replication: do not relay rows coming from a remote instance back to
>> it
>>
>> src/box/applier.cc | 3 ++-
>> src/box/box.cc | 13 +++++++++---
>> src/box/box.h | 3 +++
>> src/box/iproto_constants.h | 1 +
>> src/box/relay.cc | 9 ++++++++-
>> src/box/relay.h | 3 ++-
>> src/box/wal.c | 17 +++++++++++++---
>> src/box/xrow.c | 41 +++++++++++++++++++++++++++++++++++---
>> src/box/xrow.h | 34 +++++++++++++++++++++----------
>> 9 files changed, 101 insertions(+), 23 deletions(-)
>>
 

[-- Attachment #2: Type: text/html, Size: 2605 bytes --]

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-27 21:17       ` Serge Petrenko
@ 2020-02-27 23:22         ` Vladislav Shpilevoy
  2020-02-28  8:03           ` Serge Petrenko
  0 siblings, 1 reply; 25+ messages in thread
From: Vladislav Shpilevoy @ 2020-02-27 23:22 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: kirichenkoga, tarantool-patches

> Ok. I tried to add either ERROR_INJECT_YIELD or ERROR_INJECT_SLEEP at the end
> of wal_write_to_disk().
> It looks like you cannot yield in wal_write_to_disk(). (is it possible to yield in WAL thread at all?)
> Firing this injection with ERROR_INJECT_YIELD and then resetting it leads to wal thread stopping
> processing messages. This leads to tarantool hanging infinitely on shutdown when tx waits for wal
> thread to exit, but wal never gets the shutdown signal.
>  
> Using ERROR_INJECT_SLEEP leads to wal watchers not being notified until the injection is reset. This
> probably happens because of  wal_notify_watchers’ use of cpipe_flush_input(), which doesn’t flush the input until
> the end of event loop iteration, if there are not enough messages (only one message in our case).
> The event loop iteration never ends, because we sleep right after wal_notify_watchers() call.
>  
> So, I see skipping vclock assignment in tx_schedule_commit() as the only possible alternative.
> Hope my explanation was clear enough and, more imortantly, correct. If not, lest discuss.

Yeah, I stumbled into the same problems. And realized that the current test,
after all, is not valid. So we need to change it anyway.

First I tried to solve them by trying to block TX thread totally on one
instance after it tried to commit something. Since that would block the
test too, I tried to introduce a new thread - errinj thread, which would
listen on an ip/port or a unix socket, and will receive requests to set
error injections from another instance. So rebootstrap1's TX thread would
freeze, and I could control that instance via interaction with its errinj
thread from rebootstrap2 instance or from default instance.

Despite the idea would work for sure, it appeared to be hard to implement
for a short time, so I postponed that, and probably will open a ticket to
implement such thing. It could be useful for any test, which needs to test
behaviour of other threads, when TX is not scheduled for a long time. Also
we can implement that logic as a part of iproto thread.

For our case I found a simpler solution - sleep in wal_write_to_disk, but
deliver all watcher events immediately. Then the test works. And still
crashes without your patch.

Here is my diff, which I pushed on top of your branch. If you don't agree -
lets discuss. Otherwise squash and the patchset LGTM.

================================================================================

commit 7054ed8ffc5cff690858261073cdfb1822e241b7
Author: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Date:   Fri Feb 28 00:02:10 2020 +0100

    Review fixes

diff --git a/src/box/wal.c b/src/box/wal.c
index bf127b259..1668c9348 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -278,11 +278,8 @@ tx_schedule_commit(struct cmsg *msg)
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
 	}
-
-	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
 	/* Update the tx vclock to the latest written by wal. */
 	vclock_copy(&replicaset.vclock, &batch->vclock);
-	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
 	tx_schedule_queue(&batch->commit);
 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
 }
@@ -1117,6 +1114,7 @@ done:
 	}
 	fiber_gc();
 	wal_notify_watchers(writer, WAL_EVENT_WRITE);
+	ERROR_INJECT_SLEEP(ERRINJ_RELAY_FASTER_THAN_TX);
 }
 
 /** WAL writer main loop.  */
@@ -1328,6 +1326,8 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 	msg->events = events;
 	cmsg_init(&msg->cmsg, watcher->route);
 	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
+	ERROR_INJECT(ERRINJ_RELAY_FASTER_THAN_TX,
+		     cpipe_deliver_now(&watcher->watcher_pipe));
 }
 
 static void
diff --git a/src/lib/core/cbus.h b/src/lib/core/cbus.h
index 16d122779..f0101cb8b 100644
--- a/src/lib/core/cbus.h
+++ b/src/lib/core/cbus.h
@@ -176,6 +176,13 @@ cpipe_set_max_input(struct cpipe *pipe, int max_input)
 	pipe->max_input = max_input;
 }
 
+static inline void
+cpipe_deliver_now(struct cpipe *pipe)
+{
+	if (pipe->n_input > 0)
+		ev_invoke(pipe->producer, &pipe->flush_input, EV_CUSTOM);
+}
+
 /**
  * Flush all staged messages into the pipe and eventually to the
  * consumer.
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index 58fe158fd..d8cdf3f27 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -137,7 +137,7 @@ struct errinj {
 	_(ERRINJ_DYN_MODULE_COUNT, ERRINJ_INT, {.iparam = 0}) \
 	_(ERRINJ_FIBER_MADVISE, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_FIBER_MPROTECT, ERRINJ_INT, {.iparam = -1}) \
-	_(ERRINJ_REPLICASET_VCLOCK_UPDATE, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_RELAY_FASTER_THAN_TX, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index eb0905238..4ad24d0c1 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -59,12 +59,12 @@ evals
   - ERRINJ_PORT_DUMP: false
   - ERRINJ_RELAY_BREAK_LSN: -1
   - ERRINJ_RELAY_EXIT_DELAY: 0
+  - ERRINJ_RELAY_FASTER_THAN_TX: false
   - ERRINJ_RELAY_FINAL_JOIN: false
   - ERRINJ_RELAY_FINAL_SLEEP: false
   - ERRINJ_RELAY_REPORT_INTERVAL: 0
   - ERRINJ_RELAY_SEND_DELAY: false
   - ERRINJ_RELAY_TIMEOUT: 0
-  - ERRINJ_REPLICASET_VCLOCK_UPDATE: false
   - ERRINJ_REPLICA_JOIN_DELAY: false
   - ERRINJ_SIO_READ_MAX: -1
   - ERRINJ_SNAP_COMMIT_DELAY: false
diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
index a612826a0..43d3f27f3 100644
--- a/test/replication/gh-4739-vclock-assert.result
+++ b/test/replication/gh-4739-vclock-assert.result
@@ -26,16 +26,19 @@ fiber = require('fiber')
 -- Stop updating replicaset vclock to simulate a situation, when
 -- a row is already relayed to the remote master, but the local
 -- vclock update hasn't happened yet.
-box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
  | ---
  | - ok
  | ...
 lsn = box.info.lsn
  | ---
  | ...
-box.space._schema:replace{'something'}
+f = fiber.create(function() box.space._schema:replace{'something'} end)
  | ---
- | - ['something']
+ | ...
+test_run:wait_cond(function() return f:status() == 'suspended' end)
+ | ---
+ | - true
  | ...
 -- Vclock isn't updated.
 box.info.lsn == lsn
@@ -53,7 +56,7 @@ end, 10)
 
 -- Restart the remote instance. This will make the first instance
 -- resubscribe without entering orphan mode.
-test_run:cmd('restart server rebootstrap2')
+test_run:cmd('restart server rebootstrap2 with wait=False')
  | ---
  | - true
  | ...
@@ -68,10 +71,14 @@ end, 10)
  | ---
  | - true
  | ...
-box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
  | ---
  | - ok
  | ...
+box.space._schema:get{'something'}
+ | ---
+ | - ['something']
+ | ...
 test_run:cmd('switch default')
  | ---
  | - true
diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
index b6a7caf3b..f8dd86688 100644
--- a/test/replication/gh-4739-vclock-assert.test.lua
+++ b/test/replication/gh-4739-vclock-assert.test.lua
@@ -10,9 +10,10 @@ fiber = require('fiber')
 -- Stop updating replicaset vclock to simulate a situation, when
 -- a row is already relayed to the remote master, but the local
 -- vclock update hasn't happened yet.
-box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
 lsn = box.info.lsn
-box.space._schema:replace{'something'}
+f = fiber.create(function() box.space._schema:replace{'something'} end)
+test_run:wait_cond(function() return f:status() == 'suspended' end)
 -- Vclock isn't updated.
 box.info.lsn == lsn
 
@@ -23,12 +24,13 @@ end, 10)
 
 -- Restart the remote instance. This will make the first instance
 -- resubscribe without entering orphan mode.
-test_run:cmd('restart server rebootstrap2')
+test_run:cmd('restart server rebootstrap2 with wait=False')
 test_run:cmd('switch rebootstrap1')
 -- Wait until resubscribe is sent
 test_run:wait_cond(function()\
     return box.info.replication[2].upstream.status == 'sync'\
 end, 10)
-box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
+box.space._schema:get{'something'}
 test_run:cmd('switch default')
 test_run:drop_cluster(SERVERS)

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance
  2020-02-27 21:24   ` Serge Petrenko
@ 2020-02-27 23:24     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 25+ messages in thread
From: Vladislav Shpilevoy @ 2020-02-27 23:24 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: kirichenkoga, tarantool-patches

On 27/02/2020 22:24, Serge Petrenko wrote:
>  
> 
>     Четверг, 27 февраля 2020, 2:54 +03:00 от Vladislav Shpilevoy <v.shpilevoy@tarantool.org>:
>      
>     Hi! Thanks for the patch!
> 
>     Please, add a @ChangeLog record.
> 
> Hi!
> What kind of a changelog should I write?
> This is a bug fix with no visible behaviour change (speaking from the users point of view)

We add change log record for everything. For bugs too. It
should shortly describe what a bug was fixed. See examples in
https://github.com/tarantool/tarantool/releases

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay
  2020-02-27 13:15     ` Serge Petrenko
@ 2020-02-27 23:33       ` Vladislav Shpilevoy
  0 siblings, 0 replies; 25+ messages in thread
From: Vladislav Shpilevoy @ 2020-02-27 23:33 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: kirichenkoga, tarantool-patches

>>> +return -1;
>>> +}
>>> +uint32_t len = mp_decode_array(&d);
>>> +for(uint32_t i = 0; i < len; ++i) {
>>> +if (mp_typeof(*d) != MP_UINT)
>>> +goto decode_err;
>>> +*id_filter |= 1 << mp_decode_uint(&d);
>>
>> 7. If someone would send a big ID (a program, pretending to be a Tarantool
>> instance), it would cause unsigned bit shift overflow, which is undefined
>> behaviour. Lets check that it is not bigger than 31.
>>
>> However this won't really help much. This code will crash even if I will
>> just send a truncated packet. From what I see.
> 
> I’m not sure I understand what you’re speaking about. This piece of code is
> similar to the one we have in mp_decode_vclock. The situation didn’t get worse,
> at least.

Never mind, I see iproto_msg_decode() checks MessagePack is correct.

^ permalink raw reply	[flat|nested] 25+ messages in thread

* Re: [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-27 23:22         ` Vladislav Shpilevoy
@ 2020-02-28  8:03           ` Serge Petrenko
  0 siblings, 0 replies; 25+ messages in thread
From: Serge Petrenko @ 2020-02-28  8:03 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: kirichenkoga, tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 10788 bytes --]



> 28 февр. 2020 г., в 02:22, Vladislav Shpilevoy <v.shpilevoy@tarantool.org> написал(а):
> 
>> Ok. I tried to add either ERROR_INJECT_YIELD or ERROR_INJECT_SLEEP at the end
>> of wal_write_to_disk().
>> It looks like you cannot yield in wal_write_to_disk(). (is it possible to yield in WAL thread at all?)
>> Firing this injection with ERROR_INJECT_YIELD and then resetting it leads to wal thread stopping
>> processing messages. This leads to tarantool hanging infinitely on shutdown when tx waits for wal
>> thread to exit, but wal never gets the shutdown signal.
>>  
>> Using ERROR_INJECT_SLEEP leads to wal watchers not being notified until the injection is reset. This
>> probably happens because of  wal_notify_watchers’ use of cpipe_flush_input(), which doesn’t flush the input until
>> the end of event loop iteration, if there are not enough messages (only one message in our case).
>> The event loop iteration never ends, because we sleep right after wal_notify_watchers() call.
>>  
>> So, I see skipping vclock assignment in tx_schedule_commit() as the only possible alternative.
>> Hope my explanation was clear enough and, more imortantly, correct. If not, lest discuss.
> 
> Yeah, I stumbled into the same problems. And realized that the current test,
> after all, is not valid. So we need to change it anyway.
> 
> First I tried to solve them by trying to block TX thread totally on one
> instance after it tried to commit something. Since that would block the
> test too, I tried to introduce a new thread - errinj thread, which would
> listen on an ip/port or a unix socket, and will receive requests to set
> error injections from another instance. So rebootstrap1's TX thread would
> freeze, and I could control that instance via interaction with its errinj
> thread from rebootstrap2 instance or from default instance.
> 
> Despite the idea would work for sure, it appeared to be hard to implement
> for a short time, so I postponed that, and probably will open a ticket to
> implement such thing. It could be useful for any test, which needs to test
> behaviour of other threads, when TX is not scheduled for a long time. Also
> we can implement that logic as a part of iproto thread.
> 
> For our case I found a simpler solution - sleep in wal_write_to_disk, but
> deliver all watcher events immediately. Then the test works. And still
> crashes without your patch.

Thanks for your amendments!
The new test LGTM. Do you mean the old test is incorrect because the
injection simulates an impossible situation? If yes, then I agree.
I applied your diff with a tiny fix. See below.
I’ll send v5 with a changelog in the cover letter shortly.

> 
> Here is my diff, which I pushed on top of your branch. If you don't agree -
> lets discuss. Otherwise squash and the patchset LGTM.
> 
> ================================================================================
> 
> commit 7054ed8ffc5cff690858261073cdfb1822e241b7
> Author: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
> Date:   Fri Feb 28 00:02:10 2020 +0100
> 
>    Review fixes
> 
> diff --git a/src/box/wal.c b/src/box/wal.c
> index bf127b259..1668c9348 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -278,11 +278,8 @@ tx_schedule_commit(struct cmsg *msg)
> 		/* Closes the input valve. */
> 		stailq_concat(&writer->rollback, &batch->rollback);
> 	}
> -
> -	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, { goto skip_update; });
> 	/* Update the tx vclock to the latest written by wal. */
> 	vclock_copy(&replicaset.vclock, &batch->vclock);
> -	ERROR_INJECT(ERRINJ_REPLICASET_VCLOCK_UPDATE, {skip_update:;});
> 	tx_schedule_queue(&batch->commit);
> 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
> }
> @@ -1117,6 +1114,7 @@ done:
> 	}
> 	fiber_gc();
> 	wal_notify_watchers(writer, WAL_EVENT_WRITE);
> +	ERROR_INJECT_SLEEP(ERRINJ_RELAY_FASTER_THAN_TX);
> }
> 
> /** WAL writer main loop.  */
> @@ -1328,6 +1326,8 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
> 	msg->events = events;
> 	cmsg_init(&msg->cmsg, watcher->route);
> 	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
> +	ERROR_INJECT(ERRINJ_RELAY_FASTER_THAN_TX,
> +		     cpipe_deliver_now(&watcher->watcher_pipe));
> }
> 
> static void
> diff --git a/src/lib/core/cbus.h b/src/lib/core/cbus.h
> index 16d122779..f0101cb8b 100644
> --- a/src/lib/core/cbus.h
> +++ b/src/lib/core/cbus.h
> @@ -176,6 +176,13 @@ cpipe_set_max_input(struct cpipe *pipe, int max_input)
> 	pipe->max_input = max_input;
> }
> 
> +static inline void
> +cpipe_deliver_now(struct cpipe *pipe)
> +{
> +	if (pipe->n_input > 0)
> +		ev_invoke(pipe->producer, &pipe->flush_input, EV_CUSTOM);
> +}
> +
> /**
>  * Flush all staged messages into the pipe and eventually to the
>  * consumer.
> diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
> index 58fe158fd..d8cdf3f27 100644
> --- a/src/lib/core/errinj.h
> +++ b/src/lib/core/errinj.h
> @@ -137,7 +137,7 @@ struct errinj {
> 	_(ERRINJ_DYN_MODULE_COUNT, ERRINJ_INT, {.iparam = 0}) \
> 	_(ERRINJ_FIBER_MADVISE, ERRINJ_BOOL, {.bparam = false}) \
> 	_(ERRINJ_FIBER_MPROTECT, ERRINJ_INT, {.iparam = -1}) \
> -	_(ERRINJ_REPLICASET_VCLOCK_UPDATE, ERRINJ_BOOL, {.bparam = false}) \
> +	_(ERRINJ_RELAY_FASTER_THAN_TX, ERRINJ_BOOL, {.bparam = false}) \
> 
> ENUM0(errinj_id, ERRINJ_LIST);
> extern struct errinj errinjs[];
> diff --git a/test/box/errinj.result b/test/box/errinj.result
> index eb0905238..4ad24d0c1 100644
> --- a/test/box/errinj.result
> +++ b/test/box/errinj.result
> @@ -59,12 +59,12 @@ evals
>   - ERRINJ_PORT_DUMP: false
>   - ERRINJ_RELAY_BREAK_LSN: -1
>   - ERRINJ_RELAY_EXIT_DELAY: 0
> +  - ERRINJ_RELAY_FASTER_THAN_TX: false
>   - ERRINJ_RELAY_FINAL_JOIN: false
>   - ERRINJ_RELAY_FINAL_SLEEP: false
>   - ERRINJ_RELAY_REPORT_INTERVAL: 0
>   - ERRINJ_RELAY_SEND_DELAY: false
>   - ERRINJ_RELAY_TIMEOUT: 0
> -  - ERRINJ_REPLICASET_VCLOCK_UPDATE: false
>   - ERRINJ_REPLICA_JOIN_DELAY: false
>   - ERRINJ_SIO_READ_MAX: -1
>   - ERRINJ_SNAP_COMMIT_DELAY: false
> diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
> index a612826a0..43d3f27f3 100644
> --- a/test/replication/gh-4739-vclock-assert.result
> +++ b/test/replication/gh-4739-vclock-assert.result
> @@ -26,16 +26,19 @@ fiber = require('fiber')
> -- Stop updating replicaset vclock to simulate a situation, when
> -- a row is already relayed to the remote master, but the local
> -- vclock update hasn't happened yet.
> -box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
> +box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
>  | ---
>  | - ok
>  | ...
> lsn = box.info.lsn
>  | ---
>  | ...
> -box.space._schema:replace{'something'}
> +f = fiber.create(function() box.space._schema:replace{'something'} end)
>  | ---
> - | - ['something']
> + | ...
> +test_run:wait_cond(function() return f:status() == 'suspended' end)
> + | ---
> + | - true
>  | ...

No need to call wait_cond() here. The fiber is suspended as soon as control is returned
to console, where we’re trying to call wait_cond.

diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
index 43d3f27f3..83896c4e1 100644
--- a/test/replication/gh-4739-vclock-assert.result
+++ b/test/replication/gh-4739-vclock-assert.result
@@ -36,9 +36,9 @@ lsn = box.info.lsn
 f = fiber.create(function() box.space._schema:replace{'something'} end)
  | ---
  | ...
-test_run:wait_cond(function() return f:status() == 'suspended' end)
+f:status()
  | ---
- | - true
+ | - suspended
  | ...
 -- Vclock isn't updated.
 box.info.lsn == lsn

> -- Vclock isn't updated.
> box.info.lsn == lsn
> @@ -53,7 +56,7 @@ end, 10)
> 
> -- Restart the remote instance. This will make the first instance
> -- resubscribe without entering orphan mode.
> -test_run:cmd('restart server rebootstrap2')
> +test_run:cmd('restart server rebootstrap2 with wait=False')
>  | ---
>  | - true
>  | ...
> @@ -68,10 +71,14 @@ end, 10)
>  | ---
>  | - true
>  | ...
> -box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
> +box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
>  | ---
>  | - ok
>  | ...
> +box.space._schema:get{'something'}
> + | ---
> + | - ['something']
> + | ...
> test_run:cmd('switch default')
>  | ---
>  | - true
> diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
> index b6a7caf3b..f8dd86688 100644
> --- a/test/replication/gh-4739-vclock-assert.test.lua
> +++ b/test/replication/gh-4739-vclock-assert.test.lua
> @@ -10,9 +10,10 @@ fiber = require('fiber')
> -- Stop updating replicaset vclock to simulate a situation, when
> -- a row is already relayed to the remote master, but the local
> -- vclock update hasn't happened yet.
> -box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', true)
> +box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
> lsn = box.info.lsn
> -box.space._schema:replace{'something'}
> +f = fiber.create(function() box.space._schema:replace{'something'} end)
> +test_run:wait_cond(function() return f:status() == 'suspended' end)

diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
index f8dd86688..5755ad752 100644
--- a/test/replication/gh-4739-vclock-assert.test.lua
+++ b/test/replication/gh-4739-vclock-assert.test.lua
@@ -13,7 +13,7 @@ fiber = require('fiber')
 box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
 lsn = box.info.lsn
 f = fiber.create(function() box.space._schema:replace{'something'} end)
-test_run:wait_cond(function() return f:status() == 'suspended' end)
+f:status()
 -- Vclock isn't updated.
 box.info.lsn == lsn
 

> -- Vclock isn't updated.
> box.info.lsn == lsn
> 
> @@ -23,12 +24,13 @@ end, 10)
> 
> -- Restart the remote instance. This will make the first instance
> -- resubscribe without entering orphan mode.
> -test_run:cmd('restart server rebootstrap2')
> +test_run:cmd('restart server rebootstrap2 with wait=False')
> test_run:cmd('switch rebootstrap1')
> -- Wait until resubscribe is sent
> test_run:wait_cond(function()\
>     return box.info.replication[2].upstream.status == 'sync'\
> end, 10)
> -box.error.injection.set('ERRINJ_REPLICASET_VCLOCK_UPDATE', false)
> +box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
> +box.space._schema:get{'something'}
> test_run:cmd('switch default')
> test_run:drop_cluster(SERVERS)


--
Serge Petrenko
sergepetrenko@tarantool.org


[-- Attachment #2: Type: text/html, Size: 17090 bytes --]

^ permalink raw reply	[flat|nested] 25+ messages in thread

end of thread, other threads:[~2020-02-28  8:03 UTC | newest]

Thread overview: 25+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-02-26 10:00 [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance sergepetrenko
2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 1/4] box: expose box_is_orphan method sergepetrenko
2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 2/4] wal: warn when trying to write a record with a broken lsn sergepetrenko
2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 3/4] replication: implement an instance id filter for relay sergepetrenko
2020-02-26 10:18   ` Konstantin Osipov
2020-02-26 11:16     ` Serge Petrenko
2020-02-26 23:54   ` Vladislav Shpilevoy
2020-02-27  6:48     ` Konstantin Osipov
2020-02-27 13:15     ` Serge Petrenko
2020-02-27 23:33       ` Vladislav Shpilevoy
2020-02-26 10:00 ` [Tarantool-patches] [PATCH v4 4/4] replication: do not relay rows coming from a remote instance back to it sergepetrenko
2020-02-26 10:23   ` Konstantin Osipov
2020-02-26 11:21     ` Serge Petrenko
2020-02-26 11:58       ` Konstantin Osipov
2020-02-26 15:58         ` Serge Petrenko
2020-02-26 16:40           ` Konstantin Osipov
2020-02-26 23:54   ` Vladislav Shpilevoy
2020-02-27  6:52     ` Konstantin Osipov
2020-02-27 14:13     ` Serge Petrenko
2020-02-27 21:17       ` Serge Petrenko
2020-02-27 23:22         ` Vladislav Shpilevoy
2020-02-28  8:03           ` Serge Petrenko
2020-02-26 23:54 ` [Tarantool-patches] [PATCH v4 0/4] replication: fix applying of rows originating from local instance Vladislav Shpilevoy
2020-02-27 21:24   ` Serge Petrenko
2020-02-27 23:24     ` Vladislav Shpilevoy

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox