[Tarantool-patches] [PATCH v3 10/12] replication: add META stage to JOIN
Serge Petrenko
sergepetrenko at tarantool.org
Tue Jun 29 01:12:56 MSK 2021
The new META stage is part of the server's response to a join request.
It is delimited by IPROTO_JOIN_META and IPROTO_JOIN_SNAPSHOT requests and is
sent before the actual snapshot data.
Follow-up #6034
---
src/box/applier.cc | 17 ++++++++++++++++-
src/box/box.cc | 5 +++--
src/box/iproto_constants.h | 2 ++
src/box/relay.cc | 19 ++++++++++++++++++-
src/box/relay.h | 4 +++-
5 files changed, 42 insertions(+), 5 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 07fe7f5c7..7abad3a64 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -447,12 +447,26 @@ applier_wait_snapshot(struct applier *applier)
xrow_decode_vclock_xc(&row, &replicaset.vclock);
}
+ coio_read_xrow(coio, ibuf, &row);
+ if (row.type == IPROTO_JOIN_META) {
+ /* Read additional metadata. Empty at the moment. */
+ do {
+ coio_read_xrow(coio, ibuf, &row);
+ if (iproto_type_is_error(row.type))
+ xrow_decode_error_xc(&row);
+ else if (row.type != IPROTO_JOIN_SNAPSHOT) {
+ tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
+ (uint32_t)row.type);
+ }
+ } while (row.type != IPROTO_JOIN_SNAPSHOT);
+ coio_read_xrow(coio, ibuf, &row);
+ }
+
/*
* Receive initial data.
*/
uint64_t row_count = 0;
while (true) {
- coio_read_xrow(coio, ibuf, &row);
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
if (apply_snapshot_row(&row) != 0)
@@ -477,6 +491,7 @@ applier_wait_snapshot(struct applier *applier)
tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
(uint32_t) row.type);
}
+ coio_read_xrow(coio, ibuf, &row);
}
return row_count;
diff --git a/src/box/box.cc b/src/box/box.cc
index b2c52bc54..bc68ee4c8 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2501,7 +2501,7 @@ box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header)
/* Send the snapshot data to the instance. */
struct vclock start_vclock;
- relay_initial_join(io->fd, header->sync, &start_vclock);
+ relay_initial_join(io->fd, header->sync, &start_vclock, 0);
say_info("read-view sent.");
/* Remember master's vclock after the last request */
@@ -2699,7 +2699,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
* Initial stream: feed replica with dirty data from engines.
*/
struct vclock start_vclock;
- relay_initial_join(io->fd, header->sync, &start_vclock);
+ relay_initial_join(io->fd, header->sync, &start_vclock,
+ replica_version_id);
say_info("initial data sent.");
/**
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 3c9edb7d2..247ca6f37 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -263,6 +263,8 @@ enum iproto_type {
IPROTO_FETCH_SNAPSHOT = 69,
/** REGISTER request to leave anonymous replication. */
IPROTO_REGISTER = 70,
+ IPROTO_JOIN_META = 71,
+ IPROTO_JOIN_SNAPSHOT = 72,
/** Vinyl run info stored in .index file */
VY_INDEX_RUN_INFO = 100,
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 60f527b7f..4ebe0fb06 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -392,7 +392,8 @@ relay_set_cord_name(int fd)
}
void
-relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
+relay_initial_join(int fd, uint64_t sync, struct vclock *vclock,
+ uint32_t replica_version_id)
{
struct relay *relay = relay_new(NULL);
if (relay == NULL)
@@ -432,6 +433,22 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
row.sync = sync;
coio_write_xrow(&relay->io, &row);
+ /*
+ * Version is present starting with 2.7.3, 2.8.2, 2.9.1
+ * All these versions know of additional META stage of initial join.
+ */
+ if (replica_version_id > 0) {
+ /* Mark the beginning of the metadata stream. */
+ row.type = IPROTO_JOIN_META;
+ coio_write_xrow(&relay->io, &row);
+
+ /* Empty at the moment. */
+
+ /* Mark the end of the metadata stream. */
+ row.type = IPROTO_JOIN_SNAPSHOT;
+ coio_write_xrow(&relay->io, &row);
+ }
+
/* Send read view to the replica. */
engine_join_xc(&ctx, &relay->stream);
}
diff --git a/src/box/relay.h b/src/box/relay.h
index 615ffb75d..112428ae8 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -116,9 +116,11 @@ relay_push_raft(struct relay *relay, const struct raft_request *req);
* @param fd client connection
* @param sync sync from incoming JOIN request
* @param vclock[out] vclock of the read view sent to the replica
+ * @param replica_version_id peer's version
*/
void
-relay_initial_join(int fd, uint64_t sync, struct vclock *vclock);
+relay_initial_join(int fd, uint64_t sync, struct vclock *vclock,
+ uint32_t replica_version_id);
/**
* Send final JOIN rows to the replica.
--
2.30.1 (Apple Git-130)
More information about the Tarantool-patches
mailing list