Date: Mon, 28 Jan 2019 12:56:47 +0300
From: Vladimir Davydov
Subject: Re: [tarantool-patches] [PATCH v2] Collect relaying rows into a buffer before send
To: Georgy Kirichenko
Cc: tarantool-patches@freelists.org

On Tue, Jan 22, 2019 at 01:25:20PM +0300, Georgy Kirichenko wrote:
> Collect rows to send into a buffer and send it with one call. The buffer
> is flushing when it's size reaches defined threshold or there is no more
> rows to send right now.

Why do you need this patch? Is it necessary for replication from memory,
transaction boundaries or sync replication? Does it show any performance
improvement? TBH I doubt that: the relay runs in a separate thread, so
CPU shouldn't be an issue. Besides, the receiving end would still receive
and apply rows one by one.

A few minor comments are below.

> 
> Closes #1025
> ---
> Changes in v2:
>  - Rebased against the latest 2.1
>  - Check for out of memory error
> 
> Issue: https://github.com/tarantool/tarantool/issues/1025
> Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1025-relay-buffering
> 
>  src/box/relay.cc | 63 +++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 62 insertions(+), 1 deletion(-)
> 
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 90fced244..2bb260e06 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -54,6 +54,13 @@
>  #include "xstream.h"
>  #include "wal.h"
>  
> +enum {
> +	/**
> +	 * Send relay buffer if it's size reaches the threshold.
> +	 */
> +	RELAY_BUFFER_SEND_THRESHOLD = 8 * 1024,

Why 8 KB? The comment should explain why this value was chosen.
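To make the threshold question more concrete, here is a rough standalone model of the flush policy described in the commit message: each row is appended, the buffer is sent once it holds at least the threshold, and the tail is sent when the batch ends. The names flush_model, model_append_row() and model_batch() are made up for illustration, and "sending" only counts write calls instead of calling coio_write():

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of the relay's send buffer state. */
struct flush_model {
	size_t threshold;  /* plays the role of RELAY_BUFFER_SEND_THRESHOLD */
	size_t used;       /* bytes currently buffered */
	size_t sends;      /* number of simulated write calls */
	size_t sent_bytes; /* total bytes "written" so far */
};

/* Like relay_flush(): a no-op on an empty buffer. */
static void
model_flush(struct flush_model *m)
{
	if (m->used == 0)
		return;
	m->sends++;
	m->sent_bytes += m->used;
	m->used = 0;
}

/* Like relay_send_buffered(): append a row, flush once the threshold is reached. */
static void
model_append_row(struct flush_model *m, size_t row_size)
{
	m->used += row_size;
	if (m->used >= m->threshold)
		model_flush(m);
}

/*
 * Send nrows rows of row_size bytes each, then flush the tail, the way
 * relay_process_wal_event() flushes after recover_remaining_wals().
 * Returns the number of simulated write calls.
 */
static size_t
model_batch(size_t threshold, size_t nrows, size_t row_size)
{
	struct flush_model m = { .threshold = threshold };
	for (size_t i = 0; i < nrows; i++)
		model_append_row(&m, row_size);
	model_flush(&m);
	/* Every byte is sent exactly once. */
	assert(m.sent_bytes == nrows * row_size);
	return m.sends;
}
```

With 100-byte rows, model_batch(8 * 1024, 1000, 100) gives 13 write calls, while model_batch(1, 1000, 100), which behaves like unbuffered sending, gives 1000. The model also shows the buffer can overshoot the threshold by up to one row before it is flushed, which is presumably why the patch preallocates 2 * RELAY_BUFFER_SEND_THRESHOLD. Fewer write calls alone don't prove a speedup, though; that still needs a benchmark.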
> +};
> +
>  /**
>   * Cbus message to send status updates from relay to tx thread.
>   */
> @@ -139,6 +146,8 @@ struct relay {
>  		/** Known relay vclock. */
>  		struct vclock vclock;
>  	} tx;
> +	/** Buffer to accumulate rows before sending. */
> +	struct ibuf send_buf;

Should go before the 'tx' member.

>  };
>  
>  struct diag*
> @@ -277,6 +286,17 @@ relay_set_cord_name(int fd)
>  	cord_set_name(name);
>  }
>  
> +static void
> +relay_flush(struct relay *relay)
> +{
> +	if (ibuf_used(&relay->send_buf) == 0)
> +		return;
> +	/* Send accumulated data. */
> +	coio_write(&relay->io, relay->send_buf.rpos,
> +		   ibuf_used(&relay->send_buf));
> +	ibuf_reset(&relay->send_buf);
> +}
> +
>  void
>  relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
>  {
> @@ -301,12 +321,16 @@ relay_final_join_f(va_list ap)
>  
>  	coio_enable();
>  	relay_set_cord_name(relay->io.fd);
> +	ibuf_create(&relay->send_buf, &cord()->slabc,
> +		    2 * RELAY_BUFFER_SEND_THRESHOLD);
>  
>  	/* Send all WALs until stop_vclock */
>  	assert(relay->stream.write != NULL);
>  	recover_remaining_wals(relay->r, &relay->stream,
>  			       &relay->stop_vclock, true);
> +	relay_flush(relay);
>  	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
> +	ibuf_destroy(&relay->send_buf);
>  	return 0;
>  }
>  
> @@ -453,6 +477,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
>  	try {
>  		recover_remaining_wals(relay->r, &relay->stream, NULL,
>  				       (events & WAL_EVENT_ROTATE) != 0);
> +		relay_flush(relay);
>  	} catch (Exception *e) {
>  		relay_set_error(relay, e);
>  		fiber_cancel(fiber());
> @@ -517,6 +542,8 @@ relay_subscribe_f(va_list ap)
>  {
>  	struct relay *relay = va_arg(ap, struct relay *);
>  	struct recovery *r = relay->r;
> +	ibuf_create(&relay->send_buf, &cord()->slabc,
> +		    2 * RELAY_BUFFER_SEND_THRESHOLD);
>  
>  	coio_enable();
>  	relay_set_cord_name(relay->io.fd);
> @@ -623,6 +650,7 @@ relay_subscribe_f(va_list ap)
>  	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
>  
>  	relay_exit(relay);
> +	ibuf_destroy(&relay->send_buf);
>  	return -1;

I don't like the code duplication (cf. relay_final_join_f). I think
ibuf_destroy() (or should it rather be ibuf_reinit?) should be called
from relay_exit(), with a comment explaining why it has to be called
from the relay thread that used the buffer. As for ibuf_create(), I
think it'd be OK to move it to relay_new().

BTW, why didn't you implement the same buffering for the initial join
procedure?

>  }
>  
> @@ -665,6 +693,39 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>  	diag_raise();
>  }
>  
> +static void
> +relay_send_buffered(struct relay *relay, struct xrow_header *packet)
> +{
> +	struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
> +	while (inj != NULL && inj->bparam) {
> +		relay_flush(relay);
> +		fiber_sleep(0.01);
> +	}
> +
> +	packet->sync = relay->sync;
> +	relay->last_row_tm = ev_monotonic_now(loop());
> +	/* Dump row to send buffer. */
> +	struct iovec iov[XROW_IOVMAX];
> +	int iovcnt = xrow_to_iovec_xc(packet, iov);
> +	int i;
> +	for (i = 0; i < iovcnt; ++i) {
> +		void *p = ibuf_alloc(&relay->send_buf, iov[i].iov_len);
> +		if (p == NULL)
> +			tnt_raise(OutOfMemory, iov[i].iov_len, "region",
> +				  "xrow");
> +		memcpy(p, iov[i].iov_base, iov[i].iov_len);
> +	}

I'd move this piece of code to xrow.c, as xrow_to_ibuf() or something
similar.

> +	if (ibuf_used(&relay->send_buf) >= RELAY_BUFFER_SEND_THRESHOLD)
> +		relay_flush(relay);
> +	fiber_gc();
> +
> +	inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
> +	if (inj != NULL && inj->dparam > 0) {
> +		relay_flush(relay);
> +		fiber_sleep(inj->dparam);
> +	}
> +}
> +
>  static void
>  relay_send(struct relay *relay, struct xrow_header *packet)
>  {
> @@ -733,6 +794,6 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>  			say_warn("injected broken lsn: %lld",
>  				 (long long) packet->lsn);
>  		}
> -		relay_send(relay, packet);
> +		relay_send_buffered(relay, packet);
>  	}
>  }
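For the xrow.c suggestion, something along these lines might work. This is only a sketch under assumptions: struct sketch_buf and sketch_buf_alloc() are hypothetical stand-ins for ibuf and ibuf_alloc() so that the code compiles on its own, and iovec_to_buf() plays the role of the suggested xrow_to_ibuf(). Since xrow.c is C, the helper returns -1 on allocation failure instead of raising, leaving it to the caller in relay.cc to throw:

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/uio.h>

/* Hypothetical stand-in for Tarantool's ibuf (not its real layout). */
struct sketch_buf {
	char *data;
	size_t used;
	size_t capacity;
};

/* Reserve size > 0 bytes at the end of the buffer; NULL on OOM. */
static void *
sketch_buf_alloc(struct sketch_buf *buf, size_t size)
{
	assert(size > 0);
	if (buf->used + size > buf->capacity) {
		size_t capacity = buf->capacity > 0 ? buf->capacity : 64;
		while (capacity < buf->used + size)
			capacity *= 2;
		char *data = realloc(buf->data, capacity);
		if (data == NULL)
			return NULL;
		buf->data = data;
		buf->capacity = capacity;
	}
	void *p = buf->data + buf->used;
	buf->used += size;
	return p;
}

/*
 * Copy an already encoded row (an iovec array, as filled in by
 * xrow_to_iovec()) into the buffer. Returns 0 on success, -1 on OOM.
 */
static int
iovec_to_buf(struct sketch_buf *buf, const struct iovec *iov, int iovcnt)
{
	size_t start = buf->used;
	for (int i = 0; i < iovcnt; i++) {
		if (iov[i].iov_len == 0)
			continue;
		void *p = sketch_buf_alloc(buf, iov[i].iov_len);
		if (p == NULL) {
			/* Roll back so no partially copied row is left behind. */
			buf->used = start;
			return -1;
		}
		memcpy(p, iov[i].iov_base, iov[i].iov_len);
	}
	return 0;
}
```

Rolling back to the original size on failure means the buffer never holds a truncated row, so a later relay_flush() would not put a partial packet on the wire.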