[tarantool-patches] [PATCH v2] Collect relaying rows into a buffer before send
Georgy Kirichenko
georgy at tarantool.org
Tue Jan 22 13:25:20 MSK 2019
Collect rows to send into a buffer and send it with one call. The buffer
is flushing when it's size reaches defined threshold or there is no more
rows to send right now.
Closes #1025
---
Changes in v2:
- Rebased against the latest 2.1
- Check for out of memory error
Issue: https://github.com/tarantool/tarantool/issues/1025
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1025-relay-buffering
src/box/relay.cc | 63 +++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 62 insertions(+), 1 deletion(-)
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 90fced244..2bb260e06 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -54,6 +54,13 @@
#include "xstream.h"
#include "wal.h"
+enum {
+ /**
+ * Send relay buffer if it's size reaches the threshold.
+ */
+ RELAY_BUFFER_SEND_THRESHOLD = 8 * 1024,
+};
+
/**
* Cbus message to send status updates from relay to tx thread.
*/
@@ -139,6 +146,8 @@ struct relay {
/** Known relay vclock. */
struct vclock vclock;
} tx;
+ /** Buffer to accumulate rows before sending. */
+ struct ibuf send_buf;
};
struct diag*
@@ -277,6 +286,17 @@ relay_set_cord_name(int fd)
cord_set_name(name);
}
+static void
+relay_flush(struct relay *relay)
+{
+ if (ibuf_used(&relay->send_buf) == 0)
+ return;
+ /* Send accumulated data. */
+ coio_write(&relay->io, relay->send_buf.rpos,
+ ibuf_used(&relay->send_buf));
+ ibuf_reset(&relay->send_buf);
+}
+
void
relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
{
@@ -301,12 +321,16 @@ relay_final_join_f(va_list ap)
coio_enable();
relay_set_cord_name(relay->io.fd);
+ ibuf_create(&relay->send_buf, &cord()->slabc,
+ 2 * RELAY_BUFFER_SEND_THRESHOLD);
/* Send all WALs until stop_vclock */
assert(relay->stream.write != NULL);
recover_remaining_wals(relay->r, &relay->stream,
&relay->stop_vclock, true);
+ relay_flush(relay);
assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
+ ibuf_destroy(&relay->send_buf);
return 0;
}
@@ -453,6 +477,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
try {
recover_remaining_wals(relay->r, &relay->stream, NULL,
(events & WAL_EVENT_ROTATE) != 0);
+ relay_flush(relay);
} catch (Exception *e) {
relay_set_error(relay, e);
fiber_cancel(fiber());
@@ -517,6 +542,8 @@ relay_subscribe_f(va_list ap)
{
struct relay *relay = va_arg(ap, struct relay *);
struct recovery *r = relay->r;
+ ibuf_create(&relay->send_buf, &cord()->slabc,
+ 2 * RELAY_BUFFER_SEND_THRESHOLD);
coio_enable();
relay_set_cord_name(relay->io.fd);
@@ -623,6 +650,7 @@ relay_subscribe_f(va_list ap)
cbus_endpoint_destroy(&relay->endpoint, cbus_process);
relay_exit(relay);
+ ibuf_destroy(&relay->send_buf);
return -1;
}
@@ -665,6 +693,39 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
diag_raise();
}
+static void
+relay_send_buffered(struct relay *relay, struct xrow_header *packet)
+{
+ struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
+ while (inj != NULL && inj->bparam) {
+ relay_flush(relay);
+ fiber_sleep(0.01);
+ }
+
+ packet->sync = relay->sync;
+ relay->last_row_tm = ev_monotonic_now(loop());
+ /* Dump row to send buffer. */
+ struct iovec iov[XROW_IOVMAX];
+ int iovcnt = xrow_to_iovec_xc(packet, iov);
+ int i;
+ for (i = 0; i < iovcnt; ++i) {
+ void *p = ibuf_alloc(&relay->send_buf, iov[i].iov_len);
+ if (p == NULL)
+ tnt_raise(OutOfMemory, iov[i].iov_len, "region",
+ "xrow");
+ memcpy(p, iov[i].iov_base, iov[i].iov_len);
+ }
+ if (ibuf_used(&relay->send_buf) >= RELAY_BUFFER_SEND_THRESHOLD)
+ relay_flush(relay);
+ fiber_gc();
+
+ inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+ if (inj != NULL && inj->dparam > 0) {
+ relay_flush(relay);
+ fiber_sleep(inj->dparam);
+ }
+}
+
static void
relay_send(struct relay *relay, struct xrow_header *packet)
{
@@ -733,6 +794,6 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
say_warn("injected broken lsn: %lld",
(long long) packet->lsn);
}
- relay_send(relay, packet);
+ relay_send_buffered(relay, packet);
}
}
--
2.20.1
More information about the Tarantool-patches
mailing list