From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from localhost (localhost [127.0.0.1])
	by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 86AB424A55
	for ; Fri, 6 Sep 2019 04:31:52 -0400 (EDT)
Received: from turing.freelists.org ([127.0.0.1])
	by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024)
	with ESMTP id yOVpVMTrwp5A for ; Fri, 6 Sep 2019 04:31:52 -0400 (EDT)
Received: from smtp29.i.mail.ru (smtp29.i.mail.ru [94.100.177.89])
	(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
	(No client certificate requested)
	by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id AD95524A4A
	for ; Fri, 6 Sep 2019 04:31:51 -0400 (EDT)
Received: by smtp29.i.mail.ru with esmtpa (envelope-from )
	id 1i69en-0004PX-E4
	for tarantool-patches@freelists.org; Fri, 06 Sep 2019 11:31:49 +0300
From: Georgy Kirichenko
Subject: [tarantool-patches] Re: [PATCH] relay: send heartbeats while skipping WALs already applied by replica
Date: Fri, 06 Sep 2019 11:31:49 +0300
Message-ID: <5014702.VqQpfgWnbB@home.lan>
In-Reply-To: <1797608.vBFlj55HbS@home.lan>
References: <20190905133213.45918-1-rtokarev@corp.mail.ru> <1797608.vBFlj55HbS@home.lan>
MIME-Version: 1.0
Content-Transfer-Encoding: 7Bit
Content-Type: text/plain; charset="us-ascii"
Sender: tarantool-patches-bounce@freelists.org
Errors-to: tarantool-patches-bounce@freelists.org
Reply-To: tarantool-patches@freelists.org
List-Help:
List-Unsubscribe:
List-software: Ecartis version 1.0.0
List-Id: tarantool-patches
List-Subscribe:
List-Owner:
List-post:
List-Archive:
To: tarantool-patches@freelists.org

Roman, sorry for misunderstanding the issue you found. The problem is that
xlog_cursor positioning takes too long and happens without any heartbeats or
even yields, so I need some more time to think about it.

Regards

On Friday, September 6, 2019 10:20:40 AM MSK Georgy Kirichenko wrote:
> Hi Roman!
>
> Thank you for your patch.
> I investigated your issue and found that the applier sends an ACK packet
> in two cases:
>   - if a transaction was committed
>   - if a heartbeat message was received
>
> This means that when a relay sends already applied transactions, which
> the applier is going to skip, there are no ACK packets from the applier
> to the relay. It is worth noting that a relay should not send any
> heartbeats in this case, because even rows 'to be skipped' show that the
> relay is alive.
>
> Based on this, I would like to conclude that the best way to fix our issue
> is to signal an applier write condition after a transaction was skipped.
>
> Please see the attached diff. I would be grateful if you tested the diff
> against your case.
>
> Have a nice day!
>
> On Thursday, September 5, 2019 4:32:13 PM MSK Roman Tokarev wrote:
> > Currently, relay doesn't send heartbeat messages while skipping rows
> > already applied by a replica, because heartbeats are sent in the same
> > thread that reads WALs.
> >
> > To allow heartbeats to be sent, this patch introduces the 'heartbeater'
> > fiber, which sends them if no rows were sent to a replica during the
> > replication timeout. To let this fiber run, the main fiber, which
> > reads WALs, yields after skipping 10K rows.
> >
> > Fixes #4461
> > ---
> >
> >  src/box/box.cc      |  3 ++
> >  src/box/recovery.cc | 34 +++++++++--------
> >  src/box/relay.cc    | 91 ++++++++++++++++++++++++++++++++-------------
> >  3 files changed, 88 insertions(+), 40 deletions(-)
> >
> > diff --git a/src/box/box.cc b/src/box/box.cc
> > index ac10c21ad..c779044d5 100644
> > --- a/src/box/box.cc
> > +++ b/src/box/box.cc
> > @@ -326,6 +326,9 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
> >  static void
> >  apply_wal_row(struct xstream *stream, struct xrow_header *row)
> >  {
> > +	if (row == NULL)
> > +		return;
> > +
> >  	struct request request;
> >  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> >  	if (request.type != IPROTO_NOP) {
> > diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> > index d122d618a..3de5d7053 100644
> > --- a/src/box/recovery.cc
> > +++ b/src/box/recovery.cc
> > @@ -256,23 +256,27 @@ recover_xlog(struct recovery *r, struct xstream *stream,
> >  		if (stop_vclock != NULL &&
> >  		    r->vclock.signature >= stop_vclock->signature)
> >  			return;
> > +		int rc;
> >  		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
> >  		if (row.lsn <= current_lsn)
> > -			continue; /* already applied, skip */
> > -
> > -		/*
> > -		 * All rows in xlog files have an assigned
> > -		 * replica id.
> > -		 */
> > -		assert(row.replica_id != 0);
> > -		/*
> > -		 * We can promote the vclock either before or
> > -		 * after xstream_write(): it only makes any impact
> > -		 * in case of forced recovery, when we skip the
> > -		 * failed row anyway.
> > -		 */
> > -		vclock_follow_xrow(&r->vclock, &row);
> > -		if (xstream_write(stream, &row) == 0) {
> > +			/* already applied, skip */
> > +			rc = xstream_write(stream, NULL);
> > +		else {
> > +			/*
> > +			 * All rows in xlog files have an assigned
> > +			 * replica id.
> > +			 */
> > +			assert(row.replica_id != 0);
> > +			/*
> > +			 * We can promote the vclock either before or
> > +			 * after xstream_write(): it only makes any impact
> > +			 * in case of forced recovery, when we skip the
> > +			 * failed row anyway.
> > +			 */
> > +			vclock_follow_xrow(&r->vclock, &row);
> > +			rc = xstream_write(stream, &row);
> > +		}
> > +		if (rc == 0) {
> >  			++row_count;
> >  			if (row_count % 100000 == 0)
> >  				say_info("%.1fM rows processed",
> > diff --git a/src/box/relay.cc b/src/box/relay.cc
> > index a19abf6a9..baf694bd3 100644
> > --- a/src/box/relay.cc
> > +++ b/src/box/relay.cc
> > @@ -128,6 +128,8 @@ struct relay {
> >  	 * confirmation from the replica.
> >  	 */
> >  	struct stailq pending_gc;
> > +	/** Number or rows were processed. */
> > +	uint64_t processed;
> >  	/** Time when last row was sent to peer. */
> >  	double last_row_time;
> >  	/** Relay sync state. */
> > @@ -517,12 +519,48 @@ relay_send_heartbeat(struct relay *relay)
> >  {
> >  	struct xrow_header row;
> >  	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
> > +	relay_send(relay, &row);
> > +}
> > +
> > +/*
> > + * Relay heartbeater fiber function.
> > + * Send heartbeats to replica if no xrow has been sent during
> > + * replication_timeout.
> > + */
> > +int
> > +relay_heartbeater_f(va_list ap)
> > +{
> > +	struct relay *relay = va_arg(ap, struct relay *);
> > +	struct fiber *relay_f = va_arg(ap, struct fiber *);
> > +
> >  	try {
> > -		relay_send(relay, &row);
> > +		/*
> > +		 * If the replica happens to be up to date on subscribe,
> > +		 * don't wait for timeout to happen - send a heartbeat
> > +		 * message right away to update the replication lag as
> > +		 * soon as possible.
> > +		 */
> > +		relay_send_heartbeat(relay);
> > +
> > +		while (!fiber_is_cancelled()) {
> > +			double timeout = replication_timeout;
> > +			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> > +						    ERRINJ_DOUBLE);
> > +			if (inj != NULL && inj->dparam != 0)
> > +				timeout = inj->dparam;
> > +
> > +			/* Check for a heartbeat timeout. */
> > +			if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
> > +				relay_send_heartbeat(relay);
> > +
> > +			fiber_sleep(timeout);
> > +		}
> >  	} catch (Exception *e) {
> >  		relay_set_error(relay, e);
> > -		fiber_cancel(fiber());
> > +		fiber_cancel(relay_f);
> >  	}
> > +
> > +	return 0;
> >  }
> >
> >  /**
> > @@ -551,10 +589,6 @@ relay_subscribe_f(va_list ap)
> >  	};
> >  	trigger_add(&r->on_close_log, &on_close_log);
> >
> > -	/* Setup WAL watcher for sending new rows to the replica. */
> > -	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> > -			relay_process_wal_event, cbus_process);
> > -
> >  	/* Start fiber for receiving replica acks. */
> >  	char name[FIBER_NAME_MAX];
> >  	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
> > @@ -562,27 +596,22 @@ relay_subscribe_f(va_list ap)
> >  	fiber_set_joinable(reader, true);
> >  	fiber_start(reader, relay, fiber());
> >
> > -	/*
> > -	 * If the replica happens to be up to date on subscribe,
> > -	 * don't wait for timeout to happen - send a heartbeat
> > -	 * message right away to update the replication lag as
> > -	 * soon as possible.
> > -	 */
> > -	relay_send_heartbeat(relay);
> > +	/* Start fiber for sending heartbeats to replica. */
> > +	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "heartbeater");
> > +	struct fiber *heartbeater = fiber_new_xc(name, relay_heartbeater_f);
> > +	fiber_set_joinable(heartbeater, true);
> > +	fiber_start(heartbeater, relay, fiber());
> > +
> > +	/* Setup WAL watcher for sending new rows to the replica. */
> > +	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> > +			relay_process_wal_event, cbus_process);
> >
> >  	/*
> >  	 * Run the event loop until the connection is broken
> >  	 * or an error occurs.
> >  	 */
> >  	while (!fiber_is_cancelled()) {
> > -		double timeout = replication_timeout;
> > -		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> > -					    ERRINJ_DOUBLE);
> > -		if (inj != NULL && inj->dparam != 0)
> > -			timeout = inj->dparam;
> > -
> > -		fiber_cond_wait_deadline(&relay->reader_cond,
> > -					 relay->last_row_time + timeout);
> > +		fiber_cond_wait(&relay->reader_cond);
> >
> >  		/*
> >  		 * The fiber can be woken by IO cancel, by a timeout of
> > @@ -590,9 +619,6 @@ relay_subscribe_f(va_list ap)
> >  		 * Handle cbus messages first.
> >  		 */
> >  		cbus_process(&relay->endpoint);
> > -		/* Check for a heartbeat timeout. */
> > -		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
> > -			relay_send_heartbeat(relay);
> >  		/*
> >  		 * Check that the vclock has been updated and the previous
> >  		 * status message is delivered
> > @@ -631,9 +657,11 @@ relay_subscribe_f(va_list ap)
> >  	trigger_clear(&on_close_log);
> >  	wal_clear_watcher(&relay->wal_watcher, cbus_process);
> >
> > -	/* Join ack reader fiber. */
> > +	/* Join ack reader & heartbeater fibers. */
> >  	fiber_cancel(reader);
> > +	fiber_cancel(heartbeater);
> >  	fiber_join(reader);
> > +	fiber_join(heartbeater);
> >
> >  	/* Destroy cpipe to tx. */
> >  	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
> > @@ -716,6 +744,19 @@ static void
> >  relay_send_row(struct xstream *stream, struct xrow_header *packet)
> >  {
> >  	struct relay *relay = container_of(stream, struct relay, stream);
> > +
> > +	relay->processed++;
> > +	if (packet == NULL) {
> > +		if (relay->processed % 10000 != 0)
> > +			return;
> > +
> > +		ev_now_update(loop());
> > +		if (ev_monotonic_now(loop()) - relay->last_row_time > replication_timeout)
> > +			fiber_yield_timeout(0);
> > +
> > +		return;
> > +	}
> > +
> >  	assert(iproto_type_is_dml(packet->type));
> >  	/*
> >  	 * Transform replica local requests to IPROTO_NOP so as to