From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id D3D946EC40; Tue, 29 Jun 2021 01:18:32 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org D3D946EC40 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1624918712; bh=v5W7SwF0Q8+kXpvyxaXa93x/j/RjmJTdDhP1Q+Qd5jI=; h=To:Date:In-Reply-To:References:Subject:List-Id:List-Unsubscribe: List-Archive:List-Post:List-Help:List-Subscribe:From:Reply-To:Cc: From; b=ubTnAOZEMxPu7XZuC3pnqsR+l9z3GLQUZ34ODwL2wkzD/uT6gOftKKnVzTblci7Nx 06YqEHb0e8ZeQnkEha1YoUHYM+ZQJ7Ym41SdaQAVaRisKHacNkcXKJ7gm1XhuJp6m8 2WKa3z509ynKWW+Tyd77JcJJnGReGqI2BSZEUKBY= Received: from smtp61.i.mail.ru (smtp61.i.mail.ru [217.69.128.41]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 7D4156F3C4 for ; Tue, 29 Jun 2021 01:13:24 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 7D4156F3C4 Received: by smtp61.i.mail.ru with esmtpa (envelope-from ) id 1lxzVL-0007oC-LQ; Tue, 29 Jun 2021 01:13:24 +0300 To: v.shpilevoy@tarantool.org, gorcunov@gmail.com Date: Tue, 29 Jun 2021 01:12:56 +0300 Message-Id: <36836518bdd41be84a0c84e32b244093ed069f62.1624918078.git.sergepetrenko@tarantool.org> X-Mailer: git-send-email 2.30.1 (Apple Git-130) In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-4EC0790: 10 X-7564579A: 646B95376F6C166E X-77F55803: 4F1203BC0FB41BD954DFF1DC42D673FB0C620705B15DE32DFE392EA95FA71EAB182A05F538085040830FEA78C6D0D9AE84EBB9191EA448DB622FEF23B59BB5E91793D23E8A9F4A74 X-7FA49CB5: 
FF5795518A3D127A4AD6D5ED66289B5278DA827A17800CE7590E57235B5C00BDEA1F7E6F0F101C67BD4B6F7A4D31EC0BCC500DACC3FED6E28638F802B75D45FF8AA50765F7900637FDB3827A455F08028638F802B75D45FF36EB9D2243A4F8B5A6FCA7DBDB1FC311F39EFFDF887939037866D6147AF826D883FCF9ADCF72256E1140CAB6F8AF9339117882F4460429724CE54428C33FAD305F5C1EE8F4F765FCE2CCD8F0CAA010FB389733CBF5DBD5E9C8A9BA7A39EFB766F5D81C698A659EA7CC7F00164DA146DA9985D098DBDEAEC8706E30CA52318619F6B57BC7E6449061A352F6E88A58FB86F5D81C698A659EA7E827F84554CEF5019E625A9149C048EE9ECD01F8117BC8BEE2021AF6380DFAD18AA50765F790063735872C767BF85DA227C277FBC8AE2E8B569F1129A2C6445075ECD9A6C639B01B4E70A05D1297E1BBCB5012B2E24CD356 X-B7AD71C0: AC4F5C86D027EB782CDD5689AFBDA7A2AD77751E876CB595E8F7B195E1C97831D7A3F10BEE0A642DE7DC6BFB44DFC7CA X-C1DE0DAB: C20DE7B7AB408E4181F030C43753B8186998911F362727C414F749A5E30D975CD87436A60A035F40EC1B1DB64C2E8C06BB1919B4DE8EB09C9C2B6934AE262D3EE7EAB7254005DCED7532B743992DF240BDC6A1CF3F042BAD6DF99611D93F60EF92B0BE0DA6BB795D699F904B3F4130E343918A1A30D5E7FCCB5012B2E24CD356 X-C8649E89: 4E36BF7865823D7055A7F0CF078B5EC49A30900B95165D34806D3522FB05EB396EF31C61FAC8358488D0BC43EFC9457E9CFCE68668FB45981285E5C98FE925E31D7E09C32AA3244C3E72F58A85BFD6CC74FBA844799612133FD9C8CA1B0515E0927AC6DF5659F194 X-D57D3AED: 3ZO7eAau8CL7WIMRKs4sN3D3tLDjz0dLbV79QFUyzQ2Ujvy7cMT6pYYqY16iZVKkSc3dCLJ7zSJH7+u4VD18S7Vl4ZUrpaVfd2+vE6kuoey4m4VkSEu530nj6fImhcD4MUrOEAnl0W826KZ9Q+tr5ycPtXkTV4k65bRjmOUUP8cvGozZ33TWg5HZplvhhXbhDGzqmQDTd6OAevLeAnq3Ra9uf7zvY2zzsIhlcp/Y7m53TZgf2aB4JOg4gkr2biojNjLyMoNI2JZhvFy3QE+3hw== X-Mailru-Sender: 3B9A0136629DC9125D61937A2360A446C3FEE19542AA4C0E8231685662B97467AB09C8E55D095759424AE0EB1F3D1D21E2978F233C3FAE6EE63DB1732555E4A8EE80603BA4A5B0BC112434F685709FCF0DA7A0AF5A3A8387 X-Mras: Ok Subject: [Tarantool-patches] [PATCH v3 10/12] replication: add META stage to JOIN X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , 
List-Archive: List-Post: List-Help: List-Subscribe: , From: Serge Petrenko via Tarantool-patches Reply-To: Serge Petrenko Cc: tarantool-patches@dev.tarantool.org Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" The new META stage is part of the server's response to a join request. It is delimited by an IPROTO_JOIN_META row (start of the metadata stream) and an IPROTO_JOIN_SNAPSHOT row (end of the metadata stream), and it precedes the actual snapshot data. Follow-up #6034 --- src/box/applier.cc | 17 ++++++++++++++++- src/box/box.cc | 5 +++-- src/box/iproto_constants.h | 2 ++ src/box/relay.cc | 19 ++++++++++++++++++- src/box/relay.h | 4 +++- 5 files changed, 42 insertions(+), 5 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 07fe7f5c7..7abad3a64 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -447,12 +447,26 @@ applier_wait_snapshot(struct applier *applier) xrow_decode_vclock_xc(&row, &replicaset.vclock); } + coio_read_xrow(coio, ibuf, &row); + if (row.type == IPROTO_JOIN_META) { + /* Read additional metadata. Empty at the moment. */ + do { + coio_read_xrow(coio, ibuf, &row); + if (iproto_type_is_error(row.type)) + xrow_decode_error_xc(&row); + else if (row.type != IPROTO_JOIN_SNAPSHOT) { + tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, + (uint32_t)row.type); + } + } while (row.type != IPROTO_JOIN_SNAPSHOT); + coio_read_xrow(coio, ibuf, &row); + } + /* * Receive initial data. 
*/ uint64_t row_count = 0; while (true) { - coio_read_xrow(coio, ibuf, &row); applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { if (apply_snapshot_row(&row) != 0) @@ -477,6 +491,7 @@ applier_wait_snapshot(struct applier *applier) tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t) row.type); } + coio_read_xrow(coio, ibuf, &row); } return row_count; diff --git a/src/box/box.cc b/src/box/box.cc index b2c52bc54..bc68ee4c8 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -2501,7 +2501,7 @@ box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header) /* Send the snapshot data to the instance. */ struct vclock start_vclock; - relay_initial_join(io->fd, header->sync, &start_vclock); + relay_initial_join(io->fd, header->sync, &start_vclock, 0); say_info("read-view sent."); /* Remember master's vclock after the last request */ @@ -2699,7 +2699,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header) * Initial stream: feed replica with dirty data from engines. */ struct vclock start_vclock; - relay_initial_join(io->fd, header->sync, &start_vclock); + relay_initial_join(io->fd, header->sync, &start_vclock, + replica_version_id); say_info("initial data sent."); /** diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 3c9edb7d2..247ca6f37 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -263,6 +263,8 @@ enum iproto_type { IPROTO_FETCH_SNAPSHOT = 69, /** REGISTER request to leave anonymous replication. 
*/ IPROTO_REGISTER = 70, + IPROTO_JOIN_META = 71, + IPROTO_JOIN_SNAPSHOT = 72, /** Vinyl run info stored in .index file */ VY_INDEX_RUN_INFO = 100, diff --git a/src/box/relay.cc b/src/box/relay.cc index 60f527b7f..4ebe0fb06 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -392,7 +392,8 @@ relay_set_cord_name(int fd) } void -relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) +relay_initial_join(int fd, uint64_t sync, struct vclock *vclock, + uint32_t replica_version_id) { struct relay *relay = relay_new(NULL); if (relay == NULL) @@ -432,6 +433,22 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) row.sync = sync; coio_write_xrow(&relay->io, &row); + /* + * Version is present starting with 2.7.3, 2.8.2, 2.9.1 + * All these versions know of additional META stage of initial join. + */ + if (replica_version_id > 0) { + /* Mark the beginning of the metadata stream. */ + row.type = IPROTO_JOIN_META; + coio_write_xrow(&relay->io, &row); + + /* Empty at the moment. */ + + /* Mark the end of the metadata stream. */ + row.type = IPROTO_JOIN_SNAPSHOT; + coio_write_xrow(&relay->io, &row); + } + /* Send read view to the replica. */ engine_join_xc(&ctx, &relay->stream); } diff --git a/src/box/relay.h b/src/box/relay.h index 615ffb75d..112428ae8 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -116,9 +116,11 @@ relay_push_raft(struct relay *relay, const struct raft_request *req); * @param fd client connection * @param sync sync from incoming JOIN request * @param vclock[out] vclock of the read view sent to the replica + * @param replica_version_id peer's version */ void -relay_initial_join(int fd, uint64_t sync, struct vclock *vclock); +relay_initial_join(int fd, uint64_t sync, struct vclock *vclock, + uint32_t replica_version_id); /** * Send final JOIN rows to the replica. -- 2.30.1 (Apple Git-130)