Tarantool development patches archive
 help / color / mirror / Atom feed
From: Georgy Kirichenko <georgy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko <georgy@tarantool.org>
Subject: [tarantool-patches] [PATCH] Collect relaying rows into a buffer before send
Date: Fri,  4 Jan 2019 12:07:45 +0300	[thread overview]
Message-ID: <371c4be4a4d328a04cc052d6bff5298d9cd4ed9c.1546591880.git.georgy@tarantool.org> (raw)

Collect rows to send into a buffer and send it with one call. The buffer
is flushing when it's size reaches defined threshold or there is no more
rows to send right now.

Closes #1025
---
Issue: https://github.com/tarantool/tarantool/issues/1025
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1025-relay-buffering

 src/box/relay.cc | 60 +++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 59 insertions(+), 1 deletion(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index a01c2a2ee..d06a70931 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -54,6 +54,13 @@
 #include "xstream.h"
 #include "wal.h"
 
+enum {
+	/**
+	 * Send relay buffer if it's size reaches the threshold.
+	 */
+	RELAY_BUFFER_SEND_THRESHOLD = 8 * 1024,
+};
+
 /**
  * Cbus message to send status updates from relay to tx thread.
  */
@@ -139,6 +146,8 @@ struct relay {
 		/** Known relay vclock. */
 		struct vclock vclock;
 	} tx;
+	/** Buffer to accumulate rows before sending. */
+	struct ibuf send_buf;
 };
 
 struct diag*
@@ -257,6 +266,17 @@ relay_set_cord_name(int fd)
 	cord_set_name(name);
 }
 
+static void
+relay_flush(struct relay *relay)
+{
+	if (ibuf_used(&relay->send_buf) == 0)
+		return;
+	/* Send accumulated data. */
+	coio_write(&relay->io, relay->send_buf.rpos,
+		   ibuf_used(&relay->send_buf));
+	ibuf_reset(&relay->send_buf);
+}
+
 void
 relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
 {
@@ -279,12 +299,16 @@ relay_final_join_f(va_list ap)
 	struct relay *relay = va_arg(ap, struct relay *);
 	coio_enable();
 	relay_set_cord_name(relay->io.fd);
+	ibuf_create(&relay->send_buf, &cord()->slabc,
+		    2 * RELAY_BUFFER_SEND_THRESHOLD);
 
 	/* Send all WALs until stop_vclock */
 	assert(relay->stream.write != NULL);
 	recover_remaining_wals(relay->r, &relay->stream,
 			       &relay->stop_vclock, true);
+	relay_flush(relay);
 	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
+	ibuf_destroy(&relay->send_buf);
 	return 0;
 }
 
@@ -419,6 +443,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 	try {
 		recover_remaining_wals(relay->r, &relay->stream, NULL,
 				       (events & WAL_EVENT_ROTATE) != 0);
+		relay_flush(relay);
 	} catch (Exception *e) {
 		e->log();
 		diag_move(diag_get(), &relay->diag);
@@ -492,6 +517,8 @@ relay_subscribe_f(va_list ap)
 {
 	struct relay *relay = va_arg(ap, struct relay *);
 	struct recovery *r = relay->r;
+	ibuf_create(&relay->send_buf, &cord()->slabc,
+		    2 * RELAY_BUFFER_SEND_THRESHOLD);
 
 	coio_enable();
 	cbus_endpoint_create(&relay->endpoint, cord_name(cord()),
@@ -585,6 +612,7 @@ relay_subscribe_f(va_list ap)
 	if (inj != NULL && inj->dparam > 0)
 		fiber_sleep(inj->dparam);
 
+	ibuf_destroy(&relay->send_buf);
 	return diag_is_empty(diag_get()) ? 0: -1;
 }
 
@@ -628,6 +656,36 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		diag_raise();
 }
 
+static void
+relay_send_buffered(struct relay *relay, struct xrow_header *packet)
+{
+	struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
+	while (inj != NULL && inj->bparam) {
+		relay_flush(relay);
+		fiber_sleep(0.01);
+	}
+
+	packet->sync = relay->sync;
+	relay->last_row_tm = ev_monotonic_now(loop());
+	/* Dump row to send buffer. */
+	struct iovec iov[XROW_IOVMAX];
+	int iovcnt = xrow_to_iovec_xc(packet, iov);
+	int i;
+	for (i = 0; i < iovcnt; ++i) {
+		void *p = ibuf_alloc(&relay->send_buf, iov[i].iov_len);
+		memcpy(p, iov[i].iov_base, iov[i].iov_len);
+	}
+	if (ibuf_used(&relay->send_buf) >= RELAY_BUFFER_SEND_THRESHOLD)
+		relay_flush(relay);
+	fiber_gc();
+
+	inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+	if (inj != NULL && inj->dparam > 0) {
+		relay_flush(relay);
+		fiber_sleep(inj->dparam);
+	}
+}
+
 static void
 relay_send(struct relay *relay, struct xrow_header *packet)
 {
@@ -696,6 +754,6 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 			say_warn("injected broken lsn: %lld",
 				 (long long) packet->lsn);
 		}
-		relay_send(relay, packet);
+		relay_send_buffered(relay, packet);
 	}
 }
-- 
2.20.1

                 reply	other threads:[~2019-01-04  9:06 UTC|newest]

Thread overview: [no followups] expand[flat|nested]  mbox.gz  Atom feed

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=371c4be4a4d328a04cc052d6bff5298d9cd4ed9c.1546591880.git.georgy@tarantool.org \
    --to=georgy@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [tarantool-patches] [PATCH] Collect relaying rows into a buffer before send' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox