From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp55.i.mail.ru (smtp55.i.mail.ru [217.69.128.35]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 47A624696C4 for ; Tue, 18 Feb 2020 20:37:26 +0300 (MSK) From: Serge Petrenko Date: Tue, 18 Feb 2020 20:37:06 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v3 3/4] replication: do not relay rows coming from a remote instance back to it List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: v.shpilevoy@tarantool.org, alexander.turenko@tarantool.org, kostja.osipov@gmail.com Cc: tarantool-patches@dev.tarantool.org We have a mechanism for restoring rows originating from an instance that suffered a sudden power loss: remote masters resend the instance's rows received before a certain point in time, defined by the remote master's vclock at the moment of subscribe. However, this is useful only during initial replication configuration, when an instance has just recovered, so that it can receive what it has relayed but hasn't synced to disk. In other cases, when an instance is operating normally and master-master replication is configured, the mechanism described above may lead to the instance re-applying its own rows, coming from a master it has just subscribed to. To fix the problem, do not relay rows coming from a remote instance back to it if the instance has already recovered. 
Closes #4739 --- src/box/applier.cc | 2 +- src/box/box.cc | 7 ++++--- src/box/iproto_constants.h | 1 + src/box/relay.cc | 12 ++++++++++-- src/box/relay.h | 3 ++- src/box/xrow.c | 18 +++++++++++++++--- src/box/xrow.h | 26 ++++++++++++++++---------- 7 files changed, 49 insertions(+), 20 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index ae3d281a5..542144d14 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -867,7 +867,7 @@ applier_subscribe(struct applier *applier) vclock_create(&vclock); vclock_copy(&vclock, &replicaset.vclock); xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, - &vclock, replication_anon); + &vclock, replication_anon, box_is_orphan()); coio_write_xrow(coio, &row); /* Read SUBSCRIBE response */ diff --git a/src/box/box.cc b/src/box/box.cc index a4d823df0..d485845c7 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1787,8 +1787,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) uint32_t replica_version_id; vclock_create(&replica_clock); bool anon; - xrow_decode_subscribe_xc(header, NULL, &replica_uuid, - &replica_clock, &replica_version_id, &anon); + bool is_orphan; + xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock, + &replica_version_id, &anon, &is_orphan); /* Forbid connection to itself */ if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID)) @@ -1871,7 +1872,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) * indefinitely). 
*/ relay_subscribe(replica, io->fd, header->sync, &replica_clock, - replica_version_id); + replica_version_id, is_orphan); } void diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index b66c05c06..9616cbcf0 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -125,6 +125,7 @@ enum iproto_key { IPROTO_STMT_ID = 0x43, /* Leave a gap between SQL keys and additional request keys */ IPROTO_REPLICA_ANON = 0x50, + IPROTO_REPLICA_IS_ORPHAN = 0x51, IPROTO_KEY_MAX }; diff --git a/src/box/relay.cc b/src/box/relay.cc index 741a09201..384773b05 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -676,7 +676,8 @@ relay_subscribe_f(va_list ap) /** Replication acceptor fiber handler. */ void relay_subscribe(struct replica *replica, int fd, uint64_t sync, - struct vclock *replica_clock, uint32_t replica_version_id) + struct vclock *replica_clock, uint32_t replica_version_id, + bool replica_is_orphan) { assert(replica->anon || replica->id != REPLICA_ID_NIL); struct relay *relay = replica->relay; @@ -700,8 +701,15 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, }); vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock); + /* + * If the remote instance is already synced, don't relay + * the instance's rows back to it. In order to do so, + * make recovery silently skip rows originating from the + * instance. + */ relay->r = recovery_new(cfg_gets("wal_dir"), false, - replica_clock, 0); + replica_clock, replica_is_orphan ? 
+ 0 : 1 << replica->id); vclock_copy(&relay->tx.vclock, replica_clock); relay->version_id = replica_version_id; diff --git a/src/box/relay.h b/src/box/relay.h index e1782d78f..9ba878faf 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -124,6 +124,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, */ void relay_subscribe(struct replica *replica, int fd, uint64_t sync, - struct vclock *replica_vclock, uint32_t replica_version_id); + struct vclock *replica_vclock, uint32_t replica_version_id, + bool replica_is_orphan); #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */ diff --git a/src/box/xrow.c b/src/box/xrow.c index 968c3a202..6b8155859 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -1194,7 +1194,7 @@ int xrow_encode_subscribe(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock, bool anon) + const struct vclock *vclock, bool anon, bool is_orphan) { memset(row, 0, sizeof(*row)); size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock); @@ -1204,7 +1204,7 @@ xrow_encode_subscribe(struct xrow_header *row, return -1; } char *data = buf; - data = mp_encode_map(data, 5); + data = mp_encode_map(data, 6); data = mp_encode_uint(data, IPROTO_CLUSTER_UUID); data = xrow_encode_uuid(data, replicaset_uuid); data = mp_encode_uint(data, IPROTO_INSTANCE_UUID); @@ -1215,6 +1215,8 @@ xrow_encode_subscribe(struct xrow_header *row, data = mp_encode_uint(data, tarantool_version_id()); data = mp_encode_uint(data, IPROTO_REPLICA_ANON); data = mp_encode_bool(data, anon); + data = mp_encode_uint(data, IPROTO_REPLICA_IS_ORPHAN); + data = mp_encode_bool(data, is_orphan); assert(data <= buf + size); row->body[0].iov_base = buf; row->body[0].iov_len = (data - buf); @@ -1226,7 +1228,7 @@ xrow_encode_subscribe(struct xrow_header *row, int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t 
*version_id, bool *anon) + uint32_t *version_id, bool *anon, bool *is_orphan) { if (row->bodycnt == 0) { diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); @@ -1301,6 +1303,16 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, } *anon = mp_decode_bool(&d); break; + case IPROTO_REPLICA_IS_ORPHAN: + if (is_orphan == NULL) + goto skip; + if (mp_typeof(*d) != MP_BOOL) { + xrow_on_decode_err(data, end, ER_INVALID_MSGPACK, + "invalid REPLICA_IS_ORPHAN flag"); + return -1; + } + *is_orphan = mp_decode_bool(&d); + break; default: skip: mp_next(&d); /* value */ } diff --git a/src/box/xrow.h b/src/box/xrow.h index 0973c497d..3f188870a 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -322,6 +322,7 @@ xrow_encode_register(struct xrow_header *row, * @param instance_uuid Instance uuid. * @param vclock Replication clock. * @param anon Whether it is an anonymous subscribe request or not. + * @param is_orphan Whether the instance is in orphan mode. * * @retval 0 Success. * @retval -1 Memory error. @@ -330,7 +331,7 @@ int xrow_encode_subscribe(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock, bool anon); + const struct vclock *vclock, bool anon, bool is_orphan); /** * Decode SUBSCRIBE command. @@ -347,7 +348,7 @@ xrow_encode_subscribe(struct xrow_header *row, int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *version_id, bool *anon); + uint32_t *version_id, bool *anon, bool *is_orphan); /** * Encode JOIN command. 
@@ -371,7 +372,8 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid); static inline int xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid) { - return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL); + return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL, + NULL); } /** @@ -386,7 +388,8 @@ static inline int xrow_decode_register(struct xrow_header *row, struct tt_uuid *instance_uuid, struct vclock *vclock) { - return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL, NULL); + return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL, + NULL, NULL); } /** @@ -411,7 +414,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock); static inline int xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock) { - return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL); + return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL, NULL); } /** @@ -442,7 +445,8 @@ xrow_decode_subscribe_response(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct vclock *vclock) { - return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL, NULL); + return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL, + NULL, NULL); } /** @@ -817,10 +821,10 @@ static inline void xrow_encode_subscribe_xc(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock, bool anon) + const struct vclock *vclock, bool anon, bool is_orphan) { if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid, - vclock, anon) != 0) + vclock, anon, is_orphan) != 0) diag_raise(); } @@ -829,10 +833,12 @@ static inline void xrow_decode_subscribe_xc(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *replica_version_id, bool *anon) + uint32_t *replica_version_id, bool *anon, + bool *is_orphan) { 
if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid, - vclock, replica_version_id, anon) != 0) + vclock, replica_version_id, anon, + is_orphan) != 0) diag_raise(); } -- 2.21.1 (Apple Git-122.3)