* [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread
@ 2021-12-06 3:03 Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input Serge Petrenko via Tarantool-patches
` (4 more replies)
0 siblings, 5 replies; 8+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-12-06 3:03 UTC (permalink / raw)
To: v.shpilevoy, vdavydov; +Cc: tarantool-patches
Patches #1 and #2 rework applier and coio_read_xrow to keep parsed input.
This allows us to get rid of copying row bodies in applier right after parsing
them.
The patches are left over from a failed applier-in-thread approach. Strictly
speaking, they are not needed anymore, so we may either drop them or keep them:
after all, they make the usual applier (a fiber in the tx thread) better. I'm
still sending them for review.
Patch #3 is a small refactoring, needed for patch #4.
Patch #4 is the applier in thread itself. Here's a brief idea:
There's a separate thread which decodes the incoming rows. This way the applier
fiber (in the tx thread) may deal with already-decoded requests and doesn't
waste its time decoding them. This should improve replication performance quite
a bit.
We introduce a new config option: `replication_num_threads`. There may be no
more than `replication_num_threads` applier threads (the default is 1), and when
there are more appliers than threads, each thread handles multiple appliers.
Each thread has 2 fibers per applier: a reader and a writer.
The writer simply mirrors the applier_writer_f behaviour. Nothing to see here.
The reader reads ahead as much data as possible and decodes it.
As soon as there is enough data read to decode a full transaction, the reader
decodes the transaction (or multiple transactions) and sends the result to the
tx thread.
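Roughly, the reader loop looks like this (a simplified sketch only; the real
fiber is applier_thread_reader_f() in patch #4, and ship_batch_to_tx() is a
made-up stand-in for the actual cbus message push):

static int
reader_loop_sketch(struct applier *applier, struct ibuf *ibuf,
		   struct region *region)
{
	while (!fiber_is_cancelled()) {
		/* Wait for at least one byte, then grab all available input. */
		applier_read_ahead(applier, ibuf);
		/*
		 * Decode every complete transaction accumulated so far and
		 * ship the whole batch to the tx thread at once.
		 */
		struct stailq txs;
		stailq_create(&txs);
		struct applier_tx *tx;
		while ((tx = applier_parse_tx(applier, ibuf, region)) != NULL)
			stailq_add_tail_entry(&txs, tx, next);
		if (!stailq_empty(&txs))
			ship_batch_to_tx(applier, &txs);
	}
	return 0;
}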
I wasted quite some time trying to make the approach based on coio_read_xrow()
work (it was the first one that came to mind and looked the simplest, but it
failed). Everything came together once I dropped the coio_read_xrow() approach
and started using readahead.
Some of the tests are still failing in CI and local runs, so I'm sending the
branch without tests for now. I'm going to fix the tests next.
Branch:
https://github.com/tarantool/tarantool/tree/sp/gh-6329-applier-in-thread-notest
Issue:
https://github.com/tarantool/tarantool/issues/6329
Serge Petrenko (4):
xrow: rework coio_read_xrow to keep parsed input
applier: reuse input buffer to store row bodies
applier: factor replication stream processing out of subscribe()
Introduce applier thread
.../unreleased/gh-6329-applier-in-thread.md | 5 +
src/box/applier.cc | 1014 +++++++++++++++--
src/box/applier.h | 4 +
src/box/box.cc | 2 +-
src/box/lua/load_cfg.lua | 2 +
src/box/replication.cc | 5 +-
src/box/replication.h | 7 +-
src/box/xrow_io.cc | 28 +-
src/box/xrow_io.h | 18 +-
src/lib/small | 2 +-
src/lua/buffer.lua | 2 +
11 files changed, 955 insertions(+), 134 deletions(-)
create mode 100644 changelogs/unreleased/gh-6329-applier-in-thread.md
--
2.30.1 (Apple Git-130)
^ permalink raw reply [flat|nested] 8+ messages in thread
* [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input
2021-12-06 3:03 [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread Serge Petrenko via Tarantool-patches
@ 2021-12-06 3:03 ` Serge Petrenko via Tarantool-patches
2021-12-06 3:05 ` Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 2/4] applier: reuse input buffer to store row bodies Serge Petrenko via Tarantool-patches
` (3 subsequent siblings)
4 siblings, 1 reply; 8+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-12-06 3:03 UTC (permalink / raw)
To: v.shpilevoy, vdavydov; +Cc: tarantool-patches
coio_read_xrow(_timeout_xc) uses an ibuf as temporary storage for
input. It parses the xrow upon receipt and immediately discards the
input buffer. This means that the just-parsed xrow's body may only be
used until the next row is read: each new read may reuse the space
where the row's body was located.
So when the user has to read multiple rows (say, a transaction) before
processing them, they are forced to copy the row bodies to some other
location.
Let the user decide instead whether the input should be discarded
immediately or not.
Introduce coio_read_xrow_ex(_timeout_xc), which doesn't discard the
input upon parse, and uses ibuf->xpos instead of ibuf->rpos to point at
the end of parsed data. This way the user is free to decide when to
advance ibuf->rpos and discard the input.
Introduce macros with the old behaviour for coio_read_xrow(_timeout_xc).
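For illustration, a caller can now keep a whole transaction's rows alive before
releasing the input (a sketch, not part of the patch; process_row() is a
made-up placeholder):

	struct xrow_header row;
	do {
		/* Parse the next row; its body keeps pointing into the ibuf. */
		coio_read_xrow_ex_timeout_xc(io, in, &row, timeout);
		process_row(&row);
	} while (!row.is_commit);
	/* The whole transaction is handled, so discard the parsed input. */
	in->rpos = in->xpos;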
Prerequisite #6329
---
src/box/xrow_io.cc | 28 ++++++++++++++--------------
src/box/xrow_io.h | 18 +++++++++++++++---
src/lib/small | 2 +-
src/lua/buffer.lua | 2 ++
4 files changed, 32 insertions(+), 18 deletions(-)
diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index 20e327239..9d0d4b12e 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -36,61 +36,61 @@
#include "msgpuck/msgpuck.h"
void
-coio_read_xrow(struct iostream *io, struct ibuf *in, struct xrow_header *row)
+coio_read_xrow_ex(struct iostream *io, struct ibuf *in, struct xrow_header *row)
{
/* Read fixed header */
- if (ibuf_used(in) < 1)
+ if (ibuf_unparsed(in) < 1)
coio_breadn(io, in, 1);
/* Read length */
- if (mp_typeof(*in->rpos) != MP_UINT) {
+ if (mp_typeof(*in->xpos) != MP_UINT) {
tnt_raise(ClientError, ER_INVALID_MSGPACK,
"packet length");
}
- ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
+ ssize_t to_read = mp_check_uint(in->xpos, in->wpos);
if (to_read > 0)
coio_breadn(io, in, to_read);
- uint32_t len = mp_decode_uint((const char **) &in->rpos);
+ uint32_t len = mp_decode_uint((const char **) &in->xpos);
/* Read header and body */
- to_read = len - ibuf_used(in);
+ to_read = len - ibuf_unparsed(in);
if (to_read > 0)
coio_breadn(io, in, to_read);
- xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
+ xrow_header_decode_xc(row, (const char **) &in->xpos, in->xpos + len,
true);
}
void
-coio_read_xrow_timeout_xc(struct iostream *io, struct ibuf *in,
+coio_read_xrow_ex_timeout_xc(struct iostream *io, struct ibuf *in,
struct xrow_header *row, ev_tstamp timeout)
{
ev_tstamp start, delay;
coio_timeout_init(&start, &delay, timeout);
/* Read fixed header */
- if (ibuf_used(in) < 1)
+ if (ibuf_unparsed(in) < 1)
coio_breadn_timeout(io, in, 1, delay);
coio_timeout_update(&start, &delay);
/* Read length */
- if (mp_typeof(*in->rpos) != MP_UINT) {
+ if (mp_typeof(*in->xpos) != MP_UINT) {
tnt_raise(ClientError, ER_INVALID_MSGPACK,
"packet length");
}
- ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
+ ssize_t to_read = mp_check_uint(in->xpos, in->wpos);
if (to_read > 0)
coio_breadn_timeout(io, in, to_read, delay);
coio_timeout_update(&start, &delay);
- uint32_t len = mp_decode_uint((const char **) &in->rpos);
+ uint32_t len = mp_decode_uint((const char **) &in->xpos);
/* Read header and body */
- to_read = len - ibuf_used(in);
+ to_read = len - ibuf_unparsed(in);
if (to_read > 0)
coio_breadn_timeout(io, in, to_read, delay);
- xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len,
+ xrow_header_decode_xc(row, (const char **) &in->xpos, in->xpos + len,
true);
}
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 4e81b6e0c..5ecdd108e 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -39,11 +39,23 @@ struct iostream;
struct xrow_header;
void
-coio_read_xrow(struct iostream *io, struct ibuf *in, struct xrow_header *row);
+coio_read_xrow_ex(struct iostream *io, struct ibuf *in, struct xrow_header *row);
+
+#define coio_read_xrow(io, in, row) \
+ do { \
+ coio_read_xrow_ex(io, in, row); \
+ (in)->rpos = (in)->xpos; \
+ } while (0)
void
-coio_read_xrow_timeout_xc(struct iostream *io, struct ibuf *in,
- struct xrow_header *row, double timeout);
+coio_read_xrow_ex_timeout_xc(struct iostream *io, struct ibuf *in,
+ struct xrow_header *row, double timeout);
+
+#define coio_read_xrow_timeout_xc(io, in, row, timeout) \
+ do { \
+ coio_read_xrow_ex_timeout_xc(io, in, row, timeout); \
+ (in)->rpos = (in)->xpos; \
+ } while (0)
void
coio_write_xrow(struct iostream *io, const struct xrow_header *row);
diff --git a/src/lib/small b/src/lib/small
index 3d15a7058..47ca9ee09 160000
--- a/src/lib/small
+++ b/src/lib/small
@@ -1 +1 @@
-Subproject commit 3d15a705817ff60ef6fe5e4b70ae4c09056927e3
+Subproject commit 47ca9ee09a9e1ae052c1111b3220751d3fe035b0
diff --git a/src/lua/buffer.lua b/src/lua/buffer.lua
index 182c0b015..cab0544e6 100644
--- a/src/lua/buffer.lua
+++ b/src/lua/buffer.lua
@@ -23,6 +23,8 @@ struct ibuf
char *buf;
/** Start of input. */
char *rpos;
+ /** End of parsed input. */
+ char *xpos;
/** End of useful input */
char *wpos;
/** End of ibuf. */
--
2.30.1 (Apple Git-130)
^ permalink raw reply [flat|nested] 8+ messages in thread
* [Tarantool-patches] [PATCH 2/4] applier: reuse input buffer to store row bodies
2021-12-06 3:03 [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input Serge Petrenko via Tarantool-patches
@ 2021-12-06 3:03 ` Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 3/4] applier: factor replication stream processing out of subscribe() Serge Petrenko via Tarantool-patches
` (2 subsequent siblings)
4 siblings, 0 replies; 8+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-12-06 3:03 UTC (permalink / raw)
To: v.shpilevoy, vdavydov; +Cc: tarantool-patches
Applier, the main user of coio_read_xrow(), had to save row bodies
elsewhere, because there was no guarantee that the input buffer wouldn't
be reused before the body was processed.
This changed in the previous commit (xrow: rework coio_read_xrow to keep
parsed input), so it's time to take advantage of this change in applier.
Stop saving row bodies to the fiber gc region and store them right in the
ibuf instead.
Introduce the machinery needed to track ibuf reallocation.
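The core of that machinery is roughly the following (a condensed sketch of the
hunk below, with the surrounding loop omitted):

	const char *old_rpos = applier->ibuf.rpos;
	/* Reading the next row may reallocate or defragment the ibuf. */
	struct applier_tx_row *tx_row = applier_read_tx_row(applier, timeout);
	ssize_t delta = applier->ibuf.rpos - old_rpos;
	if (delta != 0) {
		/*
		 * Body offsets relative to rpos are preserved, so every saved
		 * body pointer has moved by the same delta as rpos did.
		 */
		struct applier_tx_row *item;
		stailq_foreach_entry(item, rows, next) {
			if (item->row.bodycnt == 0)
				continue;
			item->row.body->iov_base =
				(char *)item->row.body->iov_base + delta;
		}
	}
	tsn = set_next_tx_row(rows, tx_row, tsn);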
Prerequisite #6329
---
src/box/applier.cc | 86 +++++++++++++++++++++++++++++++++-------------
1 file changed, 63 insertions(+), 23 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6036c19d9..a8505c93a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -553,6 +553,22 @@ struct applier_tx_row {
struct xrow_header row;
};
+/** Defragment the input buffer: move its contents, if any, to its beginning. */
+static inline void
+ibuf_defragment(struct ibuf *ibuf)
+{
+ size_t used = ibuf_used(ibuf);
+ if (used == 0) {
+ ibuf_reset(ibuf);
+ } else {
+ size_t cap = ibuf_capacity(ibuf);
+ /*
+ * Defragment the buffer by reserving all the available space.
+ */
+ ibuf_reserve(ibuf, cap - used);
+ }
+}
+
static uint64_t
applier_wait_register(struct applier *applier, uint64_t row_count)
{
@@ -589,8 +605,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
}
if (apply_final_join_tx(applier->instance_id, &rows) != 0)
diag_raise();
+ /* @sa applier_subscribe(). */
+ applier->ibuf.rpos = applier->ibuf.xpos;
+ ibuf_defragment(&applier->ibuf);
}
+ ibuf_reset(&applier->ibuf);
return row_count;
}
@@ -676,7 +696,7 @@ applier_read_tx_row(struct applier *applier, double timeout)
ERROR_INJECT_YIELD(ERRINJ_APPLIER_READ_TX_ROW_DELAY);
- coio_read_xrow_timeout_xc(io, ibuf, row, timeout);
+ coio_read_xrow_ex_timeout_xc(io, ibuf, row, timeout);
if (row->tm > 0)
applier->lag = ev_now(loop()) - row->tm;
@@ -722,20 +742,6 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
if (row->is_commit) {
/* Signal the caller that we've reached the tx end. */
tsn = 0;
- } else if (row->bodycnt == 1) {
- /*
- * Save row body to gc region. Not done for single-statement
- * transactions and the last row of multi-statement transactions
- * knowing that the input buffer will not be used while the
- * transaction is applied.
- */
- void *new_base = region_alloc(&fiber()->gc, row->body->iov_len);
- if (new_base == NULL)
- tnt_raise(OutOfMemory, row->body->iov_len, "region",
- "xrow body");
- memcpy(new_base, row->body->iov_base, row->body->iov_len);
- /* Adjust row body pointers. */
- row->body->iov_base = new_base;
}
stailq_add_tail(rows, &tx_row->next);
@@ -744,10 +750,19 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
/**
* Read one transaction from network using applier's input buffer.
- * Transaction rows are placed onto fiber gc region.
- * We could not use applier input buffer to store rows because
- * rpos is adjusted as xrow is decoded and the corresponding
- * network input space is reused for the next xrow.
+ *
+ * The input buffer is reused to store row bodies. The only two problems to
+ * deal with are sporadic input buffer reallocation and defragmentation.
+ * We have to adjust row body pointers each time any of the two occur.
+ *
+ * Defragmentation is done manually in between the transaction reads, so it
+ * **never** happens inside this function (ibuf->rpos always points at
+ * the very beginning of the ibuf).
+ *
+ * Speaking of buffer reallocation, it only happens during the "saturation"
+ * phase, until the input buffer reaches the size big enough to hold a single
+ * transaction. Moreover, each next reallocation is exponentially less likely
+ * to happen, because the buffer size is doubled every time.
*/
static uint64_t
applier_read_tx(struct applier *applier, struct stailq *rows, double timeout)
@@ -757,8 +772,26 @@ applier_read_tx(struct applier *applier, struct stailq *rows, double timeout)
stailq_create(rows);
do {
+ const char *old_rpos = applier->ibuf.rpos;
struct applier_tx_row *tx_row = applier_read_tx_row(applier,
timeout);
+ /* Detect ibuf reallocation or defragmentation. */
+ ssize_t delta = applier->ibuf.rpos - old_rpos;
+ if (unlikely(delta != 0)) {
+ struct applier_tx_row *item;
+ stailq_foreach_entry(item, rows, next) {
+ struct xrow_header *row = &item->row;
+ if (row->bodycnt == 0)
+ continue;
+ /*
+ * The row body's offset relative to ibuf->rpos
+ * is constant, so they all were moved by the
+ * same delta as rpos was.
+ */
+ row->body->iov_base =
+ (char *)row->body->iov_base + delta;
+ }
+ }
tsn = set_next_tx_row(rows, tx_row, tsn);
++row_count;
} while (tsn != 0);
@@ -1123,8 +1156,8 @@ nopify:;
row = &item->row;
row->type = IPROTO_NOP;
/*
- * Row body is saved to fiber's region and will be freed
- * on next fiber_gc() call.
+ * Row body will be discarded together with the remaining
+ * input.
*/
row->bodycnt = 0;
}
@@ -1434,8 +1467,15 @@ applier_subscribe(struct applier *applier)
diag_raise();
}
- if (ibuf_used(ibuf) == 0)
- ibuf_reset(ibuf);
+ /* Discard processed input. */
+ ibuf->rpos = ibuf->xpos;
+ /*
+ * Even though this is not necessary, defragment the buffer
+ * explicitly. Otherwise the defragmentation would be triggered
+ * by one of the row reads, resulting in moving a bigger memory
+ * chunk.
+ */
+ ibuf_defragment(&applier->ibuf);
}
}
--
2.30.1 (Apple Git-130)
^ permalink raw reply [flat|nested] 8+ messages in thread
* [Tarantool-patches] [PATCH 3/4] applier: factor replication stream processing out of subscribe()
2021-12-06 3:03 [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 2/4] applier: reuse input buffer to store row bodies Serge Petrenko via Tarantool-patches
@ 2021-12-06 3:03 ` Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 4/4] Introduce applier thread Serge Petrenko via Tarantool-patches
2021-12-06 9:59 ` [Tarantool-patches] [PATCH 0/4] replication: introduce " Vladimir Davydov via Tarantool-patches
4 siblings, 0 replies; 8+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-12-06 3:03 UTC (permalink / raw)
To: v.shpilevoy, vdavydov; +Cc: tarantool-patches
applier_subscribe() is huge, since it performs 2 separate tasks: it first
sends a subscribe request to the replication master and handles the
result, then processes the replication stream.
Factor out the replication stream processing into a separate routine to
make applier_subscribe() appear more sane.
Besides, this will be needed for the applier-in-thread introduction, when
the connection will be established by the tx fiber, but the stream will
be processed by a separate thread.
Part-of #6329
---
src/box/applier.cc | 114 ++++++++++++++++++++++++---------------------
1 file changed, 62 insertions(+), 52 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a8505c93a..393f0a2fe 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -1290,6 +1290,67 @@ applier_on_rollback(struct trigger *trigger, void *event)
return 0;
}
+/**
+ * Subscribe to the replication stream. Decode the incoming rows right in
+ * applier fiber.
+ */
+static void
+applier_subscribe_f(struct applier *applier)
+{
+ struct ibuf *ibuf = &applier->ibuf;
+ while (true) {
+ if (applier->state == APPLIER_FINAL_JOIN &&
+ instance_id != REPLICA_ID_NIL) {
+ say_info("final data received");
+ applier_set_state(applier, APPLIER_JOINED);
+ applier_set_state(applier, APPLIER_READY);
+ applier_set_state(applier, APPLIER_FOLLOW);
+ }
+
+ /*
+ * Tarantool < 1.7.7 does not send periodic heartbeat
+ * messages so we can't assume that if we haven't heard
+ * from the master for quite a while the connection is
+ * broken - the master might just be idle.
+ */
+ double timeout = applier->version_id < version_id(1, 7, 7) ?
+ TIMEOUT_INFINITY :
+ replication_disconnect_timeout();
+
+ struct stailq rows;
+ applier_read_tx(applier, &rows, timeout);
+
+ /*
+ * In case of an heartbeat message wake a writer up
+ * and check applier state.
+ */
+ struct xrow_header *first_row =
+ &stailq_first_entry(&rows, struct applier_tx_row,
+ next)->row;
+ raft_process_heartbeat(box_raft(), applier->instance_id);
+ if (first_row->lsn == 0) {
+ if (unlikely(iproto_type_is_raft_request(
+ first_row->type))) {
+ if (applier_handle_raft(applier,
+ first_row) != 0)
+ diag_raise();
+ }
+ applier_signal_ack(applier);
+ } else if (applier_apply_tx(applier, &rows) != 0) {
+ diag_raise();
+ }
+
+ /* Discard processed input. */
+ ibuf->rpos = ibuf->xpos;
+ /*
+ * Even though this is not necessary, defragment the buffer
+ * explicitly. Otherwise the defragmentation would be triggered
+ * by one of the row reads, resulting in moving a bigger memory
+ * chunk.
+ */
+ ibuf_defragment(&applier->ibuf);
+ }
+}
/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -1425,58 +1486,7 @@ applier_subscribe(struct applier *applier)
/*
* Process a stream of rows from the binary log.
*/
- while (true) {
- if (applier->state == APPLIER_FINAL_JOIN &&
- instance_id != REPLICA_ID_NIL) {
- say_info("final data received");
- applier_set_state(applier, APPLIER_JOINED);
- applier_set_state(applier, APPLIER_READY);
- applier_set_state(applier, APPLIER_FOLLOW);
- }
-
- /*
- * Tarantool < 1.7.7 does not send periodic heartbeat
- * messages so we can't assume that if we haven't heard
- * from the master for quite a while the connection is
- * broken - the master might just be idle.
- */
- double timeout = applier->version_id < version_id(1, 7, 7) ?
- TIMEOUT_INFINITY :
- replication_disconnect_timeout();
-
- struct stailq rows;
- applier_read_tx(applier, &rows, timeout);
-
- /*
- * In case of an heartbeat message wake a writer up
- * and check applier state.
- */
- struct xrow_header *first_row =
- &stailq_first_entry(&rows, struct applier_tx_row,
- next)->row;
- raft_process_heartbeat(box_raft(), applier->instance_id);
- if (first_row->lsn == 0) {
- if (unlikely(iproto_type_is_raft_request(
- first_row->type))) {
- if (applier_handle_raft(applier,
- first_row) != 0)
- diag_raise();
- }
- applier_signal_ack(applier);
- } else if (applier_apply_tx(applier, &rows) != 0) {
- diag_raise();
- }
-
- /* Discard processed input. */
- ibuf->rpos = ibuf->xpos;
- /*
- * Even though this is not necessary, defragment the buffer
- * explicitly. Otherwise the defragmentation would be triggered
- * by one of the row reads, resulting in moving a bigger memory
- * chunk.
- */
- ibuf_defragment(&applier->ibuf);
- }
+ applier_subscribe_f(applier);
}
static inline void
--
2.30.1 (Apple Git-130)
^ permalink raw reply [flat|nested] 8+ messages in thread
* [Tarantool-patches] [PATCH 4/4] Introduce applier thread
2021-12-06 3:03 [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread Serge Petrenko via Tarantool-patches
` (2 preceding siblings ...)
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 3/4] applier: factor replication stream processing out of subscribe() Serge Petrenko via Tarantool-patches
@ 2021-12-06 3:03 ` Serge Petrenko via Tarantool-patches
2021-12-06 9:59 ` [Tarantool-patches] [PATCH 0/4] replication: introduce " Vladimir Davydov via Tarantool-patches
4 siblings, 0 replies; 8+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-12-06 3:03 UTC (permalink / raw)
To: v.shpilevoy, vdavydov; +Cc: tarantool-patches
It's reported that in a master-slave setup, replicas often have higher CPU
usage than their replication masters. Moreover, it's easy for a replica to
start falling behind the master.
The reason is the additional workload on the replica's tx thread compared to
the master's. While the master typically splits request processing between 2
threads (iproto, which decodes the requests, and tx, which applies them), the
replica performs both tasks in the tx thread.
This is due to the replication architecture: the replica handles the master
connection with a single fiber in the tx thread. The fiber first decodes the
incoming requests and then applies them.
Make it possible to decode the incoming replication stream in a separate
thread. This way the tx thread doesn't waste processing time on row decoding.
Each applier thread may serve several appliers, and the total number of
applier threads is controlled by a new configuration option,
`replication_num_threads`, with a default value of `1` (meaning a single
thread is spawned to handle all the appliers).
Closes #6329
@TarantoolBot document
Title: New configuration option - `replication_num_threads`
It's now possible to specify how many threads will be spawned to decode
incoming replication streams.
Setting the value to anything other than 0 (the old default) makes Tarantool
spawn a thread per server in `box.cfg.replication`. The thread handles
replication stream decoding, lowering the tx thread CPU load and potentially
helping the replica keep up with the master.
There are never more replication threads than `replication_num_threads`,
and when there are more peers in `box.cfg.replication` than threads,
one thread may handle multiple data streams.
---
.../unreleased/gh-6329-applier-in-thread.md | 5 +
src/box/applier.cc | 840 ++++++++++++++++--
src/box/applier.h | 4 +
src/box/box.cc | 2 +-
src/box/lua/load_cfg.lua | 2 +
src/box/replication.cc | 5 +-
src/box/replication.h | 7 +-
7 files changed, 811 insertions(+), 54 deletions(-)
create mode 100644 changelogs/unreleased/gh-6329-applier-in-thread.md
diff --git a/changelogs/unreleased/gh-6329-applier-in-thread.md b/changelogs/unreleased/gh-6329-applier-in-thread.md
new file mode 100644
index 000000000..542b5adf1
--- /dev/null
+++ b/changelogs/unreleased/gh-6329-applier-in-thread.md
@@ -0,0 +1,5 @@
+## feature/replication
+
+* Make it possible to decode incoming replication data in a separate thread. Add
+ the `replication_num_threads` configuration option, which controls how many
+ threads may be spawned to do the task (gh-6329).
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 393f0a2fe..ec6c7a6e6 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -57,6 +57,8 @@
#include "txn_limbo.h"
#include "journal.h"
#include "raft.h"
+#include "tt_static.h"
+#include "cbus.h"
STRS(applier_state, applier_STATE);
@@ -161,7 +163,9 @@ static int
applier_writer_f(va_list ap)
{
struct applier *applier = va_arg(ap, struct applier *);
-
+ bool check_sync = va_arg(ap, int);
+ bool *has_acks_to_send = va_arg(ap, bool *);
+ struct vclock *vclock = va_arg(ap, struct vclock *);
/* ID is permanent while applier is alive */
uint32_t replica_id = applier->instance_id;
@@ -171,7 +175,7 @@ applier_writer_f(va_list ap)
* messages so we don't need to send ACKs every
* replication_timeout seconds any more.
*/
- if (!applier->has_acks_to_send) {
+ if (!*has_acks_to_send) {
if (applier->version_id >= version_id(1, 7, 7))
fiber_cond_wait_timeout(&applier->writer_cond,
TIMEOUT_INFINITY);
@@ -185,16 +189,18 @@ applier_writer_f(va_list ap)
* update an applier status because the applier state could
* yield and doesn't fit into a commit trigger.
*/
- applier_check_sync(applier);
+ if (check_sync) {
+ applier_check_sync(applier);
+ /* Send ACKs only when in FOLLOW mode ,*/
+ if (applier->state != APPLIER_SYNC &&
+ applier->state != APPLIER_FOLLOW)
+ continue;
+ }
- /* Send ACKs only when in FOLLOW mode ,*/
- if (applier->state != APPLIER_SYNC &&
- applier->state != APPLIER_FOLLOW)
- continue;
try {
- applier->has_acks_to_send = false;
+ *has_acks_to_send = false;
struct xrow_header xrow;
- xrow_encode_vclock(&xrow, &replicaset.vclock);
+ xrow_encode_vclock(&xrow, vclock);
/*
* For relay lag statistics we report last
* written transaction timestamp in tm field.
@@ -295,24 +301,30 @@ process_nop(struct request *request)
}
static int
-apply_row(struct xrow_header *row)
+apply_request(struct request *request)
{
- struct request request;
- assert(!iproto_type_is_synchro_request(row->type));
- if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
- return -1;
- if (request.type == IPROTO_NOP)
- return process_nop(&request);
- struct space *space = space_cache_find(request.space_id);
+ if (request->type == IPROTO_NOP)
+ return process_nop(request);
+ struct space *space = space_cache_find(request->space_id);
if (space == NULL)
return -1;
- if (box_process_rw(&request, space, NULL) != 0) {
- say_error("error applying row: %s", request_str(&request));
+ if (box_process_rw(request, space, NULL) != 0) {
+ say_error("error applying row: %s", request_str(request));
return -1;
}
return 0;
}
+static int
+apply_row(struct xrow_header *row)
+{
+ struct request request;
+ assert(!iproto_type_is_synchro_request(row->type));
+ if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
+ return -1;
+ return apply_request(&request);
+}
+
/**
* Connect to a remote host and authenticate the client.
*/
@@ -543,6 +555,15 @@ applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);
static int
apply_final_join_tx(uint32_t replica_id, struct stailq *rows);
+union applier_request {
+ struct request dml;
+ struct synchro_request synchro;
+ struct {
+ struct raft_request req;
+ struct vclock vclock;
+ } raft;
+};
+
/**
* A helper struct to link xrow objects in a list.
*/
@@ -551,6 +572,15 @@ struct applier_tx_row {
struct stailq_entry next;
/* xrow_header struct for the current transaction row. */
struct xrow_header row;
+ /* The decoded request, if any. */
+ union applier_request decoded[];
+};
+
+struct applier_tx {
+ /** A link in tx list. */
+ struct stailq_entry next;
+ /** The transaction rows. */
+ struct stailq rows;
};
/** Defragment the input buffer: move its contents, if any, to its beginning. */
@@ -691,7 +721,6 @@ applier_read_tx_row(struct applier *applier, double timeout)
if (tx_row == NULL)
tnt_raise(OutOfMemory, size, "region_alloc_object", "tx_row");
-
struct xrow_header *row = &tx_row->row;
ERROR_INJECT_YIELD(ERRINJ_APPLIER_READ_TX_ROW_DELAY);
@@ -761,8 +790,9 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
*
* Speaking of buffer reallocation, it only happens during the "saturation"
* phase, until the input buffer reaches the size big enough to hold a single
- * transaction. Moreover, each next reallocation is exponentially less likely
- * to happen, because the buffer size is doubled every time.
+ * transaction (or a batch of transactions). Moreover, each next reallocation
+ * is exponentially less likely to happen, because the buffer size is doubled
+ * every time.
*/
static uint64_t
applier_read_tx(struct applier *applier, struct stailq *rows, double timeout)
@@ -895,7 +925,7 @@ struct synchro_entry {
* Async write journal completion.
*/
static void
-apply_synchro_row_cb(struct journal_entry *entry)
+apply_synchro_req_cb(struct journal_entry *entry)
{
assert(entry->complete_data != NULL);
struct synchro_entry *synchro_entry =
@@ -910,16 +940,9 @@ apply_synchro_row_cb(struct journal_entry *entry)
fiber_wakeup(synchro_entry->owner);
}
-/** Process a synchro request. */
static int
-apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
+apply_synchro_req(uint32_t replica_id, struct xrow_header *row, struct synchro_request *req)
{
- assert(iproto_type_is_synchro_request(row->type));
-
- struct synchro_request req;
- if (xrow_decode_synchro(row, &req) != 0)
- goto err;
-
struct replica_cb_data rcb_data;
struct synchro_entry entry;
/*
@@ -930,8 +953,8 @@ apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
rows = entry.base.rows;
rows[0] = row;
journal_entry_create(&entry.base, 1, xrow_approx_len(row),
- apply_synchro_row_cb, &entry);
- entry.req = &req;
+ apply_synchro_req_cb, &entry);
+ entry.req = req;
entry.owner = fiber();
rcb_data.replica_id = replica_id;
@@ -968,26 +991,48 @@ err:
return -1;
}
+/** Process a synchro request. */
static int
-applier_handle_raft(struct applier *applier, struct xrow_header *row)
+apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
{
- assert(iproto_type_is_raft_request(row->type));
+ assert(iproto_type_is_synchro_request(row->type));
+
+ struct synchro_request req;
+ if (xrow_decode_synchro(row, &req) != 0) {
+ diag_log();
+ return -1;
+ }
+
+ return apply_synchro_req(replica_id, row, &req);
+}
+
+static int
+applier_handle_raft_request(struct applier *applier, struct raft_request *req)
+{
+
if (applier->instance_id == 0) {
diag_set(ClientError, ER_PROTOCOL, "Can't apply a Raft request "
"from an instance without an ID");
return -1;
}
+ return box_raft_process(req, applier->instance_id);
+}
+
+static int
+applier_handle_raft(struct applier *applier, struct xrow_header *row)
+{
+ assert(iproto_type_is_raft_request(row->type));
struct raft_request req;
struct vclock candidate_clock;
if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
return -1;
- return box_raft_process(&req, applier->instance_id);
+ return applier_handle_raft_request(applier, &req);
}
static int
apply_plain_tx(uint32_t replica_id, struct stailq *rows,
- bool skip_conflict, bool use_triggers)
+ bool skip_conflict, bool use_triggers, bool decoded)
{
/*
* Explicitly begin the transaction so that we can
@@ -1002,7 +1047,8 @@ apply_plain_tx(uint32_t replica_id, struct stailq *rows,
stailq_foreach_entry(item, rows, next) {
struct xrow_header *row = &item->row;
- int res = apply_row(row);
+ int res = decoded ? apply_request(&item->decoded->dml) :
+ apply_row(row);
if (res != 0 && skip_conflict) {
struct error *e = diag_last_error(diag_get());
/*
@@ -1101,7 +1147,7 @@ apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
assert(first_row == last_row);
rc = apply_synchro_row(replica_id, first_row);
} else {
- rc = apply_plain_tx(replica_id, rows, false, false);
+ rc = apply_plain_tx(replica_id, rows, false, false, false);
}
fiber_gc();
return rc;
@@ -1115,7 +1161,7 @@ apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
* The rows are replaced with NOPs to preserve the vclock consistency.
*/
static void
-applier_synchro_filter_tx(struct stailq *rows)
+applier_synchro_filter_tx(struct stailq *rows, bool decoded)
{
/*
* XXX: in case raft is disabled, synchronous replication still works
@@ -1160,6 +1206,8 @@ nopify:;
* input.
*/
row->bodycnt = 0;
+ if (decoded)
+ item->decoded->dml.type = IPROTO_NOP;
}
}
@@ -1169,7 +1217,7 @@ nopify:;
* Return 0 for success or -1 in case of an error.
*/
static int
-applier_apply_tx(struct applier *applier, struct stailq *rows)
+applier_apply_tx(struct applier *applier, struct stailq *rows, bool decoded)
{
/*
* Initially we've been filtering out data if it came from
@@ -1187,8 +1235,10 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
* Finally we dropped such "sender" filtration and use transaction
* "initiator" filtration via xrow->replica_id only.
*/
- struct xrow_header *first_row = &stailq_first_entry(rows,
- struct applier_tx_row, next)->row;
+ struct applier_tx_row *txr = stailq_first_entry(rows,
+ struct applier_tx_row,
+ next);
+ struct xrow_header *first_row = &txr->row;
struct xrow_header *last_row;
last_row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
struct replica *replica = replica_by_id(first_row->replica_id);
@@ -1225,7 +1275,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
}
}
}
- applier_synchro_filter_tx(rows);
+ applier_synchro_filter_tx(rows, decoded);
if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
/*
* Synchro messages are not transactions, in terms
@@ -1233,10 +1283,15 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
* each other.
*/
assert(first_row == last_row);
- rc = apply_synchro_row(applier->instance_id, first_row);
+ if (decoded) {
+ rc = apply_synchro_req(applier->instance_id, &txr->row,
+ &txr->decoded->synchro);
+ } else {
+ rc = apply_synchro_row(applier->instance_id, first_row);
+ }
} else {
rc = apply_plain_tx(applier->instance_id, rows,
- replication_skip_conflict, true);
+ replication_skip_conflict, true, decoded);
}
if (rc != 0)
goto finish;
@@ -1256,7 +1311,8 @@ finish:
static inline void
applier_signal_ack(struct applier *applier)
{
- fiber_cond_signal(&applier->writer_cond);
+ if (applier->writer != NULL)
+ fiber_cond_signal(&applier->writer_cond);
applier->has_acks_to_send = true;
}
@@ -1290,6 +1346,682 @@ applier_on_rollback(struct trigger *trigger, void *event)
return 0;
}
+/** The underlying thread behind a number of appliers. */
+struct applier_thread {
+ /** A link in allocated threads list. */
+ struct rlist in_list;
+ struct cord cord;
+ /** The single thread endpoint. */
+ struct cbus_endpoint endpoint;
+ /** A pre-allocated message emitted by the thread once it's exiting. */
+ struct cmsg join_msg;
+ /** Count of appliers attached to this thread. */
+ int n_appliers;
+ /** Whether the thread has no appliers and has to be joined. */
+ bool is_exiting;
+};
+
+/**
+ * Notify tx that the applier thread has detached its last applier and has to
+ * be joined.
+ */
+static void
+applier_thread_set_joinable(struct cmsg *msg)
+{
+ struct applier_thread *thread = container_of(msg, struct applier_thread,
+ join_msg);
+ thread->is_exiting = true;
+}
+
+static const struct cmsg_hop join_msg_route[] = {
+ {applier_thread_set_joinable, NULL},
+};
+
+struct applier_detach_msg {
+ struct cmsg base;
+ struct diag diag;
+};
+
+/**
+ * Propagate the exception causing the applier death to tx from applier
+ * thread.
+ */
+static void
+applier_detach_thread(struct cmsg *base)
+{
+ struct applier_detach_msg *msg = (struct applier_detach_msg *)base;
+ assert(!diag_is_empty(&msg->diag));
+ diag_move(&msg->diag, &fiber()->diag);
+ diag_raise();
+}
+
+static const struct cmsg_hop detach_msg_route[] = {
+ {applier_detach_thread, NULL},
+};
+
+struct applier_msg {
+ struct cmsg base;
+ struct applier *applier;
+ /**
+ * The last known confirmed vclock to be used by applier ack writer
+ * fiber.
+ */
+ struct vclock ack_vclock;
+ /** Whether the message has an updated ack_vclock value. */
+ bool has_ack;
+ /** A list of read up transactions to be processed in tx thread. */
+ struct stailq txs;
+ /** This message's personal buffer, holding input rows. */
+ struct ibuf ibuf;
+ /** This message's region used to allocate auxiliary structures. */
+ struct region region;
+ int txn_cnt;
+};
+
+/* Applier thread related data. */
+struct applier_thread_data {
+ /** A pointer to the thread handling this applier's data stream. */
+ struct applier_thread *thread;
+ /** Applier's personal endpoint in the tx thread. */
+ struct cbus_endpoint endpoint;
+ /** A pipe from the applier thread to tx. */
+ struct cpipe tx_pipe;
+ /** A pipe from tx to the applier thread. */
+ struct cpipe thread_pipe;
+ /**
+ * A pair of rotating messages. While one of the messages is processed
+ * in the tx thread the other is used to store incoming rows.
+ */
+ struct applier_msg msgs[2];
+ /** Usual message route. */
+ struct cmsg_hop route[2];
+ /** An index of the message currently filling up in applier thread. */
+ int msg_ptr;
+ /**
+ * A preallocated message used to notify tx that the applier should exit
+ * due to a network error.
+ */
+ struct applier_detach_msg exit_msg;
+ /** The reader fiber, reading and parsing incoming rows. */
+ struct fiber *reader;
+ /** The writer fiber, writing acks to the replication master. */
+ struct fiber *writer;
+ /** A trigger invoked on reader fiber stop. */
+ struct trigger reader_on_stop;
+ /** A trigger invoked on writer fiber stop. */
+ struct trigger writer_on_stop;
+ /** The latest known ack vclock to send to the replication master. */
+ struct vclock ack_vclock;
+ /** Whether an ack should be sent or not. */
+ bool has_ack;
+};
+
+/**
+ * The tx part of applier-in-thread machinery. Apply all the parsed transactions
+ * and notify the applier thread of new ack vclock value.
+ */
+static void
+applier_process_batch(struct cmsg *base)
+{
+ struct applier_msg *msg = (struct applier_msg *)base;
+ struct applier *applier = msg->applier;
+ struct applier_tx *tx;
+ stailq_foreach_entry(tx, &msg->txs, next) {
+ struct xrow_header *first_row =
+ &stailq_first_entry(&tx->rows, struct applier_tx_row,
+ next)->row;
+ raft_process_heartbeat(box_raft(), applier->instance_id);
+ if (first_row->lsn == 0) {
+ if (unlikely(iproto_type_is_raft_request(
+ first_row->type))) {
+ if (applier_handle_raft(applier,
+ first_row) != 0)
+ diag_raise();
+ }
+ applier_signal_ack(applier);
+ } else if (applier_apply_tx(applier, &tx->rows, true) != 0) {
+ diag_raise();
+ }
+ }
+ if (applier->has_acks_to_send) {
+ applier_check_sync(applier);
+ vclock_copy(&msg->ack_vclock, &replicaset.vclock);
+ msg->has_ack = true;
+ applier->has_acks_to_send = false;
+ }
+}
+
+/** The callback invoked on the message return to applier thread. */
+static void
+applier_thread_return_batch(struct cmsg *base)
+{
+ struct applier_msg *msg = (struct applier_msg *) base;
+ struct ibuf *ibuf = &msg->ibuf;
+ struct applier *applier = msg->applier;
+ struct applier_thread_data *tdata = applier->tdata;
+ msg->txn_cnt = 0;
+ ibuf->rpos = ibuf->xpos; /* forget processed input */
+ ibuf_defragment(ibuf);
+ region_reset(&msg->region);
+ stailq_create(&msg->txs);
+ if (tdata->reader != NULL && !fiber_is_dead(tdata->reader))
+ fiber_wakeup(tdata->reader);
+ if (msg->has_ack) {
+ tdata->has_ack = true;
+ vclock_copy(&tdata->ack_vclock, &msg->ack_vclock);
+ if (tdata->writer != NULL && !fiber_is_dead(tdata->writer)) {
+ fiber_cond_signal(&applier->writer_cond);
+ }
+ msg->has_ack = false;
+ }
+}
+
+/**
+ * Given a pair of input buffers, move the unparsed data from the buffer
+ * departing to tx thread to the buffer remaining available in applier thread.
+ */
+static inline size_t
+move_unparsed(struct ibuf *oldbuf, struct ibuf *newbuf)
+{
+ assert(ibuf_used(newbuf) == 0); /* cannot move on top of other data */
+ size_t unparsed = ibuf_unparsed(oldbuf);
+ if (unparsed > 0) {
+ void *ptr = ibuf_alloc(newbuf, unparsed);
+ if (ptr == NULL) {
+ panic("Applier failed to allocate memory for incoming "
+ "transactions on ibuf");
+ }
+ memcpy(ptr, oldbuf->xpos, unparsed);
+ oldbuf->wpos -= unparsed;
+ }
+ return unparsed;
+}
+
+/** Get the message not containing any data ready for processing. */
+static struct applier_msg *
+applier_thread_next_msg(struct applier_thread_data *tdata)
+{
+ int cur = tdata->msg_ptr;
+ struct applier_msg *msg = &tdata->msgs[cur];
+ struct ibuf *ibuf = &msg->ibuf;
+ if (ibuf_unparsed(ibuf) == ibuf_used(ibuf)) {
+ /*
+ * The ibuf doesn't contain any parsed data.
* Use current message.
+ */
+ return msg;
+ }
+ cur = (cur + 1) % 2;
+ msg = &tdata->msgs[cur];
+ struct ibuf *other = ibuf;
+ ibuf = &msg->ibuf;
+ if (ibuf_used(ibuf) == 0) {
+ tdata->msg_ptr = cur;
+ move_unparsed(other, ibuf);
+ return msg;
+ }
+ return NULL;
+}
+
+static void
+applier_thread_push_batch(struct applier_thread_data *tdata,
+ struct applier_msg *msg, struct stailq *txs)
+{
+ assert(msg != NULL);
+ stailq_concat(&msg->txs, txs);
+
+ cmsg_init(&msg->base, tdata->route);
+ cpipe_push(&tdata->tx_pipe, &msg->base);
+}
+
+/**
+ * Read as much data as possible. Do not yield as long as at least one byte is
+ * available.
+ */
+static ssize_t
+applier_read_ahead(struct applier *applier, struct ibuf *ibuf)
+{
+ return coio_breadn_timeout(&applier->io, ibuf, 1,
+ replication_disconnect_timeout());
+}
+
+static struct applier_tx *
+applier_parse_tx(struct applier *applier, struct ibuf *ibuf,
+ struct region *region)
+{
+ const char *data = ibuf->xpos;
+ const char *end = ibuf->wpos;
+ uint64_t tsn = 0;
+ struct applier_tx *tx = region_alloc_object_xc(region,
+ struct applier_tx);
+ stailq_create(&tx->rows);
+ do {
+ if (data == end)
+ goto not_read;
+ if (mp_typeof(*data) != MP_UINT) {
+ tnt_raise(ClientError, ER_INVALID_MSGPACK,
+ "packet length");
+ }
+ if (mp_check_uint(data, end) > 0) {
+ goto not_read;
+ }
+ uint64_t len = mp_decode_uint(&data);
+ const char *reqend = data + len;
+ if (reqend > end) {
+ goto not_read;
+ }
+ struct applier_tx_row *tx_row;
+ size_t size = sizeof(*tx_row) + sizeof(*tx_row->decoded);
+ tx_row = (typeof(tx_row))region_aligned_alloc_xc(region, size,
+ alignof(*tx_row));
+ memset(tx_row, 0, size);
+ struct xrow_header *row = &tx_row->row;
+ xrow_header_decode_xc(row, &data, reqend, true);
+ if (row->tm > 0) {
+ applier->lag = ev_now(loop()) - row->tm;
+ }
+ applier->last_row_time = ev_monotonic_now(loop());
+ tsn = set_next_tx_row(&tx->rows, tx_row, tsn);
+ } while (tsn != 0);
+
+ /* All txn row headers are parsed. Time to parse row bodies. */
+ struct applier_tx_row *item;
+ stailq_foreach_entry(item, &tx->rows, next) {
+ struct xrow_header *row = &item->row;
+ uint16_t type = row->type;
+ if (iproto_type_is_dml(type)) {
+ if (xrow_decode_dml(row, &item->decoded->dml,
+ dml_request_key_map(type)) != 0) {
+ diag_raise();
+ }
+ } else if (iproto_type_is_synchro_request(type)) {
+ if (xrow_decode_synchro(row, &item->decoded->synchro) != 0) {
+ diag_raise();
+ }
+ } else if (iproto_type_is_raft_request(type)) {
+ if (xrow_decode_raft(row, &item->decoded->raft.req,
+ &item->decoded->raft.vclock) != 0) {
+ diag_raise();
+ }
+ } else if (type == IPROTO_OK) {
+ /* Nothing to do. */
+ } else {
+ tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, type);
+ }
+ }
+ size_t read;
+ read = data - ibuf->xpos;
+ ibuf->xpos += read;
+ return tx;
+not_read:
+ /* Discard the preallocated messages. */
+ region_reset(region);
+ return NULL;
+}
+
+static int
+applier_decode_txs(struct applier *applier, struct ibuf *ibuf,
+ struct region *region, struct stailq *txs)
+{
+ int tx_cnt = 0;
+ while (true) {
+ struct applier_tx *tx;
+ tx = applier_parse_tx(applier, ibuf, region);
+ if (tx == NULL)
+ break;
+ stailq_add_tail_entry(txs, tx, next);
+ ++tx_cnt;
+ }
+ return tx_cnt;
+}
+
+/** Applier thread reader fiber function. */
+static int
+applier_thread_reader_f(va_list ap)
+{
+ struct applier *applier = va_arg(ap, struct applier *);
+ while (!fiber_is_cancelled()) {
+ struct ibuf *ibuf;
+ struct region *region;
+ struct applier_msg *msg;
+ do {
+ msg = applier_thread_next_msg(applier->tdata);
+ if (msg == NULL) {
+ fiber_yield();
+ if (fiber_is_cancelled())
+ return 0;
+ }
+ } while (msg == NULL);
+ ibuf = &msg->ibuf;
+ region = &msg->region;
+ struct stailq txs;
+ int tx_cnt;
+ try {
+ applier_read_ahead(applier, ibuf);
+ stailq_create(&txs);
+ tx_cnt = applier_decode_txs(applier, ibuf, region,
+ &txs);
+ } catch (Exception *e) {
+ return -1;
+ }
+ if (tx_cnt > 0)
+ applier_thread_push_batch(applier->tdata, msg, &txs);
+ else if (tx_cnt < 0)
+ return -1;
+ }
+ return 0;
+}
+
+/** The main applier thread fiber function. */
+static int
+applier_thread_f(va_list ap)
+{
+ struct applier_thread *thread = va_arg(ap, typeof(thread));
+ int rc = cbus_endpoint_create(&thread->endpoint, cord()->name,
+ fiber_schedule_cb, fiber());
+ assert(rc == 0);
+ cmsg_init(&thread->join_msg, join_msg_route);
+ thread->n_appliers = 0;
+
+ cbus_loop(&thread->endpoint);
+
+ cbus_endpoint_destroy(&thread->endpoint, cbus_process);
+ assert(thread->n_appliers == 0);
+ return 0;
+}
+
+/** Initialize and start the applier thread. */
+static int
+applier_thread_create(struct applier_thread *thread)
+{
+ static int thread_id = 0;
+ const char *name = tt_sprintf("applier_%d", ++thread_id);
+
+ thread->is_exiting = false;
+ if (cord_costart(&thread->cord, name, applier_thread_f, thread) != 0) {
+ return -1;
+ }
+ return thread_id;
+}
+
+/** A memory pool used for allocating applier threads. */
+struct mempool applier_thread_pool;
+
+/** A count of currently live applier threads. */
+static int applier_thread_cnt = 0;
+
+/** Alive applier thread list. */
+static RLIST_HEAD(thread_list);
+
+/**
+ * A pointer to the thread which will accept appliers once thread count reaches
+ * the maximum configured value.
+ */
+static struct applier_thread *fill_thread = NULL;
+
+/** Allocate the applier thread structure. */
+static struct applier_thread *
+applier_thread_alloc(void)
+{
+ if (!mempool_is_initialized(&applier_thread_pool)) {
+ mempool_create(&applier_thread_pool, &cord()->slabc,
+ sizeof(struct applier_thread));
+ }
+ struct applier_thread *thread =
+ (struct applier_thread *)mempool_alloc(&applier_thread_pool);
+ if (thread == NULL) {
+ diag_set(OutOfMemory, sizeof(*thread), "mempool_alloc",
+ "applier thread");
+ }
+ return thread;
+}
+
+/**
+ * Get a working applier thread. Either create a new one or use one of the
+ * already initialized.
+ */
+static struct applier_thread *
+applier_thread_new(void)
+{
+ assert(replication_num_applier_threads > 0);
+ struct applier_thread *thread;
+ if (applier_thread_cnt < replication_num_applier_threads) {
+alloc: thread = applier_thread_alloc();
+ if (thread != NULL) {
+ if (applier_thread_create(thread) < 0) {
+ mempool_free(&applier_thread_pool, thread);
+ return NULL;
+ }
+ rlist_add_tail_entry(&thread_list, thread, in_list);
+ applier_thread_cnt++;
+ }
+ } else {
+ assert(!rlist_empty(&thread_list));
+ if (fill_thread == NULL) {
+ fill_thread = rlist_first_entry(&thread_list,
+ struct applier_thread,
+ in_list);
+ }
+ thread = fill_thread;
+ /*
+ * Fill the threads in a round-robin manner. Each new applier
+ * goes to the next available thread.
+ */
+ if (fill_thread == rlist_last_entry(&thread_list,
+ struct applier_thread,
+ in_list)) {
+ fill_thread = rlist_first_entry(&thread_list,
+ struct applier_thread,
+ in_list);
+ } else {
+ fill_thread = rlist_next_entry(fill_thread, in_list);
+ }
+ if (thread->is_exiting)
+ goto alloc;
+ }
+ return thread;
+}
+
+/** Destroy the applier thread. */
+static void
+applier_thread_free(struct applier_thread *thread)
+{
+ if (cord_cojoin(&thread->cord) < 0) {
+ panic("Can't join the applier thread.");
+ }
+ rlist_del_entry(thread, in_list);
+ mempool_free(&applier_thread_pool, thread);
+ applier_thread_cnt--;
+ if (fill_thread == thread)
+ fill_thread = NULL;
+}
+
+/** Initialize applier thread messages. */
+static void
+applier_thread_msgs_init(struct applier *applier)
+{
+ struct applier_thread_data *tdata = applier->tdata;
+ for (int i = 0; i < 2; i++) {
+ struct applier_msg *msg = &tdata->msgs[i];
+ memset(msg, 0, sizeof(*msg));
+ msg->applier = applier;
+ stailq_create(&msg->txs);
+ ibuf_create(&msg->ibuf, &cord()->slabc, 1024);
+ region_create(&msg->region, &cord()->slabc);
+ }
+ tdata->msg_ptr = 0;
+
+ cmsg_init(&tdata->exit_msg.base, detach_msg_route);
+ diag_create(&tdata->exit_msg.diag);
+
+ /* Initialize the default message route. */
+ tdata->route[0].f = applier_process_batch;
+ tdata->route[0].pipe = &tdata->thread_pipe;
+ tdata->route[1].f = applier_thread_return_batch;
+ tdata->route[1].pipe = NULL;
+}
+
+/**
+ * A trigger fired on each of applier thread fibers death. Propagates the
+ * exception with which the fiber exited to the tx thread.
+ */
+static int
+applier_thread_fiber_on_stop(struct trigger *trigger, void *event)
+{
+ struct fiber *fiber = (struct fiber *)event;
+ assert(fiber == fiber());
+ struct applier_thread_data *tdata =
+ (struct applier_thread_data *)trigger->data;
+ assert(fiber == tdata->reader || fiber == tdata->writer);
+ if (fiber->f_ret != 0 && !(fiber->flags & FIBER_IS_CANCELLED)) {
+ /* Notify the tx thread that its applier is dead. */
+ assert(!diag_is_empty(&fiber->diag));
+ diag_move(&fiber->diag, &tdata->exit_msg.diag);
+ cpipe_push(&tdata->tx_pipe, &tdata->exit_msg.base);
+ }
+ trigger_clear(trigger);
+ return 0;
+}
+
+/** Initialize fibers needed for applier in thread operation. */
+static inline void
+applier_thread_fiber_init(struct applier *applier)
+{
+ struct applier_thread_data *tdata = applier->tdata;
+ tdata->reader = fiber_new_xc("reader", applier_thread_reader_f);
+ tdata->writer = fiber_new_xc("writer", applier_writer_f);
+ trigger_create(&tdata->reader_on_stop, applier_thread_fiber_on_stop,
+ tdata, NULL);
+ trigger_create(&tdata->writer_on_stop, applier_thread_fiber_on_stop,
+ tdata, NULL);
+ trigger_add(&tdata->reader->on_stop, &tdata->reader_on_stop);
+ trigger_add(&tdata->writer->on_stop, &tdata->writer_on_stop);
+ fiber_set_joinable(tdata->reader, true);
+ fiber_set_joinable(tdata->writer, true);
+ fiber_start(tdata->reader, applier);
+ fiber_start(tdata->writer, applier, false, &tdata->has_ack,
+ &tdata->ack_vclock);
+}
+
+/** Notify the applier thread it has to serve yet another applier. */
+static void
+applier_thread_attach_applier(void *data)
+{
+ struct applier *applier = (struct applier *)data;
+ struct applier_thread *thread = container_of(cord(), typeof(*thread),
+ cord);
+
+ applier_thread_msgs_init(applier);
+ applier_thread_fiber_init(applier);
+
+ ++thread->n_appliers;
+}
+
+/** Notify the applier thread one of the appliers it served is dead. */
+static void
+applier_thread_detach_applier(void *data)
+{
+ struct applier *applier = (struct applier *)data;
+ struct applier_thread_data *tdata = applier->tdata;
+ struct applier_thread *thread = container_of(cord(), typeof(*thread),
+ cord);
+
+ fiber_cancel(tdata->reader);
+ fiber_cancel(tdata->writer);
+ /*
+ * We do not join the fibers, since we do not care about their return
+ * codes. The exceptions are propagated elsewhere.
+ */
+ if (--thread->n_appliers == 0) {
+ fiber_cancel(fiber());
+ cpipe_push(&tdata->tx_pipe, &thread->join_msg);
+ }
+}
+
+static void
+applier_thread_data_free(struct applier *applier);
+
+struct mempool applier_thread_data_pool;
+
+/**
+ * Create and initialize the applier-in-thread data and notify the thread
+ * there's a new applier.
+ */
+static struct applier_thread_data *
+applier_thread_data_new(struct applier *applier, struct applier_thread *thread)
+{
+ assert(thread != NULL);
+ if (!mempool_is_initialized(&applier_thread_data_pool)) {
+ mempool_create(&applier_thread_data_pool, &cord()->slabc,
+ sizeof(struct applier_thread_data));
+ }
+ struct applier_thread_data *tdata =
+ (typeof(tdata))mempool_alloc(&applier_thread_data_pool);
+ if (tdata == NULL) {
+ diag_set(OutOfMemory, sizeof(*tdata), "mempool_alloc",
+ "applier thread data");
+ return NULL;
+ }
+
+ tdata->thread = thread;
+ const char *thread_name = thread->cord.name;
+ const char *name = tt_sprintf("applier_%p", applier);
+ int rc = cbus_endpoint_create(&tdata->endpoint, name, fiber_schedule_cb,
+ fiber());
+ assert(rc == 0);
+ applier->tdata = tdata;
+ auto guard = make_scoped_guard([&]{
+ applier_thread_data_free(applier);
+ });
+ cbus_pair(thread_name, name, &tdata->thread_pipe, &tdata->tx_pipe,
+ applier_thread_attach_applier, applier, cbus_process);
+ guard.is_active = false;
+ return applier->tdata;
+}
+
+/**
+ * Remove the applier from the thread and destroy the supporting data structure.
+ */
+static void
+applier_thread_data_free(struct applier *applier)
+{
+ struct applier_thread_data *tdata = applier->tdata;
+ assert(tdata != NULL);
+
+ cbus_unpair(&tdata->thread_pipe, &tdata->tx_pipe,
+ applier_thread_detach_applier, applier, cbus_process);
+ cbus_endpoint_destroy(&tdata->endpoint, cbus_process);
+
+ if (tdata->thread->is_exiting)
+ applier_thread_free(tdata->thread);
+ mempool_free(&applier_thread_data_pool, tdata);
+ applier->tdata = NULL;
+}
+
+/**
+ * Subscribe to the replication stream. Use a separate decoding thread.
+ */
+static void
+applier_thread_subscribe(struct applier *applier)
+{
+ struct applier_thread *thread = applier_thread_new();
+
+ applier->tdata = applier_thread_data_new(applier, thread);
+ if (applier->tdata == NULL) {
+ diag_raise();
+ }
+
+ struct applier_thread_data *tdata = applier->tdata;
+
+ auto guard = make_scoped_guard([&]{
+ applier_thread_data_free(applier);
+ });
+
+ cbus_loop(&tdata->endpoint);
+
+ tnt_raise(FiberIsCancelled);
+}
+
/**
* Subscribe to the replication stream. Decode the incoming rows right in
* applier fiber.
@@ -1336,7 +2068,7 @@ applier_subscribe_f(struct applier *applier)
diag_raise();
}
applier_signal_ack(applier);
- } else if (applier_apply_tx(applier, &rows) != 0) {
+ } else if (applier_apply_tx(applier, &rows, false) != 0) {
diag_raise();
}
@@ -1351,6 +2083,7 @@ applier_subscribe_f(struct applier *applier)
ibuf_defragment(&applier->ibuf);
}
}
+
/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -1448,7 +2181,8 @@ applier_subscribe(struct applier *applier)
/* Re-enable warnings after successful execution of SUBSCRIBE */
applier->last_logged_errcode = 0;
- if (applier->version_id >= version_id(1, 7, 4)) {
+ if (applier->version_id >= version_id(1, 7, 4) &&
+ replication_num_applier_threads == 0) {
/* Enable replication ACKs for newer servers */
assert(applier->writer == NULL);
@@ -1458,7 +2192,8 @@ applier_subscribe(struct applier *applier)
applier->writer = fiber_new_xc(name, applier_writer_f);
fiber_set_joinable(applier->writer, true);
- fiber_start(applier->writer, applier);
+ fiber_start(applier->writer, applier, true,
+ &applier->has_acks_to_send, &replicaset.vclock);
}
applier->lag = TIMEOUT_INFINITY;
@@ -1486,7 +2221,10 @@ applier_subscribe(struct applier *applier)
/*
* Process a stream of rows from the binary log.
*/
- applier_subscribe_f(applier);
+ if (replication_num_applier_threads > 0)
+ applier_thread_subscribe(applier);
+ else
+ applier_subscribe_f(applier);
}
static inline void
diff --git a/src/box/applier.h b/src/box/applier.h
index 899dc053a..0658cbd25 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -71,6 +71,8 @@ enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
ENUM(applier_state, applier_STATE);
extern const char *applier_state_strs[];
+struct applier_thread_data;
+
/**
* State of a replication connection to the master
*/
@@ -132,6 +134,8 @@ struct applier {
struct diag diag;
/* Master's vclock at the time of SUBSCRIBE. */
struct vclock remote_vclock_at_subscribe;
+ /* The thread data used by this applier. */
+ struct applier_thread_data *tdata;
};
/**
diff --git a/src/box/box.cc b/src/box/box.cc
index 0413cbf44..eeef3029a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -3629,7 +3629,7 @@ box_cfg_xc(void)
gc_init();
engine_init();
schema_init();
- replication_init();
+ replication_init(cfg_geti_default("replication_num_threads", 1));
port_init();
iproto_init(cfg_geti("iproto_threads"));
sql_init();
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index f14c8de5d..946d3537d 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -103,6 +103,7 @@ local default_cfg = {
replication_connect_quorum = nil, -- connect all
replication_skip_conflict = false,
replication_anon = false,
+ replication_num_threads = 1,
feedback_enabled = true,
feedback_crashinfo = true,
feedback_host = "https://feedback.tarantool.io",
@@ -210,6 +211,7 @@ local template_cfg = {
replication_connect_quorum = 'number',
replication_skip_conflict = 'boolean',
replication_anon = 'boolean',
+ replication_num_threads = 'number',
feedback_enabled = ifdef_feedback('boolean'),
feedback_crashinfo = ifdef_feedback('boolean'),
feedback_host = ifdef_feedback('string'),
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 10b4ac915..028d9f054 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -56,6 +56,7 @@ double replication_synchro_timeout = 5.0; /* seconds */
double replication_sync_timeout = 300.0; /* seconds */
bool replication_skip_conflict = false;
bool replication_anon = false;
+int replication_num_applier_threads = 1;
struct replicaset replicaset;
@@ -84,7 +85,7 @@ replicaset_quorum(void)
}
void
-replication_init(void)
+replication_init(int num_threads)
{
memset(&replicaset, 0, sizeof(replicaset));
replica_hash_new(&replicaset.hash);
@@ -101,6 +102,8 @@ replication_init(void)
rlist_create(&replicaset.on_ack);
diag_create(&replicaset.applier.diag);
+
+ replication_num_applier_threads = num_threads;
}
void
diff --git a/src/box/replication.h b/src/box/replication.h
index 95563e811..881087ca6 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -155,6 +155,11 @@ extern bool replication_skip_conflict;
*/
extern bool replication_anon;
+/**
+ * Whether this replica will receive replication stream in a separate thread.
+ */
+extern int replication_num_applier_threads;
+
/**
* Wait for the given period of time before trying to reconnect
* to a master.
@@ -176,7 +181,7 @@ replication_disconnect_timeout(void)
}
void
-replication_init(void);
+replication_init(int num_threads);
void
replication_free(void);
--
2.30.1 (Apple Git-130)
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input Serge Petrenko via Tarantool-patches
@ 2021-12-06 3:05 ` Serge Petrenko via Tarantool-patches
0 siblings, 0 replies; 8+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-12-06 3:05 UTC (permalink / raw)
To: v.shpilevoy, vdavydov; +Cc: tarantool-patches
Here's the corresponding change in libsmall:
Subject: [PATCH] ibuf: introduce xpos field
ibuf->xpos is a pointer to the not-yet-parsed input, so that
rpos <= xpos <= wpos. This is useful when the user processes input in
multiple stages and needs the input data for longer than just "until the
data is read".
---
https://github.com/tarantool/small/tree/sp/ibuf-xpos
include/small/ibuf.h | 10 +++++++++-
small/ibuf.c | 4 +++-
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/include/small/ibuf.h b/include/small/ibuf.h
index 00d7781..a6646b5 100644
--- a/include/small/ibuf.h
+++ b/include/small/ibuf.h
@@ -61,6 +61,8 @@ struct ibuf
char *buf;
/** Start of input. */
char *rpos;
+ /** End of parsed input. */
+ char *xpos;
/** End of useful input */
char *wpos;
/** End of buffer. */
@@ -100,6 +102,12 @@ ibuf_capacity(struct ibuf *ibuf)
return ibuf->end - ibuf->buf;
}
+static inline size_t
+ibuf_unparsed(struct ibuf *ibuf)
+{
+ return ibuf->wpos - ibuf->xpos;
+}
+
/**
* Integer value of the position in the buffer - stable
* in case of realloc.
@@ -115,7 +123,7 @@ ibuf_pos(struct ibuf *ibuf)
static inline void
ibuf_reset(struct ibuf *ibuf)
{
- ibuf->rpos = ibuf->wpos = ibuf->buf;
+ ibuf->rpos = ibuf->xpos = ibuf->wpos = ibuf->buf;
}
void *
diff --git a/small/ibuf.c b/small/ibuf.c
index fe3f441..85345f4 100644
--- a/small/ibuf.c
+++ b/small/ibuf.c
@@ -37,7 +37,7 @@ void
ibuf_create(struct ibuf *ibuf, struct slab_cache *slabc, size_t
start_capacity)
{
ibuf->slabc = slabc;
- ibuf->buf = ibuf->rpos = ibuf->wpos = ibuf->end = NULL;
+ ibuf->buf = ibuf->rpos = ibuf->xpos = ibuf->wpos = ibuf->end = NULL;
ibuf->start_capacity = start_capacity;
/* Don't allocate the buffer yet. */
}
@@ -99,7 +99,9 @@ ibuf_reserve_slow(struct ibuf *ibuf, size_t size)
ibuf->buf = ptr;
ibuf->end = ibuf->buf + slab_capacity(slab);
}
+ size_t xsize = ibuf->xpos - ibuf->rpos;
ibuf->rpos = ibuf->buf;
+ ibuf->xpos = ibuf->rpos + xsize;
ibuf->wpos = ibuf->rpos + used;
return ibuf->wpos;
}
--
2.30.1 (Apple Git-130)
--
Serge Petrenko
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread
2021-12-06 3:03 [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread Serge Petrenko via Tarantool-patches
` (3 preceding siblings ...)
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 4/4] Introduce applier thread Serge Petrenko via Tarantool-patches
@ 2021-12-06 9:59 ` Vladimir Davydov via Tarantool-patches
2021-12-06 10:31 ` Serge Petrenko via Tarantool-patches
4 siblings, 1 reply; 8+ messages in thread
From: Vladimir Davydov via Tarantool-patches @ 2021-12-06 9:59 UTC (permalink / raw)
To: Serge Petrenko; +Cc: v.shpilevoy, tarantool-patches
Please create a pull request.
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread
2021-12-06 9:59 ` [Tarantool-patches] [PATCH 0/4] replication: introduce " Vladimir Davydov via Tarantool-patches
@ 2021-12-06 10:31 ` Serge Petrenko via Tarantool-patches
0 siblings, 0 replies; 8+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-12-06 10:31 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: v.shpilevoy, tarantool-patches
06.12.2021 12:59, Vladimir Davydov wrote:
> Please create a pull request.
Ok, here:
https://github.com/tarantool/tarantool/pull/6676
--
Serge Petrenko
^ permalink raw reply [flat|nested] 8+ messages in thread
End of thread.
Thread overview (8 messages):
2021-12-06 3:03 [Tarantool-patches] [PATCH 0/4] replication: introduce applier thread Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 1/4] xrow: rework coio_read_xrow to keep parsed input Serge Petrenko via Tarantool-patches
2021-12-06 3:05 ` Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 2/4] applier: reuse input buffer to store row bodies Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 3/4] applier: factor replication stream processing out of subscribe() Serge Petrenko via Tarantool-patches
2021-12-06 3:03 ` [Tarantool-patches] [PATCH 4/4] Introduce applier thread Serge Petrenko via Tarantool-patches
2021-12-06 9:59 ` [Tarantool-patches] [PATCH 0/4] replication: introduce " Vladimir Davydov via Tarantool-patches
2021-12-06 10:31 ` Serge Petrenko via Tarantool-patches