* [tarantool-patches] [PATCH] Collect relaying rows into a buffer before send
@ 2019-01-04 9:07 Georgy Kirichenko
0 siblings, 0 replies; only message in thread
From: Georgy Kirichenko @ 2019-01-04 9:07 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Collect rows to send into a buffer and send it with one call. The buffer
is flushing when it's size reaches defined threshold or there is no more
rows to send right now.
Closes #1025
---
Issue: https://github.com/tarantool/tarantool/issues/1025
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1025-relay-buffering
src/box/relay.cc | 60 +++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 59 insertions(+), 1 deletion(-)
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a01c2a2ee..d06a70931 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -54,6 +54,13 @@
#include "xstream.h"
#include "wal.h"
+enum {
+ /**
+ * Send relay buffer if it's size reaches the threshold.
+ */
+ RELAY_BUFFER_SEND_THRESHOLD = 8 * 1024,
+};
+
/**
* Cbus message to send status updates from relay to tx thread.
*/
@@ -139,6 +146,8 @@ struct relay {
/** Known relay vclock. */
struct vclock vclock;
} tx;
+ /** Buffer to accumulate rows before sending. */
+ struct ibuf send_buf;
};
struct diag*
@@ -257,6 +266,17 @@ relay_set_cord_name(int fd)
cord_set_name(name);
}
+static void
+relay_flush(struct relay *relay)
+{
+ if (ibuf_used(&relay->send_buf) == 0)
+ return;
+ /* Send accumulated data. */
+ coio_write(&relay->io, relay->send_buf.rpos,
+ ibuf_used(&relay->send_buf));
+ ibuf_reset(&relay->send_buf);
+}
+
void
relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
{
@@ -279,12 +299,16 @@ relay_final_join_f(va_list ap)
struct relay *relay = va_arg(ap, struct relay *);
coio_enable();
relay_set_cord_name(relay->io.fd);
+ ibuf_create(&relay->send_buf, &cord()->slabc,
+ 2 * RELAY_BUFFER_SEND_THRESHOLD);
/* Send all WALs until stop_vclock */
assert(relay->stream.write != NULL);
recover_remaining_wals(relay->r, &relay->stream,
&relay->stop_vclock, true);
+ relay_flush(relay);
assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
+ ibuf_destroy(&relay->send_buf);
return 0;
}
@@ -419,6 +443,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
try {
recover_remaining_wals(relay->r, &relay->stream, NULL,
(events & WAL_EVENT_ROTATE) != 0);
+ relay_flush(relay);
} catch (Exception *e) {
e->log();
diag_move(diag_get(), &relay->diag);
@@ -492,6 +517,8 @@ relay_subscribe_f(va_list ap)
{
struct relay *relay = va_arg(ap, struct relay *);
struct recovery *r = relay->r;
+ ibuf_create(&relay->send_buf, &cord()->slabc,
+ 2 * RELAY_BUFFER_SEND_THRESHOLD);
coio_enable();
cbus_endpoint_create(&relay->endpoint, cord_name(cord()),
@@ -585,6 +612,7 @@ relay_subscribe_f(va_list ap)
if (inj != NULL && inj->dparam > 0)
fiber_sleep(inj->dparam);
+ ibuf_destroy(&relay->send_buf);
return diag_is_empty(diag_get()) ? 0: -1;
}
@@ -628,6 +656,36 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
diag_raise();
}
+static void
+relay_send_buffered(struct relay *relay, struct xrow_header *packet)
+{
+ struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
+ while (inj != NULL && inj->bparam) {
+ relay_flush(relay);
+ fiber_sleep(0.01);
+ }
+
+ packet->sync = relay->sync;
+ relay->last_row_tm = ev_monotonic_now(loop());
+ /* Dump row to send buffer. */
+ struct iovec iov[XROW_IOVMAX];
+ int iovcnt = xrow_to_iovec_xc(packet, iov);
+ int i;
+ for (i = 0; i < iovcnt; ++i) {
+ void *p = ibuf_alloc(&relay->send_buf, iov[i].iov_len);
+ memcpy(p, iov[i].iov_base, iov[i].iov_len);
+ }
+ if (ibuf_used(&relay->send_buf) >= RELAY_BUFFER_SEND_THRESHOLD)
+ relay_flush(relay);
+ fiber_gc();
+
+ inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+ if (inj != NULL && inj->dparam > 0) {
+ relay_flush(relay);
+ fiber_sleep(inj->dparam);
+ }
+}
+
static void
relay_send(struct relay *relay, struct xrow_header *packet)
{
@@ -696,6 +754,6 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
say_warn("injected broken lsn: %lld",
(long long) packet->lsn);
}
- relay_send(relay, packet);
+ relay_send_buffered(relay, packet);
}
}
--
2.20.1
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2019-01-04 9:06 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-01-04 9:07 [tarantool-patches] [PATCH] Collect relaying rows into a buffer before send Georgy Kirichenko
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox