From: Serge Petrenko via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: v.shpilevoy@tarantool.org, vdavydov@tarantool.org
Cc: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [PATCH 3/4] applier: factor replication stream processing out of subscribe()
Date: Mon, 6 Dec 2021 06:03:22 +0300 [thread overview]
Message-ID: <247613fdaa92be2c6b8b3229a77f1f40eba6fe7e.1638757827.git.sergepetrenko@tarantool.org> (raw)
In-Reply-To: <cover.1638757827.git.sergepetrenko@tarantool.org>
applier_subscribe() is huge, since it performs 2 separate tasks: first
sends a subscribe request to the replication master and handles the
result, then processes the replication stream.
Factor out the replication stream processing into a separate routine to
make applier_subscribe() appear more sane.
Besides, this will be needed with applier-in-thread introduction, when
the connection will be established by the tx fiber, but the stream will
be processed by a separate thread.
Part-of #6329
---
src/box/applier.cc | 114 ++++++++++++++++++++++++---------------------
1 file changed, 62 insertions(+), 52 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a8505c93a..393f0a2fe 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -1290,6 +1290,67 @@ applier_on_rollback(struct trigger *trigger, void *event)
return 0;
}
+/**
+ * Subscribe to the replication stream. Decode the incoming rows right in
+ * applier fiber.
+ */
+static void
+applier_subscribe_f(struct applier *applier)
+{
+ struct ibuf *ibuf = &applier->ibuf;
+ while (true) {
+ if (applier->state == APPLIER_FINAL_JOIN &&
+ instance_id != REPLICA_ID_NIL) {
+ say_info("final data received");
+ applier_set_state(applier, APPLIER_JOINED);
+ applier_set_state(applier, APPLIER_READY);
+ applier_set_state(applier, APPLIER_FOLLOW);
+ }
+
+ /*
+ * Tarantool < 1.7.7 does not send periodic heartbeat
+ * messages so we can't assume that if we haven't heard
+ * from the master for quite a while the connection is
+ * broken - the master might just be idle.
+ */
+ double timeout = applier->version_id < version_id(1, 7, 7) ?
+ TIMEOUT_INFINITY :
+ replication_disconnect_timeout();
+
+ struct stailq rows;
+ applier_read_tx(applier, &rows, timeout);
+
+ /*
+ * In case of an heartbeat message wake a writer up
+ * and check applier state.
+ */
+ struct xrow_header *first_row =
+ &stailq_first_entry(&rows, struct applier_tx_row,
+ next)->row;
+ raft_process_heartbeat(box_raft(), applier->instance_id);
+ if (first_row->lsn == 0) {
+ if (unlikely(iproto_type_is_raft_request(
+ first_row->type))) {
+ if (applier_handle_raft(applier,
+ first_row) != 0)
+ diag_raise();
+ }
+ applier_signal_ack(applier);
+ } else if (applier_apply_tx(applier, &rows) != 0) {
+ diag_raise();
+ }
+
+ /* Discard processed input. */
+ ibuf->rpos = ibuf->xpos;
+ /*
+ * Even though this is not necessary, defragment the buffer
+ * explicitly. Otherwise the defragmentation would be triggered
+ * by one of the row reads, resulting in moving a bigger memory
+ * chunk.
+ */
+ ibuf_defragment(&applier->ibuf);
+ }
+}
/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -1425,58 +1486,7 @@ applier_subscribe(struct applier *applier)
/*
* Process a stream of rows from the binary log.
*/
- while (true) {
- if (applier->state == APPLIER_FINAL_JOIN &&
- instance_id != REPLICA_ID_NIL) {
- say_info("final data received");
- applier_set_state(applier, APPLIER_JOINED);
- applier_set_state(applier, APPLIER_READY);
- applier_set_state(applier, APPLIER_FOLLOW);
- }
-
- /*
- * Tarantool < 1.7.7 does not send periodic heartbeat
- * messages so we can't assume that if we haven't heard
- * from the master for quite a while the connection is
- * broken - the master might just be idle.
- */
- double timeout = applier->version_id < version_id(1, 7, 7) ?
- TIMEOUT_INFINITY :
- replication_disconnect_timeout();
-
- struct stailq rows;
- applier_read_tx(applier, &rows, timeout);
-
- /*
- * In case of an heartbeat message wake a writer up
- * and check applier state.
- */
- struct xrow_header *first_row =
- &stailq_first_entry(&rows, struct applier_tx_row,
- next)->row;
- raft_process_heartbeat(box_raft(), applier->instance_id);
- if (first_row->lsn == 0) {
- if (unlikely(iproto_type_is_raft_request(
- first_row->type))) {
- if (applier_handle_raft(applier,
- first_row) != 0)
- diag_raise();
- }
- applier_signal_ack(applier);
- } else if (applier_apply_tx(applier, &rows) != 0) {
- diag_raise();
- }
-
- /* Discard processed input. */
- ibuf->rpos = ibuf->xpos;
- /*
- * Even though this is not necessary, defragment the buffer
- * explicitly. Otherwise the defragmentation would be triggered
- * by one of the row reads, resulting in moving a bigger memory
- * chunk.
- */
- ibuf_defragment(&applier->ibuf);
- }
+ applier_subscribe_f(applier);
}
static inline void
--
2.30.1 (Apple Git-130)
next prev parent reply other threads:[~2021-12-06 3:05 UTC|newest]
Thread overview: 8+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-12-06 3:03 [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input Serge Petrenko via Tarantool-patches
2021-12-06 3:05 ` Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 2/4] applier: reuse input buffer to store row bodies Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` Serge Petrenko via Tarantool-patches [this message]
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 4/4] Introduce applier thread Serge Petrenko via Tarantool-patches
2021-12-06 9:59 ` [Tarantool-patches] [PATCH 0/4] replication: introduce " Vladimir Davydov via Tarantool-patches
2021-12-06 10:31 ` Serge Petrenko via Tarantool-patches
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=247613fdaa92be2c6b8b3229a77f1f40eba6fe7e.1638757827.git.sergepetrenko@tarantool.org \
--to=tarantool-patches@dev.tarantool.org \
--cc=sergepetrenko@tarantool.org \
--cc=v.shpilevoy@tarantool.org \
--cc=vdavydov@tarantool.org \
--subject='Re: [Tarantool-patches] [PATCH 3/4] applier: factor replication stream processing out of subscribe()' \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox