From mboxrd@z Thu Jan 1 00:00:00 1970
To: v.shpilevoy@tarantool.org, vdavydov@tarantool.org
Date: Mon, 6 Dec 2021 06:03:22 +0300
Message-Id: <247613fdaa92be2c6b8b3229a77f1f40eba6fe7e.1638757827.git.sergepetrenko@tarantool.org>
X-Mailer: git-send-email 2.30.1 (Apple Git-130)
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [Tarantool-patches] [PATCH 3/4] applier: factor replication stream processing out of subscribe()
List-Id: Tarantool development patches
From: Serge Petrenko via Tarantool-patches
Reply-To: Serge Petrenko
Cc: tarantool-patches@dev.tarantool.org

applier_subscribe() is huge, since it performs 2 separate tasks: first
sends a subscribe request to the replication master and handles the
result, then processes the replication stream.

Factor out the replication stream processing into a separate routine to
make applier_subscribe() appear more sane.

Besides, this will be needed with applier-in-thread introduction, when
the connection will be established by the tx fiber, but the stream will
be processed by a separate thread.

Part-of #6329
---
 src/box/applier.cc | 114 ++++++++++++++++++++++++---------------------
 1 file changed, 62 insertions(+), 52 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index a8505c93a..393f0a2fe 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -1290,6 +1290,67 @@ applier_on_rollback(struct trigger *trigger, void *event)
 	return 0;
 }
 
+/**
+ * Subscribe to the replication stream. Decode the incoming rows right in
+ * applier fiber.
+ */
+static void
+applier_subscribe_f(struct applier *applier)
+{
+	struct ibuf *ibuf = &applier->ibuf;
+	while (true) {
+		if (applier->state == APPLIER_FINAL_JOIN &&
+		    instance_id != REPLICA_ID_NIL) {
+			say_info("final data received");
+			applier_set_state(applier, APPLIER_JOINED);
+			applier_set_state(applier, APPLIER_READY);
+			applier_set_state(applier, APPLIER_FOLLOW);
+		}
+
+		/*
+		 * Tarantool < 1.7.7 does not send periodic heartbeat
+		 * messages so we can't assume that if we haven't heard
+		 * from the master for quite a while the connection is
+		 * broken - the master might just be idle.
+		 */
+		double timeout = applier->version_id < version_id(1, 7, 7) ?
+				 TIMEOUT_INFINITY :
+				 replication_disconnect_timeout();
+
+		struct stailq rows;
+		applier_read_tx(applier, &rows, timeout);
+
+		/*
+		 * In case of an heartbeat message wake a writer up
+		 * and check applier state.
+		 */
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row;
+		raft_process_heartbeat(box_raft(), applier->instance_id);
+		if (first_row->lsn == 0) {
+			if (unlikely(iproto_type_is_raft_request(
+						first_row->type))) {
+				if (applier_handle_raft(applier,
+							first_row) != 0)
+					diag_raise();
+			}
+			applier_signal_ack(applier);
+		} else if (applier_apply_tx(applier, &rows) != 0) {
+			diag_raise();
+		}
+
+		/* Discard processed input. */
+		ibuf->rpos = ibuf->xpos;
+		/*
+		 * Even though this is not necessary, defragment the buffer
+		 * explicitly. Otherwise the defragmentation would be triggered
+		 * by one of the row reads, resulting in moving a bigger memory
+		 * chunk.
+		 */
+		ibuf_defragment(&applier->ibuf);
+	}
+}
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -1425,58 +1486,7 @@ applier_subscribe(struct applier *applier)
 	/*
 	 * Process a stream of rows from the binary log.
 	 */
-	while (true) {
-		if (applier->state == APPLIER_FINAL_JOIN &&
-		    instance_id != REPLICA_ID_NIL) {
-			say_info("final data received");
-			applier_set_state(applier, APPLIER_JOINED);
-			applier_set_state(applier, APPLIER_READY);
-			applier_set_state(applier, APPLIER_FOLLOW);
-		}
-
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		double timeout = applier->version_id < version_id(1, 7, 7) ?
-				 TIMEOUT_INFINITY :
-				 replication_disconnect_timeout();
-
-		struct stailq rows;
-		applier_read_tx(applier, &rows, timeout);
-
-		/*
-		 * In case of an heartbeat message wake a writer up
-		 * and check applier state.
-		 */
-		struct xrow_header *first_row =
-			&stailq_first_entry(&rows, struct applier_tx_row,
-					    next)->row;
-		raft_process_heartbeat(box_raft(), applier->instance_id);
-		if (first_row->lsn == 0) {
-			if (unlikely(iproto_type_is_raft_request(
-						first_row->type))) {
-				if (applier_handle_raft(applier,
-							first_row) != 0)
-					diag_raise();
-			}
-			applier_signal_ack(applier);
-		} else if (applier_apply_tx(applier, &rows) != 0) {
-			diag_raise();
-		}
-
-		/* Discard processed input. */
-		ibuf->rpos = ibuf->xpos;
-		/*
-		 * Even though this is not necessary, defragment the buffer
-		 * explicitly. Otherwise the defragmentation would be triggered
-		 * by one of the row reads, resulting in moving a bigger memory
-		 * chunk.
-		 */
-		ibuf_defragment(&applier->ibuf);
-	}
+	applier_subscribe_f(applier);
 }
 
 static inline void
-- 
2.30.1 (Apple Git-130)