[Tarantool-patches] [PATCH 3/4] applier: factor replication stream processing out of subscribe()
Serge Petrenko
sergepetrenko at tarantool.org
Mon Dec 6 06:03:22 MSK 2021
applier_subscribe() is huge, since it performs 2 separate tasks: first
sends a subscribe request to the replication master and handles the
result, then processes the replication stream.
Factor out the replication stream processing into a separate routine to
make applier_subscribe() appear more sane.
Besides, this will be needed with applier-in-thread introduction, when
the connection will be established by the tx fiber, but the stream will
be processed by a separate thread.
Part-of #6329
---
src/box/applier.cc | 114 ++++++++++++++++++++++++---------------------
1 file changed, 62 insertions(+), 52 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a8505c93a..393f0a2fe 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -1290,6 +1290,67 @@ applier_on_rollback(struct trigger *trigger, void *event)
return 0;
}
+/**
+ * Subscribe to the replication stream. Decode the incoming rows right in
+ * applier fiber.
+ */
+static void
+applier_subscribe_f(struct applier *applier)
+{
+ struct ibuf *ibuf = &applier->ibuf;
+ while (true) {
+ if (applier->state == APPLIER_FINAL_JOIN &&
+ instance_id != REPLICA_ID_NIL) {
+ say_info("final data received");
+ applier_set_state(applier, APPLIER_JOINED);
+ applier_set_state(applier, APPLIER_READY);
+ applier_set_state(applier, APPLIER_FOLLOW);
+ }
+
+ /*
+ * Tarantool < 1.7.7 does not send periodic heartbeat
+ * messages so we can't assume that if we haven't heard
+ * from the master for quite a while the connection is
+ * broken - the master might just be idle.
+ */
+ double timeout = applier->version_id < version_id(1, 7, 7) ?
+ TIMEOUT_INFINITY :
+ replication_disconnect_timeout();
+
+ struct stailq rows;
+ applier_read_tx(applier, &rows, timeout);
+
+ /*
+ * In case of an heartbeat message wake a writer up
+ * and check applier state.
+ */
+ struct xrow_header *first_row =
+ &stailq_first_entry(&rows, struct applier_tx_row,
+ next)->row;
+ raft_process_heartbeat(box_raft(), applier->instance_id);
+ if (first_row->lsn == 0) {
+ if (unlikely(iproto_type_is_raft_request(
+ first_row->type))) {
+ if (applier_handle_raft(applier,
+ first_row) != 0)
+ diag_raise();
+ }
+ applier_signal_ack(applier);
+ } else if (applier_apply_tx(applier, &rows) != 0) {
+ diag_raise();
+ }
+
+ /* Discard processed input. */
+ ibuf->rpos = ibuf->xpos;
+ /*
+ * Even though this is not necessary, defragment the buffer
+ * explicitly. Otherwise the defragmentation would be triggered
+ * by one of the row reads, resulting in moving a bigger memory
+ * chunk.
+ */
+ ibuf_defragment(&applier->ibuf);
+ }
+}
/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -1425,58 +1486,7 @@ applier_subscribe(struct applier *applier)
/*
* Process a stream of rows from the binary log.
*/
- while (true) {
- if (applier->state == APPLIER_FINAL_JOIN &&
- instance_id != REPLICA_ID_NIL) {
- say_info("final data received");
- applier_set_state(applier, APPLIER_JOINED);
- applier_set_state(applier, APPLIER_READY);
- applier_set_state(applier, APPLIER_FOLLOW);
- }
-
- /*
- * Tarantool < 1.7.7 does not send periodic heartbeat
- * messages so we can't assume that if we haven't heard
- * from the master for quite a while the connection is
- * broken - the master might just be idle.
- */
- double timeout = applier->version_id < version_id(1, 7, 7) ?
- TIMEOUT_INFINITY :
- replication_disconnect_timeout();
-
- struct stailq rows;
- applier_read_tx(applier, &rows, timeout);
-
- /*
- * In case of an heartbeat message wake a writer up
- * and check applier state.
- */
- struct xrow_header *first_row =
- &stailq_first_entry(&rows, struct applier_tx_row,
- next)->row;
- raft_process_heartbeat(box_raft(), applier->instance_id);
- if (first_row->lsn == 0) {
- if (unlikely(iproto_type_is_raft_request(
- first_row->type))) {
- if (applier_handle_raft(applier,
- first_row) != 0)
- diag_raise();
- }
- applier_signal_ack(applier);
- } else if (applier_apply_tx(applier, &rows) != 0) {
- diag_raise();
- }
-
- /* Discard processed input. */
- ibuf->rpos = ibuf->xpos;
- /*
- * Even though this is not necessary, defragment the buffer
- * explicitly. Otherwise the defragmentation would be triggered
- * by one of the row reads, resulting in moving a bigger memory
- * chunk.
- */
- ibuf_defragment(&applier->ibuf);
- }
+ applier_subscribe_f(applier);
}
static inline void
--
2.30.1 (Apple Git-130)
More information about the Tarantool-patches
mailing list