Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladimir Davydov <vdavydov.dev@gmail.com>
To: Georgy Kirichenko <georgy@tarantool.org>
Cc: tarantool-patches@freelists.org
Subject: Re: [tarantool-patches] [PATCH v2] Collect relaying rows into a buffer before send
Date: Mon, 28 Jan 2019 12:56:47 +0300	[thread overview]
Message-ID: <20190128095647.3f4x6lzd5ysmit6u@esperanza> (raw)
In-Reply-To: <1f19e5106fddd7bf350dd26b372249d3edaf58a2.1548152233.git.georgy@tarantool.org>

On Tue, Jan 22, 2019 at 01:25:20PM +0300, Georgy Kirichenko wrote:
> Collect rows to send into a buffer and send it with one call. The buffer
> is flushing when it's size reaches defined threshold or there is no more
> rows to send right now.

Why do you need this patch? Is it necessary for replication from memory
or transaction boundaries or sync replication?

Does it show any performance improvement? TBH I doubt that - the relay
is running in a separate thread so CPU shouldn't be an issue. Besides,
the receiving end would still receive and apply rows one by one.

A few minor comments are below.

> 
> Closes #1025
> ---
> Changes in v2:
>  - Rebased against the latest 2.1
>  - Check for out of memory error
> 
> Issue: https://github.com/tarantool/tarantool/issues/1025
> Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1025-relay-buffering
> 
> src/box/relay.cc | 63 +++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 62 insertions(+), 1 deletion(-)
> 
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 90fced244..2bb260e06 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -54,6 +54,13 @@
>  #include "xstream.h"
>  #include "wal.h"
>  
> +enum {
> +	/**
> +	 * Send relay buffer if it's size reaches the threshold.
> +	 */
> +	RELAY_BUFFER_SEND_THRESHOLD = 8 * 1024,

Why 8 KB? An explanation should be in the comment.

> +};
> +
>  /**
>   * Cbus message to send status updates from relay to tx thread.
>   */
> @@ -139,6 +146,8 @@ struct relay {
>  		/** Known relay vclock. */
>  		struct vclock vclock;
>  	} tx;
> +	/** Buffer to accumulate rows before sending. */
> +	struct ibuf send_buf;

Should go before 'tx' member.

>  };
>  
>  struct diag*
> @@ -277,6 +286,17 @@ relay_set_cord_name(int fd)
>  	cord_set_name(name);
>  }
>  
> +static void
> +relay_flush(struct relay *relay)
> +{
> +	if (ibuf_used(&relay->send_buf) == 0)
> +		return;
> +	/* Send accumulated data. */
> +	coio_write(&relay->io, relay->send_buf.rpos,
> +		   ibuf_used(&relay->send_buf));
> +	ibuf_reset(&relay->send_buf);
> +}
> +
>  void
>  relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
>  {
> @@ -301,12 +321,16 @@ relay_final_join_f(va_list ap)
>  
>  	coio_enable();
>  	relay_set_cord_name(relay->io.fd);
> +	ibuf_create(&relay->send_buf, &cord()->slabc,
> +		    2 * RELAY_BUFFER_SEND_THRESHOLD);
>  
>  	/* Send all WALs until stop_vclock */
>  	assert(relay->stream.write != NULL);
>  	recover_remaining_wals(relay->r, &relay->stream,
>  			       &relay->stop_vclock, true);
> +	relay_flush(relay);
>  	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
> +	ibuf_destroy(&relay->send_buf);
>  	return 0;
>  }
>  
> @@ -453,6 +477,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
>  	try {
>  		recover_remaining_wals(relay->r, &relay->stream, NULL,
>  				       (events & WAL_EVENT_ROTATE) != 0);
> +		relay_flush(relay);
>  	} catch (Exception *e) {
>  		relay_set_error(relay, e);
>  		fiber_cancel(fiber());
> @@ -517,6 +542,8 @@ relay_subscribe_f(va_list ap)
>  {
>  	struct relay *relay = va_arg(ap, struct relay *);
>  	struct recovery *r = relay->r;
> +	ibuf_create(&relay->send_buf, &cord()->slabc,
> +		    2 * RELAY_BUFFER_SEND_THRESHOLD);
>  
>  	coio_enable();
>  	relay_set_cord_name(relay->io.fd);
> @@ -623,6 +650,7 @@ relay_subscribe_f(va_list ap)
>  	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
>  
>  	relay_exit(relay);
> +	ibuf_destroy(&relay->send_buf);
>  	return -1;

I don't like the code duplication (cf. relay_final_join_f). I think that
ibuf_destroy() (or should it rather be ibuf_reinit()?) should be called
from relay_exit(), with a comment explaining why it should be called from
the relay thread that used it. As for ibuf_create(), I think it'd be OK
to move it to relay_new().

BTW why did you not implement the same buffering for the initial join
procedure?

>  }
>  
> @@ -665,6 +693,39 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>  		diag_raise();
>  }
>  
> +static void
> +relay_send_buffered(struct relay *relay, struct xrow_header *packet)
> +{
> +	struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
> +	while (inj != NULL && inj->bparam) {
> +		relay_flush(relay);
> +		fiber_sleep(0.01);
> +	}
> +
> +	packet->sync = relay->sync;
> +	relay->last_row_tm = ev_monotonic_now(loop());

> +	/* Dump row to send buffer. */
> +	struct iovec iov[XROW_IOVMAX];
> +	int iovcnt = xrow_to_iovec_xc(packet, iov);
> +	int i;
> +	for (i = 0; i < iovcnt; ++i) {
> +		void *p = ibuf_alloc(&relay->send_buf, iov[i].iov_len);
> +		if (p == NULL)
> +			tnt_raise(OutOfMemory, iov[i].iov_len, "region",
> +				  "xrow");
> +		memcpy(p, iov[i].iov_base, iov[i].iov_len);
> +	}

I'd move this piece of code to xrow.c - xrow_to_ibuf or something.

> +	if (ibuf_used(&relay->send_buf) >= RELAY_BUFFER_SEND_THRESHOLD)
> +		relay_flush(relay);
> +	fiber_gc();
> +
> +	inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
> +	if (inj != NULL && inj->dparam > 0) {
> +		relay_flush(relay);
> +		fiber_sleep(inj->dparam);
> +	}
> +}
> +
>  static void
>  relay_send(struct relay *relay, struct xrow_header *packet)
>  {
> @@ -733,6 +794,6 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>  			say_warn("injected broken lsn: %lld",
>  				 (long long) packet->lsn);
>  		}
> -		relay_send(relay, packet);
> +		relay_send_buffered(relay, packet);
>  	}
>  }

      reply	other threads:[~2019-01-28  9:56 UTC|newest]

Thread overview: 2+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-01-22 10:25 Georgy Kirichenko
2019-01-28  9:56 ` Vladimir Davydov [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20190128095647.3f4x6lzd5ysmit6u@esperanza \
    --to=vdavydov.dev@gmail.com \
    --cc=georgy@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [tarantool-patches] [PATCH v2] Collect relaying rows into a buffer before send' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox