Hi Roman! Thank you for your patch. I investigated your issue and found that the applier sends an ACK packet in two cases: (1) when a transaction is committed; (2) when a heartbeat message is received. This means that when the relay sends already-applied transactions, which are going to be skipped by the applier, no ACK packets are sent from the applier to the relay. It is worth noting that the relay does not need to send any heartbeats in this case, because even rows that are 'to be skipped' show that the relay is alive. Based on this, I would conclude that the best way to fix our issue is to signal the applier's write condition after a transaction is skipped. Please see the attached diff. I would be grateful if you could test the diff against your case. Have a nice day! On Thursday, September 5, 2019 4:32:13 PM MSK Roman Tokarev wrote: > Currently, relay doesn't send heartbeat messages while skipping rows > already applied by a replica, because heartbeats are sending in the same > thread as relay reads WALs. > > To allow heartbeats being sent this patch introduce the 'heartbeater' fiber > that sends them if no rows were sent to a replica during replication > timeout. To allow this fiber to be executed, the main fiber, that reads > WALs yields after skipped 10K rows. 
> > Fixes #4461 > --- > src/box/box.cc | 3 ++ > src/box/recovery.cc | 34 +++++++++-------- > src/box/relay.cc | 91 ++++++++++++++++++++++++++++++++------------- > 3 files changed, 88 insertions(+), 40 deletions(-) > > diff --git a/src/box/box.cc b/src/box/box.cc > index ac10c21ad..c779044d5 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -326,6 +326,9 @@ recovery_journal_create(struct recovery_journal > *journal, struct vclock *v) static void > apply_wal_row(struct xstream *stream, struct xrow_header *row) > { > + if (row == NULL) > + return; > + > struct request request; > xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); > if (request.type != IPROTO_NOP) { > diff --git a/src/box/recovery.cc b/src/box/recovery.cc > index d122d618a..3de5d7053 100644 > --- a/src/box/recovery.cc > +++ b/src/box/recovery.cc > @@ -256,23 +256,27 @@ recover_xlog(struct recovery *r, struct xstream > *stream, if (stop_vclock != NULL && > r->vclock.signature >= stop_vclock->signature) > return; > + int rc; > int64_t current_lsn = vclock_get(&r->vclock, row.replica_id); > if (row.lsn <= current_lsn) > - continue; /* already applied, skip */ > - > - /* > - * All rows in xlog files have an assigned > - * replica id. > - */ > - assert(row.replica_id != 0); > - /* > - * We can promote the vclock either before or > - * after xstream_write(): it only makes any impact > - * in case of forced recovery, when we skip the > - * failed row anyway. > - */ > - vclock_follow_xrow(&r->vclock, &row); > - if (xstream_write(stream, &row) == 0) { > + /* already applied, skip */ > + rc = xstream_write(stream, NULL); > + else { > + /* > + * All rows in xlog files have an assigned > + * replica id. > + */ > + assert(row.replica_id != 0); > + /* > + * We can promote the vclock either before or > + * after xstream_write(): it only makes any impact > + * in case of forced recovery, when we skip the > + * failed row anyway. 
> + */ > + vclock_follow_xrow(&r->vclock, &row); > + rc = xstream_write(stream, &row); > + } > + if (rc == 0) { > ++row_count; > if (row_count % 100000 == 0) > say_info("%.1fM rows processed", > diff --git a/src/box/relay.cc b/src/box/relay.cc > index a19abf6a9..baf694bd3 100644 > --- a/src/box/relay.cc > +++ b/src/box/relay.cc > @@ -128,6 +128,8 @@ struct relay { > * confirmation from the replica. > */ > struct stailq pending_gc; > + /** Number or rows were processed. */ > + uint64_t processed; > /** Time when last row was sent to peer. */ > double last_row_time; > /** Relay sync state. */ > @@ -517,12 +519,48 @@ relay_send_heartbeat(struct relay *relay) > { > struct xrow_header row; > xrow_encode_timestamp(&row, instance_id, ev_now(loop())); > + relay_send(relay, &row); > +} > + > +/* > + * Relay heartbeater fiber function. > + * Send heartbeats to replica if no xrow has been sent during > + * replication_timeout. > + */ > +int > +relay_heartbeater_f(va_list ap) > +{ > + struct relay *relay = va_arg(ap, struct relay *); > + struct fiber *relay_f = va_arg(ap, struct fiber *); > + > try { > - relay_send(relay, &row); > + /* > + * If the replica happens to be up to date on subscribe, > + * don't wait for timeout to happen - send a heartbeat > + * message right away to update the replication lag as > + * soon as possible. > + */ > + relay_send_heartbeat(relay); > + > + while (!fiber_is_cancelled()) { > + double timeout = replication_timeout; > + struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL, > + ERRINJ_DOUBLE); > + if (inj != NULL && inj->dparam != 0) > + timeout = inj->dparam; > + > + /* Check for a heartbeat timeout. 
*/ > + if (ev_monotonic_now(loop()) - relay->last_row_time > timeout) > + relay_send_heartbeat(relay); > + > + fiber_sleep(timeout); > + } > } catch (Exception *e) { > relay_set_error(relay, e); > - fiber_cancel(fiber()); > + fiber_cancel(relay_f); > } > + > + return 0; > } > > /** > @@ -551,10 +589,6 @@ relay_subscribe_f(va_list ap) > }; > trigger_add(&r->on_close_log, &on_close_log); > > - /* Setup WAL watcher for sending new rows to the replica. */ > - wal_set_watcher(&relay->wal_watcher, relay->endpoint.name, > - relay_process_wal_event, cbus_process); > - > /* Start fiber for receiving replica acks. */ > char name[FIBER_NAME_MAX]; > snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader"); > @@ -562,27 +596,22 @@ relay_subscribe_f(va_list ap) > fiber_set_joinable(reader, true); > fiber_start(reader, relay, fiber()); > > - /* > - * If the replica happens to be up to date on subscribe, > - * don't wait for timeout to happen - send a heartbeat > - * message right away to update the replication lag as > - * soon as possible. > - */ > - relay_send_heartbeat(relay); > + /* Start fiber for sending heartbeats to replica. */ > + snprintf(name, sizeof(name), "%s:%s", fiber()->name, "heartbeater"); > + struct fiber *heartbeater = fiber_new_xc(name, relay_heartbeater_f); > + fiber_set_joinable(heartbeater, true); > + fiber_start(heartbeater, relay, fiber()); > + > + /* Setup WAL watcher for sending new rows to the replica. */ > + wal_set_watcher(&relay->wal_watcher, relay->endpoint.name, > + relay_process_wal_event, cbus_process); > > /* > * Run the event loop until the connection is broken > * or an error occurs. 
> */ > while (!fiber_is_cancelled()) { > - double timeout = replication_timeout; > - struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL, > - ERRINJ_DOUBLE); > - if (inj != NULL && inj->dparam != 0) > - timeout = inj->dparam; > - > - fiber_cond_wait_deadline(&relay->reader_cond, > - relay->last_row_time + timeout); > + fiber_cond_wait(&relay->reader_cond); > > /* > * The fiber can be woken by IO cancel, by a timeout of > @@ -590,9 +619,6 @@ relay_subscribe_f(va_list ap) > * Handle cbus messages first. > */ > cbus_process(&relay->endpoint); > - /* Check for a heartbeat timeout. */ > - if (ev_monotonic_now(loop()) - relay->last_row_time > timeout) > - relay_send_heartbeat(relay); > /* > * Check that the vclock has been updated and the previous > * status message is delivered > @@ -631,9 +657,11 @@ relay_subscribe_f(va_list ap) > trigger_clear(&on_close_log); > wal_clear_watcher(&relay->wal_watcher, cbus_process); > > - /* Join ack reader fiber. */ > + /* Join ack reader & heartbeater fibers. */ > fiber_cancel(reader); > + fiber_cancel(heartbeater); > fiber_join(reader); > + fiber_join(heartbeater); > > /* Destroy cpipe to tx. */ > cbus_unpair(&relay->tx_pipe, &relay->relay_pipe, > @@ -716,6 +744,19 @@ static void > relay_send_row(struct xstream *stream, struct xrow_header *packet) > { > struct relay *relay = container_of(stream, struct relay, stream); > + > + relay->processed++; > + if (packet == NULL) { > + if (relay->processed % 10000 != 0) > + return; > + > + ev_now_update(loop()); > + if (ev_monotonic_now(loop()) - relay->last_row_time > > replication_timeout) + fiber_yield_timeout(0); > + > + return; > + } > + > assert(iproto_type_is_dml(packet->type)); > /* > * Transform replica local requests to IPROTO_NOP so as to