From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp36.i.mail.ru (smtp36.i.mail.ru [94.100.177.96]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 97E1446970E for ; Wed, 25 Dec 2019 15:49:11 +0300 (MSK) From: sergepetrenko Date: Wed, 25 Dec 2019 15:47:00 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v2 3/5] applier: split join processing into two stages List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: v.shpilevoy@tarantool.org, georgy@tarantool.org Cc: tarantool-patches@dev.tarantool.org From: Serge Petrenko We already have 'initial join' and 'final join' stages in the applier logic. The first actually means fetching the master's snapshot, and the second one means receiving the rows which should contain the replica's registration in _cluster. These stages will be used separately once anonymous replicas are implemented, so split them in preparation. Prerequisite #3186 --- src/box/applier.cc | 65 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 50 insertions(+), 15 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 42374f886..f4f9d0670 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -202,7 +202,7 @@ applier_writer_f(va_list ap) } static int -apply_initial_join_row(struct xrow_header *row) +apply_snapshot_row(struct xrow_header *row) { int rc; struct request request; @@ -388,18 +388,12 @@ done: applier_set_state(applier, APPLIER_READY); } -/** - * Execute and process JOIN request (bootstrap the instance). 
- */ -static void -applier_join(struct applier *applier) +static uint64_t +applier_wait_snapshot(struct applier *applier) { - /* Send JOIN request */ struct ev_io *coio = &applier->io; struct ibuf *ibuf = &applier->ibuf; struct xrow_header row; - xrow_encode_join_xc(&row, &INSTANCE_UUID); - coio_write_xrow(coio, &row); /** * Tarantool < 1.7.0: if JOIN is successful, there is no "OK" @@ -423,8 +417,6 @@ applier_join(struct applier *applier) xrow_decode_vclock_xc(&row, &replicaset.vclock); } - applier_set_state(applier, APPLIER_INITIAL_JOIN); - /* * Receive initial data. */ @@ -433,7 +425,7 @@ applier_join(struct applier *applier) coio_read_xrow(coio, ibuf, &row); applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { - if (apply_initial_join_row(&row) != 0) + if (apply_snapshot_row(&row) != 0) diag_raise(); if (++row_count % 100000 == 0) say_info("%.1fM rows received", row_count / 1e6); @@ -456,9 +448,16 @@ applier_join(struct applier *applier) (uint32_t) row.type); } } - say_info("initial data received"); - applier_set_state(applier, APPLIER_FINAL_JOIN); + return row_count; +} + +static uint64_t +applier_wait_register(struct applier *applier, uint64_t row_count) +{ + struct ev_io *coio = &applier->io; + struct ibuf *ibuf = &applier->ibuf; + struct xrow_header row; /* * Tarantool < 1.7.0: there is no "final join" stage. @@ -466,7 +465,7 @@ applier_join(struct applier *applier) * until replica id is received. */ if (applier->version_id < version_id(1, 7, 0)) - return; + return row_count; /* * Receive final data. @@ -485,6 +484,7 @@ applier_join(struct applier *applier) * Current vclock. This is not used now, * ignore. */ + ++row_count; break; /* end of stream */ } else if (iproto_type_is_error(row.type)) { xrow_decode_error_xc(&row); /* rethrow error */ @@ -493,6 +493,41 @@ applier_join(struct applier *applier) (uint32_t) row.type); } } + + return row_count; +} + +/** + * Execute and process JOIN request (bootstrap the instance). 
+ */ +static void +applier_join(struct applier *applier) +{ + /* Send JOIN request */ + struct ev_io *coio = &applier->io; + struct xrow_header row; + uint64_t row_count; + + xrow_encode_join_xc(&row, &INSTANCE_UUID); + coio_write_xrow(coio, &row); + + applier_set_state(applier, APPLIER_INITIAL_JOIN); + + row_count = applier_wait_snapshot(applier); + + say_info("initial data received"); + + applier_set_state(applier, APPLIER_FINAL_JOIN); + + if (applier_wait_register(applier, row_count) == row_count) { + /* + * We didn't receive any rows during registration. + * Proceed to "subscribe" and do not finish bootstrap + * until replica id is received. + */ + return; + } + say_info("final data received"); applier_set_state(applier, APPLIER_JOINED); -- 2.20.1 (Apple Git-117)