[tarantool-patches] [PATCH v2] Collect relaying rows into a buffer before send

Georgy Kirichenko georgy at tarantool.org
Tue Jan 22 13:25:20 MSK 2019


Collect rows to send into a buffer and send it with one call. The buffer
is flushed when its size reaches the defined threshold or when there are
no more rows to send right now.

Closes #1025
---
Changes in v2:
 - Rebased against the latest 2.1
 - Check for out of memory error

Issue: https://github.com/tarantool/tarantool/issues/1025
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1025-relay-buffering

src/box/relay.cc | 63 +++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 62 insertions(+), 1 deletion(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 90fced244..2bb260e06 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -54,6 +54,13 @@
 #include "xstream.h"
 #include "wal.h"
 
+enum {
+	/**
+	 * Flush the relay send buffer once its used size reaches this value.
+	 */
+	RELAY_BUFFER_SEND_THRESHOLD = 8 * 1024,
+};
+
 /**
  * Cbus message to send status updates from relay to tx thread.
  */
@@ -139,6 +146,8 @@ struct relay {
 		/** Known relay vclock. */
 		struct vclock vclock;
 	} tx;
+	/** Buffer to accumulate rows before sending. */
+	struct ibuf send_buf;
 };
 
 struct diag*
@@ -277,6 +286,17 @@ relay_set_cord_name(int fd)
 	cord_set_name(name);
 }
 
+/** Send all rows accumulated in the relay buffer, if any, and empty it. */
+static void
+relay_flush(struct relay *relay)
+{
+	size_t pending = ibuf_used(&relay->send_buf);
+	if (pending == 0)
+		return;
+	coio_write(&relay->io, relay->send_buf.rpos, pending);
+	ibuf_reset(&relay->send_buf);
+}
+
 void
 relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
 {
@@ -301,12 +321,16 @@ relay_final_join_f(va_list ap)
 
 	coio_enable();
 	relay_set_cord_name(relay->io.fd);
+	ibuf_create(&relay->send_buf, &cord()->slabc,
+		    2 * RELAY_BUFFER_SEND_THRESHOLD);
 
 	/* Send all WALs until stop_vclock */
 	assert(relay->stream.write != NULL);
 	recover_remaining_wals(relay->r, &relay->stream,
 			       &relay->stop_vclock, true);
+	relay_flush(relay);
 	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
+	ibuf_destroy(&relay->send_buf);
 	return 0;
 }
 
@@ -453,6 +477,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 	try {
 		recover_remaining_wals(relay->r, &relay->stream, NULL,
 				       (events & WAL_EVENT_ROTATE) != 0);
+		relay_flush(relay);
 	} catch (Exception *e) {
 		relay_set_error(relay, e);
 		fiber_cancel(fiber());
@@ -517,6 +542,8 @@ relay_subscribe_f(va_list ap)
 {
 	struct relay *relay = va_arg(ap, struct relay *);
 	struct recovery *r = relay->r;
+	ibuf_create(&relay->send_buf, &cord()->slabc,
+		    2 * RELAY_BUFFER_SEND_THRESHOLD);
 
 	coio_enable();
 	relay_set_cord_name(relay->io.fd);
@@ -623,6 +650,7 @@ relay_subscribe_f(va_list ap)
 	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
 
 	relay_exit(relay);
+	ibuf_destroy(&relay->send_buf);
 	return -1;
 }
 
@@ -665,6 +693,39 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		diag_raise();
 }
 
+/** Append a row to the relay send buffer; flush it on reaching threshold. */
+static void
+relay_send_buffered(struct relay *relay, struct xrow_header *packet)
+{
+	struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
+	while (inj != NULL && inj->bparam) {
+		relay_flush(relay);
+		fiber_sleep(0.01);
+	}
+
+	packet->sync = relay->sync;
+	relay->last_row_tm = ev_monotonic_now(loop());
+	/* Copy the encoded row into the send buffer piece by piece. */
+	struct iovec iov[XROW_IOVMAX];
+	int iovcnt = xrow_to_iovec_xc(packet, iov);
+	for (int i = 0; i < iovcnt; ++i) {
+		void *p = ibuf_alloc(&relay->send_buf, iov[i].iov_len);
+		if (p == NULL)
+			tnt_raise(OutOfMemory, iov[i].iov_len, "ibuf_alloc",
+				  "xrow");
+		memcpy(p, iov[i].iov_base, iov[i].iov_len);
+	}
+	if (ibuf_used(&relay->send_buf) >= RELAY_BUFFER_SEND_THRESHOLD)
+		relay_flush(relay);
+	fiber_gc();
+
+	inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+	if (inj != NULL && inj->dparam > 0) {
+		relay_flush(relay);
+		fiber_sleep(inj->dparam);
+	}
+}
+
 static void
 relay_send(struct relay *relay, struct xrow_header *packet)
 {
@@ -733,6 +794,6 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 			say_warn("injected broken lsn: %lld",
 				 (long long) packet->lsn);
 		}
-		relay_send(relay, packet);
+		relay_send_buffered(relay, packet);
 	}
 }
-- 
2.20.1





More information about the Tarantool-patches mailing list