[Tarantool-patches] [PATCH v3 3/4] replication: do not relay rows coming from a remote instance back to it
Serge Petrenko
sergepetrenko at tarantool.org
Tue Feb 18 20:37:06 MSK 2020
We have a mechanism for restoring rows originating from an instance that
suffered a sudden power loss: remote masters resend the instance's rows
received before a certain point in time, defined by remote master vclock
at the moment of subscribe.
However, this is useful only on initial replication configuration, when
an instance has just recovered, so that it can receive what it has
relayed but hasn't synced to disk.
In other cases, when an instance is operating normally and master-master
replication is configured, the mechanism described above may lead to
the instance re-applying its own rows, coming from a master it has just
subscribed to.
To fix the problem do not relay rows coming from a remote instance, if
the instance has already recovered.
Closes #4739
---
src/box/applier.cc | 2 +-
src/box/box.cc | 7 ++++---
src/box/iproto_constants.h | 1 +
src/box/relay.cc | 12 ++++++++++--
src/box/relay.h | 3 ++-
src/box/xrow.c | 18 +++++++++++++++---
src/box/xrow.h | 26 ++++++++++++++++----------
7 files changed, 49 insertions(+), 20 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index ae3d281a5..542144d14 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -867,7 +867,7 @@ applier_subscribe(struct applier *applier)
vclock_create(&vclock);
vclock_copy(&vclock, &replicaset.vclock);
xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
- &vclock, replication_anon);
+ &vclock, replication_anon, box_is_orphan());
coio_write_xrow(coio, &row);
/* Read SUBSCRIBE response */
diff --git a/src/box/box.cc b/src/box/box.cc
index a4d823df0..d485845c7 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,8 +1787,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
uint32_t replica_version_id;
vclock_create(&replica_clock);
bool anon;
- xrow_decode_subscribe_xc(header, NULL, &replica_uuid,
- &replica_clock, &replica_version_id, &anon);
+ bool is_orphan;
+ xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
+ &replica_version_id, &anon, &is_orphan);
/* Forbid connection to itself */
if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
@@ -1871,7 +1872,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
* indefinitely).
*/
relay_subscribe(replica, io->fd, header->sync, &replica_clock,
- replica_version_id);
+ replica_version_id, is_orphan);
}
void
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index b66c05c06..9616cbcf0 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -125,6 +125,7 @@ enum iproto_key {
IPROTO_STMT_ID = 0x43,
/* Leave a gap between SQL keys and additional request keys */
IPROTO_REPLICA_ANON = 0x50,
+ IPROTO_REPLICA_IS_ORPHAN = 0x51,
IPROTO_KEY_MAX
};
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 741a09201..384773b05 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -676,7 +676,8 @@ relay_subscribe_f(va_list ap)
/** Replication acceptor fiber handler. */
void
relay_subscribe(struct replica *replica, int fd, uint64_t sync,
- struct vclock *replica_clock, uint32_t replica_version_id)
+ struct vclock *replica_clock, uint32_t replica_version_id,
+ bool replica_is_orphan)
{
assert(replica->anon || replica->id != REPLICA_ID_NIL);
struct relay *relay = replica->relay;
@@ -700,8 +701,15 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
});
vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
+ /*
+ * If the remote instance is already synced, don't relay
+ * the instance's rows back to it. In order to do so,
+ * make recovery silently skip rows originating from the
+ * instance.
+ */
relay->r = recovery_new(cfg_gets("wal_dir"), false,
- replica_clock, 0);
+ replica_clock, replica_is_orphan ?
+ 0 : 1 << replica->id);
vclock_copy(&relay->tx.vclock, replica_clock);
relay->version_id = replica_version_id;
diff --git a/src/box/relay.h b/src/box/relay.h
index e1782d78f..9ba878faf 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -124,6 +124,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
*/
void
relay_subscribe(struct replica *replica, int fd, uint64_t sync,
- struct vclock *replica_vclock, uint32_t replica_version_id);
+ struct vclock *replica_vclock, uint32_t replica_version_id,
+ bool replica_is_orphan);
#endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 968c3a202..6b8155859 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1194,7 +1194,7 @@ int
xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
- const struct vclock *vclock, bool anon)
+ const struct vclock *vclock, bool anon, bool is_orphan)
{
memset(row, 0, sizeof(*row));
size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
@@ -1204,7 +1204,7 @@ xrow_encode_subscribe(struct xrow_header *row,
return -1;
}
char *data = buf;
- data = mp_encode_map(data, 5);
+ data = mp_encode_map(data, 6);
data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
data = xrow_encode_uuid(data, replicaset_uuid);
data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1215,6 +1215,8 @@ xrow_encode_subscribe(struct xrow_header *row,
data = mp_encode_uint(data, tarantool_version_id());
data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
data = mp_encode_bool(data, anon);
+ data = mp_encode_uint(data, IPROTO_REPLICA_IS_ORPHAN);
+ data = mp_encode_bool(data, is_orphan);
assert(data <= buf + size);
row->body[0].iov_base = buf;
row->body[0].iov_len = (data - buf);
@@ -1226,7 +1228,7 @@ xrow_encode_subscribe(struct xrow_header *row,
int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
- uint32_t *version_id, bool *anon)
+ uint32_t *version_id, bool *anon, bool *is_orphan)
{
if (row->bodycnt == 0) {
diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1301,6 +1303,16 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
}
*anon = mp_decode_bool(&d);
break;
+ case IPROTO_REPLICA_IS_ORPHAN:
+ if (is_orphan == NULL)
+ goto skip;
+ if (mp_typeof(*d) != MP_BOOL) {
+ xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+ "invalid REPLICA_IS_ORPHAN flag");
+ return -1;
+ }
+ *is_orphan = mp_decode_bool(&d);
+ break;
default: skip:
mp_next(&d); /* value */
}
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 0973c497d..3f188870a 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -322,6 +322,7 @@ xrow_encode_register(struct xrow_header *row,
* @param instance_uuid Instance uuid.
* @param vclock Replication clock.
* @param anon Whether it is an anonymous subscribe request or not.
+ * @param is_orphan Whether the instance is in orphan mode.
*
* @retval 0 Success.
* @retval -1 Memory error.
@@ -330,7 +331,7 @@ int
xrow_encode_subscribe(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
- const struct vclock *vclock, bool anon);
+ const struct vclock *vclock, bool anon, bool is_orphan);
/**
* Decode SUBSCRIBE command.
@@ -347,7 +348,7 @@ xrow_encode_subscribe(struct xrow_header *row,
int
xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
- uint32_t *version_id, bool *anon);
+ uint32_t *version_id, bool *anon, bool *is_orphan);
/**
* Encode JOIN command.
@@ -371,7 +372,8 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid);
static inline int
xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid)
{
- return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL);
+ return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL,
+ NULL);
}
/**
@@ -386,7 +388,8 @@ static inline int
xrow_decode_register(struct xrow_header *row, struct tt_uuid *instance_uuid,
struct vclock *vclock)
{
- return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL, NULL);
+ return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL,
+ NULL, NULL);
}
/**
@@ -411,7 +414,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock);
static inline int
xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock)
{
- return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL);
+ return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL, NULL);
}
/**
@@ -442,7 +445,8 @@ xrow_decode_subscribe_response(struct xrow_header *row,
struct tt_uuid *replicaset_uuid,
struct vclock *vclock)
{
- return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL, NULL);
+ return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL,
+ NULL, NULL);
}
/**
@@ -817,10 +821,10 @@ static inline void
xrow_encode_subscribe_xc(struct xrow_header *row,
const struct tt_uuid *replicaset_uuid,
const struct tt_uuid *instance_uuid,
- const struct vclock *vclock, bool anon)
+ const struct vclock *vclock, bool anon, bool is_orphan)
{
if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
- vclock, anon) != 0)
+ vclock, anon, is_orphan) != 0)
diag_raise();
}
@@ -829,10 +833,12 @@ static inline void
xrow_decode_subscribe_xc(struct xrow_header *row,
struct tt_uuid *replicaset_uuid,
struct tt_uuid *instance_uuid, struct vclock *vclock,
- uint32_t *replica_version_id, bool *anon)
+ uint32_t *replica_version_id, bool *anon,
+ bool *is_orphan)
{
if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
- vclock, replica_version_id, anon) != 0)
+ vclock, replica_version_id, anon,
+ is_orphan) != 0)
diag_raise();
}
--
2.21.1 (Apple Git-122.3)
More information about the Tarantool-patches
mailing list