From: "Roman Tokarev" (Redacted sender "rtokarev" for DMARC)
Subject: [tarantool-patches] [PATCH] relay: send heartbeats while skipping WALs already applied by replica
Date: Thu, 5 Sep 2019 16:32:13 +0300
Message-Id: <20190905133213.45918-1-rtokarev@corp.mail.ru>
To: tarantool-patches@freelists.org

Currently, the relay doesn't send heartbeat messages while skipping rows
already applied by a replica, because heartbeats are sent in the same
thread in which the relay reads WALs.

To allow heartbeats to be sent, this patch introduces the 'heartbeater'
fiber, which sends them if no rows were sent to a replica during the
replication timeout. To let this fiber run, the main fiber, which reads
WALs, yields after every 10K skipped rows if the replication timeout has
elapsed since the last sent row.

Fixes #4461
---
 src/box/box.cc      |  3 ++
 src/box/recovery.cc | 34 +++++++++--------
 src/box/relay.cc    | 91 ++++++++++++++++++++++++++++++++-------------
 3 files changed, 88 insertions(+), 40 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ac10c21ad..c779044d5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -326,6 +326,9 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 static void
 apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
+	if (row == NULL)
+		return;
+
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type != IPROTO_NOP) {
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index d122d618a..3de5d7053 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -256,23 +256,27 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 		if (stop_vclock != NULL &&
 		    r->vclock.signature >= stop_vclock->signature)
 			return;
+		int rc;
 		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
 		if (row.lsn <= current_lsn)
-			continue; /* already applied, skip */
-
-		/*
-		 * All rows in xlog files have an assigned
-		 * replica id.
-		 */
-		assert(row.replica_id != 0);
-		/*
-		 * We can promote the vclock either before or
-		 * after xstream_write(): it only makes any impact
-		 * in case of forced recovery, when we skip the
-		 * failed row anyway.
-		 */
-		vclock_follow_xrow(&r->vclock, &row);
-		if (xstream_write(stream, &row) == 0) {
+			/* already applied, skip */
+			rc = xstream_write(stream, NULL);
+		else {
+			/*
+			 * All rows in xlog files have an assigned
+			 * replica id.
+			 */
+			assert(row.replica_id != 0);
+			/*
+			 * We can promote the vclock either before or
+			 * after xstream_write(): it only makes any impact
+			 * in case of forced recovery, when we skip the
+			 * failed row anyway.
+			 */
+			vclock_follow_xrow(&r->vclock, &row);
+			rc = xstream_write(stream, &row);
+		}
+		if (rc == 0) {
 			++row_count;
 			if (row_count % 100000 == 0)
 				say_info("%.1fM rows processed",
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a19abf6a9..baf694bd3 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -128,6 +128,8 @@ struct relay {
 	 * confirmation from the replica.
 	 */
 	struct stailq pending_gc;
+	/** Number of rows processed. */
+	uint64_t processed;
 	/** Time when last row was sent to peer. */
 	double last_row_time;
 	/** Relay sync state. */
@@ -517,12 +519,48 @@ relay_send_heartbeat(struct relay *relay)
 {
 	struct xrow_header row;
 	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
+	relay_send(relay, &row);
+}
+
+/*
+ * Relay heartbeater fiber function.
+ * Send heartbeats to replica if no xrow has been sent during
+ * replication_timeout.
+ */
+int
+relay_heartbeater_f(va_list ap)
+{
+	struct relay *relay = va_arg(ap, struct relay *);
+	struct fiber *relay_f = va_arg(ap, struct fiber *);
+
 	try {
-		relay_send(relay, &row);
+		/*
+		 * If the replica happens to be up to date on subscribe,
+		 * don't wait for timeout to happen - send a heartbeat
+		 * message right away to update the replication lag as
+		 * soon as possible.
+		 */
+		relay_send_heartbeat(relay);
+
+		while (!fiber_is_cancelled()) {
+			double timeout = replication_timeout;
+			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
+						    ERRINJ_DOUBLE);
+			if (inj != NULL && inj->dparam != 0)
+				timeout = inj->dparam;
+
+			/* Check for a heartbeat timeout. */
+			if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
+				relay_send_heartbeat(relay);
+
+			fiber_sleep(timeout);
+		}
 	} catch (Exception *e) {
 		relay_set_error(relay, e);
-		fiber_cancel(fiber());
+		fiber_cancel(relay_f);
 	}
+
+	return 0;
 }

 /**
@@ -551,10 +589,6 @@ relay_subscribe_f(va_list ap)
 	};
 	trigger_add(&r->on_close_log, &on_close_log);

-	/* Setup WAL watcher for sending new rows to the replica. */
-	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
-			relay_process_wal_event, cbus_process);
-
 	/* Start fiber for receiving replica acks. */
 	char name[FIBER_NAME_MAX];
 	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
@@ -562,27 +596,22 @@ relay_subscribe_f(va_list ap)
 	fiber_set_joinable(reader, true);
 	fiber_start(reader, relay, fiber());

-	/*
-	 * If the replica happens to be up to date on subscribe,
-	 * don't wait for timeout to happen - send a heartbeat
-	 * message right away to update the replication lag as
-	 * soon as possible.
-	 */
-	relay_send_heartbeat(relay);
+	/* Start fiber for sending heartbeats to replica. */
+	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "heartbeater");
+	struct fiber *heartbeater = fiber_new_xc(name, relay_heartbeater_f);
+	fiber_set_joinable(heartbeater, true);
+	fiber_start(heartbeater, relay, fiber());
+
+	/* Setup WAL watcher for sending new rows to the replica. */
+	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
+			relay_process_wal_event, cbus_process);

 	/*
 	 * Run the event loop until the connection is broken
 	 * or an error occurs.
 	 */
 	while (!fiber_is_cancelled()) {
-		double timeout = replication_timeout;
-		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
-					    ERRINJ_DOUBLE);
-		if (inj != NULL && inj->dparam != 0)
-			timeout = inj->dparam;
-
-		fiber_cond_wait_deadline(&relay->reader_cond,
-					 relay->last_row_time + timeout);
+		fiber_cond_wait(&relay->reader_cond);

 		/*
 		 * The fiber can be woken by IO cancel, by a timeout of
@@ -590,9 +619,6 @@ relay_subscribe_f(va_list ap)
 		 * Handle cbus messages first.
 		 */
 		cbus_process(&relay->endpoint);
-		/* Check for a heartbeat timeout. */
-		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
-			relay_send_heartbeat(relay);
 		/*
 		 * Check that the vclock has been updated and the previous
 		 * status message is delivered
@@ -631,9 +657,11 @@ relay_subscribe_f(va_list ap)
 	trigger_clear(&on_close_log);
 	wal_clear_watcher(&relay->wal_watcher, cbus_process);

-	/* Join ack reader fiber. */
+	/* Join ack reader & heartbeater fibers. */
 	fiber_cancel(reader);
+	fiber_cancel(heartbeater);
 	fiber_join(reader);
+	fiber_join(heartbeater);

 	/* Destroy cpipe to tx. */
 	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
@@ -716,6 +744,19 @@ static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
+
+	relay->processed++;
+	if (packet == NULL) {
+		if (relay->processed % 10000 != 0)
+			return;
+
+		ev_now_update(loop());
+		if (ev_monotonic_now(loop()) - relay->last_row_time > replication_timeout)
+			fiber_yield_timeout(0);
+
+		return;
+	}
+
 	assert(iproto_type_is_dml(packet->type));
 	/*
 	 * Transform replica local requests to IPROTO_NOP so as to
-- 
2.20.1