From: Georgy Kirichenko <georgy@tarantool.org> To: tarantool-patches@freelists.org Cc: Georgy Kirichenko <georgy@tarantool.org> Subject: [tarantool-patches] [PATCH] Collect relaying rows into a buffer before send Date: Fri, 4 Jan 2019 12:07:45 +0300 [thread overview] Message-ID: <371c4be4a4d328a04cc052d6bff5298d9cd4ed9c.1546591880.git.georgy@tarantool.org> (raw) Collect rows to send into a buffer and send it with one call. The buffer is flushed when its size reaches the defined threshold or there are no more rows to send right now. Closes #1025 --- Issue: https://github.com/tarantool/tarantool/issues/1025 Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1025-relay-buffering src/box/relay.cc | 60 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/src/box/relay.cc b/src/box/relay.cc index a01c2a2ee..d06a70931 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -54,6 +54,13 @@ #include "xstream.h" #include "wal.h" +enum { + /** + * Send relay buffer if its size reaches the threshold. + */ + RELAY_BUFFER_SEND_THRESHOLD = 8 * 1024, +}; + /** * Cbus message to send status updates from relay to tx thread. */ @@ -139,6 +146,8 @@ struct relay { /** Known relay vclock. */ struct vclock vclock; } tx; + /** Buffer to accumulate rows before sending. */ + struct ibuf send_buf; }; struct diag* @@ -257,6 +266,17 @@ relay_set_cord_name(int fd) cord_set_name(name); } +static void +relay_flush(struct relay *relay) +{ + if (ibuf_used(&relay->send_buf) == 0) + return; + /* Send accumulated data. 
*/ + coio_write(&relay->io, relay->send_buf.rpos, + ibuf_used(&relay->send_buf)); + ibuf_reset(&relay->send_buf); +} + void relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) { @@ -279,12 +299,16 @@ relay_final_join_f(va_list ap) struct relay *relay = va_arg(ap, struct relay *); coio_enable(); relay_set_cord_name(relay->io.fd); + ibuf_create(&relay->send_buf, &cord()->slabc, + 2 * RELAY_BUFFER_SEND_THRESHOLD); /* Send all WALs until stop_vclock */ assert(relay->stream.write != NULL); recover_remaining_wals(relay->r, &relay->stream, &relay->stop_vclock, true); + relay_flush(relay); assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0); + ibuf_destroy(&relay->send_buf); return 0; } @@ -419,6 +443,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events) try { recover_remaining_wals(relay->r, &relay->stream, NULL, (events & WAL_EVENT_ROTATE) != 0); + relay_flush(relay); } catch (Exception *e) { e->log(); diag_move(diag_get(), &relay->diag); @@ -492,6 +517,8 @@ relay_subscribe_f(va_list ap) { struct relay *relay = va_arg(ap, struct relay *); struct recovery *r = relay->r; + ibuf_create(&relay->send_buf, &cord()->slabc, + 2 * RELAY_BUFFER_SEND_THRESHOLD); coio_enable(); cbus_endpoint_create(&relay->endpoint, cord_name(cord()), @@ -585,6 +612,7 @@ relay_subscribe_f(va_list ap) if (inj != NULL && inj->dparam > 0) fiber_sleep(inj->dparam); + ibuf_destroy(&relay->send_buf); return diag_is_empty(diag_get()) ? 0: -1; } @@ -628,6 +656,36 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, diag_raise(); } +static void +relay_send_buffered(struct relay *relay, struct xrow_header *packet) +{ + struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL); + while (inj != NULL && inj->bparam) { + relay_flush(relay); + fiber_sleep(0.01); + } + + packet->sync = relay->sync; + relay->last_row_tm = ev_monotonic_now(loop()); + /* Dump row to send buffer. 
*/ + struct iovec iov[XROW_IOVMAX]; + int iovcnt = xrow_to_iovec_xc(packet, iov); + int i; + for (i = 0; i < iovcnt; ++i) { + void *p = ibuf_alloc(&relay->send_buf, iov[i].iov_len); + memcpy(p, iov[i].iov_base, iov[i].iov_len); + } + if (ibuf_used(&relay->send_buf) >= RELAY_BUFFER_SEND_THRESHOLD) + relay_flush(relay); + fiber_gc(); + + inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); + if (inj != NULL && inj->dparam > 0) { + relay_flush(relay); + fiber_sleep(inj->dparam); + } +} + static void relay_send(struct relay *relay, struct xrow_header *packet) { @@ -696,6 +754,6 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) say_warn("injected broken lsn: %lld", (long long) packet->lsn); } - relay_send(relay, packet); + relay_send_buffered(relay, packet); } } -- 2.20.1
reply other threads:[~2019-01-04 9:06 UTC|newest] Thread overview: [no followups] expand[flat|nested] mbox.gz Atom feed
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=371c4be4a4d328a04cc052d6bff5298d9cd4ed9c.1546591880.git.georgy@tarantool.org \ --to=georgy@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [tarantool-patches] [PATCH] Collect relaying rows into a buffer before send' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox