From: Georgy Kirichenko
Subject: [tarantool-patches] [PATCH v2] Collect relaying rows into a buffer before sending
Date: Tue, 22 Jan 2019 13:25:20 +0300
Message-Id: <1f19e5106fddd7bf350dd26b372249d3edaf58a2.1548152233.git.georgy@tarantool.org>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Reply-To: tarantool-patches@freelists.org
List-Id: tarantool-patches
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko

Collect the rows to be relayed into a buffer and send them with a single
write call. The buffer is flushed when its size reaches a defined
threshold or when there are no more rows to send right now.

Closes #1025
---
Changes in v2:
 - Rebased against the latest 2.1
 - Check for out-of-memory errors

Issue: https://github.com/tarantool/tarantool/issues/1025
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-1025-relay-buffering
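For reviewers who want the idea without digging into the relay internals,
here is a minimal, self-contained sketch of the same accumulate-then-flush
pattern. Everything in it (struct wbuf, wbuf_append(), WBUF_FLUSH_THRESHOLD)
is illustrative only and is not taken from the Tarantool sources; the patch
itself uses struct ibuf and coio_write() instead.

/*
 * Illustrative sketch only, not Tarantool code: accumulate small rows
 * into one buffer and write them out with as few syscalls as possible.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

enum { WBUF_FLUSH_THRESHOLD = 8 * 1024 };

struct wbuf {
	int fd;		/* destination descriptor */
	char *data;	/* accumulated bytes */
	size_t used;	/* bytes currently buffered */
	size_t cap;	/* allocated capacity */
};

/* Write out everything buffered so far. */
static int
wbuf_flush(struct wbuf *b)
{
	size_t off = 0;
	while (off < b->used) {
		ssize_t n = write(b->fd, b->data + off, b->used - off);
		if (n < 0)
			return -1;
		off += (size_t)n;
	}
	b->used = 0;
	return 0;
}

/* Append one row; flush only when the threshold is reached. */
static int
wbuf_append(struct wbuf *b, const void *row, size_t len)
{
	if (b->used + len > b->cap) {
		size_t cap = b->cap ? b->cap : 2 * WBUF_FLUSH_THRESHOLD;
		while (cap < b->used + len)
			cap *= 2;
		char *p = realloc(b->data, cap);
		if (p == NULL)
			return -1;
		b->data = p;
		b->cap = cap;
	}
	memcpy(b->data + b->used, row, len);
	b->used += len;
	if (b->used >= WBUF_FLUSH_THRESHOLD)
		return wbuf_flush(b);
	return 0;
}

int
main(void)
{
	struct wbuf b = { .fd = STDOUT_FILENO };
	for (int i = 0; i < 1000; i++) {
		char row[64];
		int len = snprintf(row, sizeof(row), "row %d\n", i);
		if (wbuf_append(&b, row, (size_t)len) != 0)
			return 1;
	}
	/* No more rows to send right now: flush the remainder. */
	if (wbuf_flush(&b) != 0)
		return 1;
	free(b.data);
	return 0;
}

The point is the same as in the patch below: many small rows are coalesced
into one large write, and a final flush covers the tail once no more rows
are pending.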
 src/box/relay.cc | 63 ++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 62 insertions(+), 1 deletion(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 90fced244..2bb260e06 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -54,6 +54,13 @@
 #include "xstream.h"
 #include "wal.h"
 
+enum {
+	/**
+	 * Send the relay buffer if its size reaches the threshold.
+	 */
+	RELAY_BUFFER_SEND_THRESHOLD = 8 * 1024,
+};
+
 /**
  * Cbus message to send status updates from relay to tx thread.
  */
@@ -139,6 +146,8 @@ struct relay {
 		/** Known relay vclock. */
 		struct vclock vclock;
 	} tx;
+	/** Buffer to accumulate rows before sending. */
+	struct ibuf send_buf;
 };
 
 struct diag*
@@ -277,6 +286,17 @@ relay_set_cord_name(int fd)
 	cord_set_name(name);
 }
 
+static void
+relay_flush(struct relay *relay)
+{
+	if (ibuf_used(&relay->send_buf) == 0)
+		return;
+	/* Send accumulated data. */
+	coio_write(&relay->io, relay->send_buf.rpos,
+		   ibuf_used(&relay->send_buf));
+	ibuf_reset(&relay->send_buf);
+}
+
 void
 relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
 {
@@ -301,12 +321,16 @@ relay_final_join_f(va_list ap)
 	coio_enable();
 	relay_set_cord_name(relay->io.fd);
 
+	ibuf_create(&relay->send_buf, &cord()->slabc,
+		    2 * RELAY_BUFFER_SEND_THRESHOLD);
 	/* Send all WALs until stop_vclock */
 	assert(relay->stream.write != NULL);
 	recover_remaining_wals(relay->r, &relay->stream,
 			       &relay->stop_vclock, true);
+	relay_flush(relay);
 	assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0);
 
+	ibuf_destroy(&relay->send_buf);
 	return 0;
 }
 
@@ -453,6 +477,7 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
 	try {
 		recover_remaining_wals(relay->r, &relay->stream, NULL,
 				       (events & WAL_EVENT_ROTATE) != 0);
+		relay_flush(relay);
 	} catch (Exception *e) {
 		relay_set_error(relay, e);
 		fiber_cancel(fiber());
@@ -517,6 +542,8 @@ relay_subscribe_f(va_list ap)
 {
 	struct relay *relay = va_arg(ap, struct relay *);
 	struct recovery *r = relay->r;
+	ibuf_create(&relay->send_buf, &cord()->slabc,
+		    2 * RELAY_BUFFER_SEND_THRESHOLD);
 
 	coio_enable();
 	relay_set_cord_name(relay->io.fd);
@@ -623,6 +650,7 @@ relay_subscribe_f(va_list ap)
 	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
 
 	relay_exit(relay);
+	ibuf_destroy(&relay->send_buf);
 
 	return -1;
 }
@@ -665,6 +693,39 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		diag_raise();
 }
 
+static void
+relay_send_buffered(struct relay *relay, struct xrow_header *packet)
+{
+	struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
+	while (inj != NULL && inj->bparam) {
+		relay_flush(relay);
+		fiber_sleep(0.01);
+	}
+
+	packet->sync = relay->sync;
+	relay->last_row_tm = ev_monotonic_now(loop());
+	/* Dump row to send buffer. */
+	struct iovec iov[XROW_IOVMAX];
+	int iovcnt = xrow_to_iovec_xc(packet, iov);
+	int i;
+	for (i = 0; i < iovcnt; ++i) {
+		void *p = ibuf_alloc(&relay->send_buf, iov[i].iov_len);
+		if (p == NULL)
+			tnt_raise(OutOfMemory, iov[i].iov_len, "region",
+				  "xrow");
+		memcpy(p, iov[i].iov_base, iov[i].iov_len);
+	}
+	if (ibuf_used(&relay->send_buf) >= RELAY_BUFFER_SEND_THRESHOLD)
+		relay_flush(relay);
+	fiber_gc();
+
+	inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+	if (inj != NULL && inj->dparam > 0) {
+		relay_flush(relay);
+		fiber_sleep(inj->dparam);
+	}
+}
+
 static void
 relay_send(struct relay *relay, struct xrow_header *packet)
 {
@@ -733,6 +794,6 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 			say_warn("injected broken lsn: %lld",
 				 (long long) packet->lsn);
 		}
-		relay_send(relay, packet);
+		relay_send_buffered(relay, packet);
 	}
 }
-- 
2.20.1