Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH v2] Collect relaying rows into a buffer before send
@ 2019-01-22 10:25 Georgy Kirichenko
  2019-01-28  9:56 ` Vladimir Davydov
  0 siblings, 1 reply; 2+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:25 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Collect rows to send into a buffer and send it with one call. The buffer
is flushed when its size reaches a defined threshold or there are no more
rows to send right now.

Closes #1025
---
Changes in v2:
 - Rebased against the latest 2.1
 - Check for out of memory error

Issue: https://github.com/tarantool/tarantool/issues/1025
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1025-relay-buffering

src/box/relay.cc | 63 +++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 62 insertions(+), 1 deletion(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 90fced244..2bb260e06 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -54,6 +54,13 @@
 #include "xstream.h"
 #include "wal.h"
 
+enum {
+	/**
+	 * Send relay buffer if it's size reaches the threshold.
+	 */
+	RELAY_BUFFER_SEND_THRESHOLD = 8 * 1024,
+};
+
 /**
  * Cbus message to send status updates from relay to tx thread.
  */
@@ -139,6 +146,8 @@ struct relay {
 		/** Known relay vclock. */
 		struct vclock vclock;
 	} tx;
+	/** Buffer to accumulate rows before sending. */
+	struct ibuf send_buf;
 };
 
 struct diag*
@@ -277,6 +286,17 @@ relay_set_cord_name(int fd)
 	cord_set_name(name);
 }
 
+static void
+relay_flush(struct relay *relay)
+{
+	if (ibuf_used(&relay->send_buf) == 0)
+		return;
+	/* Send accumulated data. */
+	coio_write(&relay->io, relay->send_buf.rpos,
+		   ibuf_used(&relay->send_buf));
+	ibuf_reset(&relay->send_buf);
+}
+
 void
 relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
 {
@@ -301,12 +321,16 @@ relay_final_join_f(va_list ap)
 
 	coio_enable();
 	relay_set_cord_name(relay->io.fd);
+	ibuf_create(&relay->send_buf, &cord()->slabc,
+		    2 * RELAY_BUFFER_SEND_THRESHOLD);
 
 	/* Send all WALs until stop_vclock */
 	assert(relay->stream.write != NULL);
 	recover_remaining_wals(relay->r, &relay->stream,
 			       &relay->stop_vclock, true);
+	relay_flush(relay);
 	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
+	ibuf_destroy(&relay->send_buf);
 	return 0;
 }
 
@@ -453,6 +477,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 	try {
 		recover_remaining_wals(relay->r, &relay->stream, NULL,
 				       (events & WAL_EVENT_ROTATE) != 0);
+		relay_flush(relay);
 	} catch (Exception *e) {
 		relay_set_error(relay, e);
 		fiber_cancel(fiber());
@@ -517,6 +542,8 @@ relay_subscribe_f(va_list ap)
 {
 	struct relay *relay = va_arg(ap, struct relay *);
 	struct recovery *r = relay->r;
+	ibuf_create(&relay->send_buf, &cord()->slabc,
+		    2 * RELAY_BUFFER_SEND_THRESHOLD);
 
 	coio_enable();
 	relay_set_cord_name(relay->io.fd);
@@ -623,6 +650,7 @@ relay_subscribe_f(va_list ap)
 	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
 
 	relay_exit(relay);
+	ibuf_destroy(&relay->send_buf);
 	return -1;
 }
 
@@ -665,6 +693,39 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		diag_raise();
 }
 
+static void
+relay_send_buffered(struct relay *relay, struct xrow_header *packet)
+{
+	struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
+	while (inj != NULL && inj->bparam) {
+		relay_flush(relay);
+		fiber_sleep(0.01);
+	}
+
+	packet->sync = relay->sync;
+	relay->last_row_tm = ev_monotonic_now(loop());
+	/* Dump row to send buffer. */
+	struct iovec iov[XROW_IOVMAX];
+	int iovcnt = xrow_to_iovec_xc(packet, iov);
+	int i;
+	for (i = 0; i < iovcnt; ++i) {
+		void *p = ibuf_alloc(&relay->send_buf, iov[i].iov_len);
+		if (p == NULL)
+			tnt_raise(OutOfMemory, iov[i].iov_len, "region",
+				  "xrow");
+		memcpy(p, iov[i].iov_base, iov[i].iov_len);
+	}
+	if (ibuf_used(&relay->send_buf) >= RELAY_BUFFER_SEND_THRESHOLD)
+		relay_flush(relay);
+	fiber_gc();
+
+	inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+	if (inj != NULL && inj->dparam > 0) {
+		relay_flush(relay);
+		fiber_sleep(inj->dparam);
+	}
+}
+
 static void
 relay_send(struct relay *relay, struct xrow_header *packet)
 {
@@ -733,6 +794,6 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 			say_warn("injected broken lsn: %lld",
 				 (long long) packet->lsn);
 		}
-		relay_send(relay, packet);
+		relay_send_buffered(relay, packet);
 	}
 }
-- 
2.20.1

^ permalink raw reply	[flat|nested] 2+ messages in thread

* Re: [tarantool-patches] [PATCH v2] Collect relaying rows into a buffer before send
  2019-01-22 10:25 [tarantool-patches] [PATCH v2] Collect relaying rows into a buffer before send Georgy Kirichenko
@ 2019-01-28  9:56 ` Vladimir Davydov
  0 siblings, 0 replies; 2+ messages in thread
From: Vladimir Davydov @ 2019-01-28  9:56 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:25:20PM +0300, Georgy Kirichenko wrote:
> Collect rows to send into a buffer and send it with one call. The buffer
> is flushed when its size reaches a defined threshold or there are no more
> rows to send right now.

Why do you need this patch? Is it necessary for replication from memory
or transaction boundaries or sync replication?

Does it show any performance improvement? TBH I doubt that - the relay
is running in a separate thread so CPU shouldn't be an issue. Besides,
the receiving end would still receive and apply rows one by one.

A few minor comments are below.

> 
> Closes #1025
> ---
> Changes in v2:
>  - Rebased against the latest 2.1
>  - Check for out of memory error
> 
> Issue: https://github.com/tarantool/tarantool/issues/1025
> Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1025-relay-buffering
> 
> src/box/relay.cc | 63 +++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 62 insertions(+), 1 deletion(-)
> 
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 90fced244..2bb260e06 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -54,6 +54,13 @@
>  #include "xstream.h"
>  #include "wal.h"
>  
> +enum {
> +	/**
> +	 * Send relay buffer if it's size reaches the threshold.
> +	 */
> +	RELAY_BUFFER_SEND_THRESHOLD = 8 * 1024,

Why 8 KB? An explanation should be in the comment.

> +};
> +
>  /**
>   * Cbus message to send status updates from relay to tx thread.
>   */
> @@ -139,6 +146,8 @@ struct relay {
>  		/** Known relay vclock. */
>  		struct vclock vclock;
>  	} tx;
> +	/** Buffer to accumulate rows before sending. */
> +	struct ibuf send_buf;

Should go before 'tx' member.

>  };
>  
>  struct diag*
> @@ -277,6 +286,17 @@ relay_set_cord_name(int fd)
>  	cord_set_name(name);
>  }
>  
> +static void
> +relay_flush(struct relay *relay)
> +{
> +	if (ibuf_used(&relay->send_buf) == 0)
> +		return;
> +	/* Send accumulated data. */
> +	coio_write(&relay->io, relay->send_buf.rpos,
> +		   ibuf_used(&relay->send_buf));
> +	ibuf_reset(&relay->send_buf);
> +}
> +
>  void
>  relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
>  {
> @@ -301,12 +321,16 @@ relay_final_join_f(va_list ap)
>  
>  	coio_enable();
>  	relay_set_cord_name(relay->io.fd);
> +	ibuf_create(&relay->send_buf, &cord()->slabc,
> +		    2 * RELAY_BUFFER_SEND_THRESHOLD);
>  
>  	/* Send all WALs until stop_vclock */
>  	assert(relay->stream.write != NULL);
>  	recover_remaining_wals(relay->r, &relay->stream,
>  			       &relay->stop_vclock, true);
> +	relay_flush(relay);
>  	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
> +	ibuf_destroy(&relay->send_buf);
>  	return 0;
>  }
>  
> @@ -453,6 +477,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
>  	try {
>  		recover_remaining_wals(relay->r, &relay->stream, NULL,
>  				       (events & WAL_EVENT_ROTATE) != 0);
> +		relay_flush(relay);
>  	} catch (Exception *e) {
>  		relay_set_error(relay, e);
>  		fiber_cancel(fiber());
> @@ -517,6 +542,8 @@ relay_subscribe_f(va_list ap)
>  {
>  	struct relay *relay = va_arg(ap, struct relay *);
>  	struct recovery *r = relay->r;
> +	ibuf_create(&relay->send_buf, &cord()->slabc,
> +		    2 * RELAY_BUFFER_SEND_THRESHOLD);
>  
>  	coio_enable();
>  	relay_set_cord_name(relay->io.fd);
> @@ -623,6 +650,7 @@ relay_subscribe_f(va_list ap)
>  	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
>  
>  	relay_exit(relay);
> +	ibuf_destroy(&relay->send_buf);
>  	return -1;

I don't like the code duplication (cf relay_final_join_f). I think that
ibuf_destroy() (or should it rather be ibuf_reinit?) should be called
from relay_exit() with a comment explaining why it should be called from
the relay thread that used it. As for ibuf_create(), I think it'd be OK
to move it to relay_new().

BTW why did you not implement the same buffering for the initial join
procedure?

>  }
>  
> @@ -665,6 +693,39 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>  		diag_raise();
>  }
>  
> +static void
> +relay_send_buffered(struct relay *relay, struct xrow_header *packet)
> +{
> +	struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
> +	while (inj != NULL && inj->bparam) {
> +		relay_flush(relay);
> +		fiber_sleep(0.01);
> +	}
> +
> +	packet->sync = relay->sync;
> +	relay->last_row_tm = ev_monotonic_now(loop());

> +	/* Dump row to send buffer. */
> +	struct iovec iov[XROW_IOVMAX];
> +	int iovcnt = xrow_to_iovec_xc(packet, iov);
> +	int i;
> +	for (i = 0; i < iovcnt; ++i) {
> +		void *p = ibuf_alloc(&relay->send_buf, iov[i].iov_len);
> +		if (p == NULL)
> +			tnt_raise(OutOfMemory, iov[i].iov_len, "region",
> +				  "xrow");
> +		memcpy(p, iov[i].iov_base, iov[i].iov_len);
> +	}

I'd move this piece of code to xrow.c - xrow_to_ibuf or something.

> +	if (ibuf_used(&relay->send_buf) >= RELAY_BUFFER_SEND_THRESHOLD)
> +		relay_flush(relay);
> +	fiber_gc();
> +
> +	inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
> +	if (inj != NULL && inj->dparam > 0) {
> +		relay_flush(relay);
> +		fiber_sleep(inj->dparam);
> +	}
> +}
> +
>  static void
>  relay_send(struct relay *relay, struct xrow_header *packet)
>  {
> @@ -733,6 +794,6 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>  			say_warn("injected broken lsn: %lld",
>  				 (long long) packet->lsn);
>  		}
> -		relay_send(relay, packet);
> +		relay_send_buffered(relay, packet);
>  	}
>  }

^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2019-01-28  9:56 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-01-22 10:25 [tarantool-patches] [PATCH v2] Collect relaying rows into a buffer before send Georgy Kirichenko
2019-01-28  9:56 ` Vladimir Davydov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox