From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: v.shpilevoy@tarantool.org, vdavydov@tarantool.org
Cc: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [PATCH 3/4] applier: factor replication stream processing out of subscribe()
Date: Mon, 6 Dec 2021 06:03:22 +0300
Message-ID: <247613fdaa92be2c6b8b3229a77f1f40eba6fe7e.1638757827.git.sergepetrenko@tarantool.org>
In-Reply-To: <cover.1638757827.git.sergepetrenko@tarantool.org>

applier_subscribe() is huge, since it performs 2 separate tasks: first
sends a subscribe request to the replication master and handles the
result, then processes the replication stream.

Factor out the replication stream processing into a separate routine to
make applier_subscribe() appear more sane.

Besides, this will be needed with applier-in-thread introduction, when
the connection will be established by the tx fiber, but the stream will
be processed by a separate thread.

Part-of #6329
---
 src/box/applier.cc | 114 ++++++++++++++++++++++++---------------------
 1 file changed, 62 insertions(+), 52 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index a8505c93a..393f0a2fe 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -1290,6 +1290,67 @@ applier_on_rollback(struct trigger *trigger, void *event)
 	return 0;
 }
 
+/**
+ * Subscribe to the replication stream. Decode the incoming rows right in
+ * applier fiber.
+ */
+static void
+applier_subscribe_f(struct applier *applier)
+{
+	struct ibuf *ibuf = &applier->ibuf;
+	while (true) {
+		if (applier->state == APPLIER_FINAL_JOIN &&
+		    instance_id != REPLICA_ID_NIL) {
+			say_info("final data received");
+			applier_set_state(applier, APPLIER_JOINED);
+			applier_set_state(applier, APPLIER_READY);
+			applier_set_state(applier, APPLIER_FOLLOW);
+		}
+
+		/*
+		 * Tarantool < 1.7.7 does not send periodic heartbeat
+		 * messages so we can't assume that if we haven't heard
+		 * from the master for quite a while the connection is
+		 * broken - the master might just be idle.
+		 */
+		double timeout = applier->version_id < version_id(1, 7, 7) ?
+				 TIMEOUT_INFINITY :
+				 replication_disconnect_timeout();
+
+		struct stailq rows;
+		applier_read_tx(applier, &rows, timeout);
+
+		/*
+		 * In case of an heartbeat message wake a writer up
+		 * and check applier state.
+		 */
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row;
+		raft_process_heartbeat(box_raft(), applier->instance_id);
+		if (first_row->lsn == 0) {
+			if (unlikely(iproto_type_is_raft_request(
+							first_row->type))) {
+				if (applier_handle_raft(applier,
+							first_row) != 0)
+					diag_raise();
+			}
+			applier_signal_ack(applier);
+		} else if (applier_apply_tx(applier, &rows) != 0) {
+			diag_raise();
+		}
+
+		/* Discard processed input. */
+		ibuf->rpos = ibuf->xpos;
+		/*
+		 * Even though this is not necessary, defragment the buffer
+		 * explicitly. Otherwise the defragmentation would be triggered
+		 * by one of the row reads, resulting in moving a bigger memory
+		 * chunk.
+		 */
+		ibuf_defragment(&applier->ibuf);
+	}
+}
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -1425,58 +1486,7 @@ applier_subscribe(struct applier *applier)
 	/*
 	 * Process a stream of rows from the binary log.
 	 */
-	while (true) {
-		if (applier->state == APPLIER_FINAL_JOIN &&
-		    instance_id != REPLICA_ID_NIL) {
-			say_info("final data received");
-			applier_set_state(applier, APPLIER_JOINED);
-			applier_set_state(applier, APPLIER_READY);
-			applier_set_state(applier, APPLIER_FOLLOW);
-		}
-
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		double timeout = applier->version_id < version_id(1, 7, 7) ?
-				 TIMEOUT_INFINITY :
-				 replication_disconnect_timeout();
-
-		struct stailq rows;
-		applier_read_tx(applier, &rows, timeout);
-
-		/*
-		 * In case of an heartbeat message wake a writer up
-		 * and check applier state.
-		 */
-		struct xrow_header *first_row =
-			&stailq_first_entry(&rows, struct applier_tx_row,
-					    next)->row;
-		raft_process_heartbeat(box_raft(), applier->instance_id);
-		if (first_row->lsn == 0) {
-			if (unlikely(iproto_type_is_raft_request(
-							first_row->type))) {
-				if (applier_handle_raft(applier,
-							first_row) != 0)
-					diag_raise();
-			}
-			applier_signal_ack(applier);
-		} else if (applier_apply_tx(applier, &rows) != 0) {
-			diag_raise();
-		}
-
-		/* Discard processed input. */
-		ibuf->rpos = ibuf->xpos;
-		/*
-		 * Even though this is not necessary, defragment the buffer
-		 * explicitly. Otherwise the defragmentation would be triggered
-		 * by one of the row reads, resulting in moving a bigger memory
-		 * chunk.
-		 */
-		ibuf_defragment(&applier->ibuf);
-	}
+	applier_subscribe_f(applier);
 }
 
 static inline void
-- 
2.30.1 (Apple Git-130)
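
For readers following the heartbeat comment in the moved loop, the read
timeout choice can be looked at in isolation. The snippet below is only a
rough standalone sketch, not code from this patch: pick_read_timeout() and
kTimeoutInfinity are hypothetical names, and the version_id() packing shown
here is an assumption standing in for the real helper.

```cpp
#include <cstdint>

// Stand-in for the version_id() helper used in the patch. It packs
// major.minor.patch into one integer so that versions compare with a
// plain '<'. The exact packing is an assumption of this sketch.
constexpr std::uint32_t
version_id(unsigned major, unsigned minor, unsigned patch)
{
	return (((major << 8) | minor) << 8) | patch;
}

// Hypothetical placeholder for TIMEOUT_INFINITY: any value large enough
// that the read never times out in practice.
constexpr double kTimeoutInfinity = 1e18;

// Masters older than 1.7.7 send no periodic heartbeats, so silence does
// not mean the connection is broken: wait indefinitely. Newer masters are
// expected to be heard from within the disconnect timeout.
constexpr double
pick_read_timeout(std::uint32_t master_version, double disconnect_timeout)
{
	return master_version < version_id(1, 7, 7) ?
	       kTimeoutInfinity : disconnect_timeout;
}
```

With these stand-ins, a master reporting 1.7.6 gets the infinite timeout,
while 1.7.7 and anything newer gets whatever disconnect timeout the caller
passes in.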