From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id 467C36EC6E; Wed, 14 Jul 2021 21:28:28 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 467C36EC6E DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1626287308; bh=uWK4wTrJt3SbtdQQg66ccp7XORkSOZZ4U5pBXblDdRA=; h=To:Date:In-Reply-To:References:Subject:List-Id:List-Unsubscribe: List-Archive:List-Post:List-Help:List-Subscribe:From:Reply-To:Cc: From; b=DFr3G7uytB2Jw+sWFdf2+try17AybafY/tL1FaIbCnFvPy/UAg/BrRl06gDBjzVPH +pfLmC5x6P56JpIwRE5WIlVmE2kUjgypAm/bdRq7O9/yoMsjwRiVVw4wGA5ImabPeX 4HGl5uZfcyJS7jXucQAGHj2wrk4vta/82ySj8KXo= Received: from smtp58.i.mail.ru (smtp58.i.mail.ru [217.69.128.38]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id C91B86EC6E for ; Wed, 14 Jul 2021 21:26:01 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org C91B86EC6E Received: by smtp58.i.mail.ru with esmtpa (envelope-from ) id 1m3ja4-0007Q7-Rk; Wed, 14 Jul 2021 21:26:01 +0300 To: v.shpilevoy@tarantool.org, gorcunov@gmail.com Date: Wed, 14 Jul 2021 21:25:33 +0300 Message-Id: <5508c21501bda07f9e9238de16ebb3f5cfb6df36.1626287002.git.sergepetrenko@tarantool.org> X-Mailer: git-send-email 2.30.1 (Apple Git-130) In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-4EC0790: 10 X-7564579A: 646B95376F6C166E X-77F55803: 4F1203BC0FB41BD97BB0EF39AD2B33D598226807B8A1E9DC331E3F9997896515182A05F538085040B419F02788486C3DA3BBF5FDC46C8C84188EE43B2098728CE64CAC7AA644CDCE X-7FA49CB5: 
FF5795518A3D127A4AD6D5ED66289B5278DA827A17800CE75644E22E05AA81AEB287FD4696A6DC2FA8DF7F3B2552694A4E2F5AFA99E116B42401471946AA11AF23F8577A6DFFEA7C1AB87D0AB0DD9D358F08D7030A58E5AD1A62830130A00468AEEEE3FBA3A834EE7353EFBB55337566E68746B1F2AB10C606FC4FCFB8497676153BF656E0D8EC6BA471835C12D1D9774AD6D5ED66289B5278DA827A17800CE70B7EC9B0538196269FA2833FD35BB23D2EF20D2F80756B5F868A13BD56FB6657A471835C12D1D977725E5C173C3A84C3AE4FDBF11360AC9B117882F4460429728AD0CFFFB425014E868A13BD56FB6657E2021AF6380DFAD1A18204E546F3947CB11811A4A51E3B096D1867E19FE1407959CC434672EE6371089D37D7C0E48F6C8AA50765F790063788B3B24285A3CD0EEFF80C71ABB335746BA297DBC24807EABDAD6C7F3747799A X-C1DE0DAB: C20DE7B7AB408E4181F030C43753B8183A4AFAF3EA6BDC44C234C8B12C006B7A3EB0C8D5FD3BD3A05CFFFC8BEA447FD8640D2167C061F8EBB1881A6453793CE9C32612AADDFBE061C61BE10805914D3804EBA3D8E7E5B87ABF8C51168CD8EBDB30B6221521ACED37DC48ACC2A39D04F89CDFB48F4795C241BDAD6C7F3747799A X-C8649E89: 4E36BF7865823D7055A7F0CF078B5EC49A30900B95165D34AE88D5ADEBE7C983E8243991A1F7BA2DA8C4CF88C08D2AA91786FC063AD311BCD19E9CC97B905DD31D7E09C32AA3244CA4595B41D80C634EBEA3810F9C488E43259227199D06760A927AC6DF5659F194 X-D57D3AED: 3ZO7eAau8CL7WIMRKs4sN3D3tLDjz0dLbV79QFUyzQ2Ujvy7cMT6pYYqY16iZVKkSc3dCLJ7zSJH7+u4VD18S7Vl4ZUrpaVfd2+vE6kuoey4m4VkSEu530nj6fImhcD4MUrOEAnl0W826KZ9Q+tr5ycPtXkTV4k65bRjmOUUP8cvGozZ33TWg5HZplvhhXbhDGzqmQDTd6OAevLeAnq3Ra9uf7zvY2zzsIhlcp/Y7m53TZgf2aB4JOg4gkr2biojDdSFIg49M1RckEYkkWvzlA== X-Mailru-Sender: 3B9A0136629DC9125D61937A2360A446DCF67A6ACA5D040DC29C65211E4673EEA6599301EF07B87E424AE0EB1F3D1D21E2978F233C3FAE6EE63DB1732555E4A8EE80603BA4A5B0BC112434F685709FCF0DA7A0AF5A3A8387 X-Mras: Ok Subject: [Tarantool-patches] [PATCH v4 05/16] replication: add META stage to JOIN X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , From: Serge Petrenko via Tarantool-patches Reply-To: Serge Petrenko 
Cc: tarantool-patches@dev.tarantool.org Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" The new META stage is part of the server's response to a join request. It is delimited by IPROTO_JOIN_META and IPROTO_JOIN_SNAPSHOT rows and goes before the actual snapshot data. Prerequisite #6034 @TarantoolBot document Title: new protocol stage during JOIN A new stage is added to the stream of JOIN rows coming from the master. The beginning of the stage is marked with a bodyless row of type IPROTO_JOIN_META = 71. Once all the rows from the stage are sent out, the JOIN continues as before (as a stream of snapshot rows). The end of the META stage is marked with a row of type IPROTO_JOIN_SNAPSHOT = 72. The stage contains the rows that are necessary for instance initialization (current Raft term, current state of the synchronous transaction queue), but do not belong to any system space. --- src/box/applier.cc | 17 ++++++++++++++++- src/box/box.cc | 5 +++-- src/box/iproto_constants.h | 2 ++ src/box/relay.cc | 19 ++++++++++++++++++- src/box/relay.h | 4 +++- 5 files changed, 42 insertions(+), 5 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 07fe7f5c7..0f81b7cc4 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -447,12 +447,26 @@ applier_wait_snapshot(struct applier *applier) xrow_decode_vclock_xc(&row, &replicaset.vclock); } + coio_read_xrow(coio, ibuf, &row); + if (row.type == IPROTO_JOIN_META) { + /* Read additional metadata. Empty at the moment. */ + do { + coio_read_xrow(coio, ibuf, &row); + if (iproto_type_is_error(row.type)) { + xrow_decode_error_xc(&row); + } else if (row.type != IPROTO_JOIN_SNAPSHOT) { + tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, + (uint32_t)row.type); + } + } while (row.type != IPROTO_JOIN_SNAPSHOT); + coio_read_xrow(coio, ibuf, &row); + } + /* * Receive initial data. 
*/ uint64_t row_count = 0; while (true) { - coio_read_xrow(coio, ibuf, &row); applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { if (apply_snapshot_row(&row) != 0) @@ -477,6 +491,7 @@ applier_wait_snapshot(struct applier *applier) tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t) row.type); } + coio_read_xrow(coio, ibuf, &row); } return row_count; diff --git a/src/box/box.cc b/src/box/box.cc index 6d5516682..8c695686e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -2474,7 +2474,7 @@ box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header) /* Send the snapshot data to the instance. */ struct vclock start_vclock; - relay_initial_join(io->fd, header->sync, &start_vclock); + relay_initial_join(io->fd, header->sync, &start_vclock, 0); say_info("read-view sent."); /* Remember master's vclock after the last request */ @@ -2672,7 +2672,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header) * Initial stream: feed replica with dirty data from engines. */ struct vclock start_vclock; - relay_initial_join(io->fd, header->sync, &start_vclock); + relay_initial_join(io->fd, header->sync, &start_vclock, + replica_version_id); say_info("initial data sent."); /** diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 137bee9da..e913801a8 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -261,6 +261,8 @@ enum iproto_type { IPROTO_FETCH_SNAPSHOT = 69, /** REGISTER request to leave anonymous replication. 
*/ IPROTO_REGISTER = 70, + IPROTO_JOIN_META = 71, + IPROTO_JOIN_SNAPSHOT = 72, /** Vinyl run info stored in .index file */ VY_INDEX_RUN_INFO = 100, diff --git a/src/box/relay.cc b/src/box/relay.cc index 60f527b7f..4ebe0fb06 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -392,7 +392,8 @@ relay_set_cord_name(int fd) } void -relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) +relay_initial_join(int fd, uint64_t sync, struct vclock *vclock, + uint32_t replica_version_id) { struct relay *relay = relay_new(NULL); if (relay == NULL) @@ -432,6 +433,22 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) row.sync = sync; coio_write_xrow(&relay->io, &row); + /* + * Version is present starting with 2.7.3, 2.8.2, 2.9.1 + * All these versions know of additional META stage of initial join. + */ + if (replica_version_id > 0) { + /* Mark the beginning of the metadata stream. */ + row.type = IPROTO_JOIN_META; + coio_write_xrow(&relay->io, &row); + + /* Empty at the moment. */ + + /* Mark the end of the metadata stream. */ + row.type = IPROTO_JOIN_SNAPSHOT; + coio_write_xrow(&relay->io, &row); + } + /* Send read view to the replica. */ engine_join_xc(&ctx, &relay->stream); } diff --git a/src/box/relay.h b/src/box/relay.h index 615ffb75d..112428ae8 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -116,9 +116,11 @@ relay_push_raft(struct relay *relay, const struct raft_request *req); * @param fd client connection * @param sync sync from incoming JOIN request * @param vclock[out] vclock of the read view sent to the replica + * @param replica_version_id peer's version */ void -relay_initial_join(int fd, uint64_t sync, struct vclock *vclock); +relay_initial_join(int fd, uint64_t sync, struct vclock *vclock, + uint32_t replica_version_id); /** * Send final JOIN rows to the replica. -- 2.30.1 (Apple Git-130)