From: Georgy Kirichenko <georgy@tarantool.org> To: tarantool-patches@freelists.org Cc: Roman Tokarev <dmarc-noreply@freelists.org> Subject: [tarantool-patches] Re: [PATCH] relay: send heartbeats while skipping WALs already applied by replica Date: Fri, 06 Sep 2019 10:20:40 +0300 [thread overview] Message-ID: <1797608.vBFlj55HbS@home.lan> (raw) In-Reply-To: <20190905133213.45918-1-rtokarev@corp.mail.ru> [-- Attachment #1: Type: text/plain, Size: 8583 bytes --] Hi Roman! Thanks you for your patch. I did an investigation in your issue and found that applier sends an ACK packet in two cases: if a transaction was committed if a heartbeat message was received It means that the case when a relay sends already applied transactions which are going to be skipped by an applier there is no ACK packets from the applier to the relay. Worth nothing that a relay should not send any heartbeats is this case because even rows 'to be skipped' show - the relay is alive. Based on this I would like to conclude that the best way to fix our issue is to signal an applier write condition after a transaction was skipped. Please see attached diff. I would be grateful if you tested the diff against your case. Have a nice day! On Thursday, September 5, 2019 4:32:13 PM MSK Roman Tokarev wrote: > Currently, relay doesn't send heartbeat messages while skipping rows > already applied by a replica, because heartbeats are sending in the same > thread as relay reads WALs. > > To allow heartbeats being sent this patch introduce the 'heartbeater' fiber > that sends them if no rows were sent to a replica during replication > timeout. To allow this fiber to be executed, the main fiber, that reads > WALs yields after skipped 10K rows. > > Fixes #4461 > --- > src/box/box.cc | 3 ++ > src/box/recovery.cc | 34 +++++++++-------- > src/box/relay.cc | 91 ++++++++++++++++++++++++++++++++------------- > 3 files changed, 88 insertions(+), 40 deletions(-) > > diff --git a/src/box/box.cc b/src/box/box.cc > index ac10c21ad..c779044d5 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -326,6 +326,9 @@ recovery_journal_create(struct recovery_journal > *journal, struct vclock *v) static void > apply_wal_row(struct xstream *stream, struct xrow_header *row) > { > + if (row == NULL) > + return; > + > struct request request; > xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); > if (request.type != IPROTO_NOP) { > diff --git a/src/box/recovery.cc b/src/box/recovery.cc > index d122d618a..3de5d7053 100644 > --- a/src/box/recovery.cc > +++ b/src/box/recovery.cc > @@ -256,23 +256,27 @@ recover_xlog(struct recovery *r, struct xstream > *stream, if (stop_vclock != NULL && > r->vclock.signature >= stop_vclock->signature) > return; > + int rc; > int64_t current_lsn = vclock_get(&r->vclock, row.replica_id); > if (row.lsn <= current_lsn) > - continue; /* already applied, skip */ > - > - /* > - * All rows in xlog files have an assigned > - * replica id. > - */ > - assert(row.replica_id != 0); > - /* > - * We can promote the vclock either before or > - * after xstream_write(): it only makes any impact > - * in case of forced recovery, when we skip the > - * failed row anyway. > - */ > - vclock_follow_xrow(&r->vclock, &row); > - if (xstream_write(stream, &row) == 0) { > + /* already applied, skip */ > + rc = xstream_write(stream, NULL); > + else { > + /* > + * All rows in xlog files have an assigned > + * replica id. > + */ > + assert(row.replica_id != 0); > + /* > + * We can promote the vclock either before or > + * after xstream_write(): it only makes any impact > + * in case of forced recovery, when we skip the > + * failed row anyway. > + */ > + vclock_follow_xrow(&r->vclock, &row); > + rc = xstream_write(stream, &row); > + } > + if (rc == 0) { > ++row_count; > if (row_count % 100000 == 0) > say_info("%.1fM rows processed", > diff --git a/src/box/relay.cc b/src/box/relay.cc > index a19abf6a9..baf694bd3 100644 > --- a/src/box/relay.cc > +++ b/src/box/relay.cc > @@ -128,6 +128,8 @@ struct relay { > * confirmation from the replica. > */ > struct stailq pending_gc; > + /** Number or rows were processed. */ > + uint64_t processed; > /** Time when last row was sent to peer. */ > double last_row_time; > /** Relay sync state. */ > @@ -517,12 +519,48 @@ relay_send_heartbeat(struct relay *relay) > { > struct xrow_header row; > xrow_encode_timestamp(&row, instance_id, ev_now(loop())); > + relay_send(relay, &row); > +} > + > +/* > + * Relay heartbeater fiber function. > + * Send heartbeats to replica if no xrow has been sent during > + * replication_timeout. > + */ > +int > +relay_heartbeater_f(va_list ap) > +{ > + struct relay *relay = va_arg(ap, struct relay *); > + struct fiber *relay_f = va_arg(ap, struct fiber *); > + > try { > - relay_send(relay, &row); > + /* > + * If the replica happens to be up to date on subscribe, > + * don't wait for timeout to happen - send a heartbeat > + * message right away to update the replication lag as > + * soon as possible. > + */ > + relay_send_heartbeat(relay); > + > + while (!fiber_is_cancelled()) { > + double timeout = replication_timeout; > + struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL, > + ERRINJ_DOUBLE); > + if (inj != NULL && inj->dparam != 0) > + timeout = inj->dparam; > + > + /* Check for a heartbeat timeout. */ > + if (ev_monotonic_now(loop()) - relay->last_row_time > timeout) > + relay_send_heartbeat(relay); > + > + fiber_sleep(timeout); > + } > } catch (Exception *e) { > relay_set_error(relay, e); > - fiber_cancel(fiber()); > + fiber_cancel(relay_f); > } > + > + return 0; > } > > /** > @@ -551,10 +589,6 @@ relay_subscribe_f(va_list ap) > }; > trigger_add(&r->on_close_log, &on_close_log); > > - /* Setup WAL watcher for sending new rows to the replica. */ > - wal_set_watcher(&relay->wal_watcher, relay->endpoint.name, > - relay_process_wal_event, cbus_process); > - > /* Start fiber for receiving replica acks. */ > char name[FIBER_NAME_MAX]; > snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader"); > @@ -562,27 +596,22 @@ relay_subscribe_f(va_list ap) > fiber_set_joinable(reader, true); > fiber_start(reader, relay, fiber()); > > - /* > - * If the replica happens to be up to date on subscribe, > - * don't wait for timeout to happen - send a heartbeat > - * message right away to update the replication lag as > - * soon as possible. > - */ > - relay_send_heartbeat(relay); > + /* Start fiber for sending heartbeats to replica. */ > + snprintf(name, sizeof(name), "%s:%s", fiber()->name, "heartbeater"); > + struct fiber *heartbeater = fiber_new_xc(name, relay_heartbeater_f); > + fiber_set_joinable(heartbeater, true); > + fiber_start(heartbeater, relay, fiber()); > + > + /* Setup WAL watcher for sending new rows to the replica. */ > + wal_set_watcher(&relay->wal_watcher, relay->endpoint.name, > + relay_process_wal_event, cbus_process); > > /* > * Run the event loop until the connection is broken > * or an error occurs. > */ > while (!fiber_is_cancelled()) { > - double timeout = replication_timeout; > - struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL, > - ERRINJ_DOUBLE); > - if (inj != NULL && inj->dparam != 0) > - timeout = inj->dparam; > - > - fiber_cond_wait_deadline(&relay->reader_cond, > - relay->last_row_time + timeout); > + fiber_cond_wait(&relay->reader_cond); > > /* > * The fiber can be woken by IO cancel, by a timeout of > @@ -590,9 +619,6 @@ relay_subscribe_f(va_list ap) > * Handle cbus messages first. > */ > cbus_process(&relay->endpoint); > - /* Check for a heartbeat timeout. */ > - if (ev_monotonic_now(loop()) - relay->last_row_time > timeout) > - relay_send_heartbeat(relay); > /* > * Check that the vclock has been updated and the previous > * status message is delivered > @@ -631,9 +657,11 @@ relay_subscribe_f(va_list ap) > trigger_clear(&on_close_log); > wal_clear_watcher(&relay->wal_watcher, cbus_process); > > - /* Join ack reader fiber. */ > + /* Join ack reader & heartbeater fibers. */ > fiber_cancel(reader); > + fiber_cancel(heartbeater); > fiber_join(reader); > + fiber_join(heartbeater); > > /* Destroy cpipe to tx. */ > cbus_unpair(&relay->tx_pipe, &relay->relay_pipe, > @@ -716,6 +744,19 @@ static void > relay_send_row(struct xstream *stream, struct xrow_header *packet) > { > struct relay *relay = container_of(stream, struct relay, stream); > + > + relay->processed++; > + if (packet == NULL) { > + if (relay->processed % 10000 != 0) > + return; > + > + ev_now_update(loop()); > + if (ev_monotonic_now(loop()) - relay->last_row_time > > replication_timeout) + fiber_yield_timeout(0); > + > + return; > + } > + > assert(iproto_type_is_dml(packet->type)); > /* > * Transform replica local requests to IPROTO_NOP so as to [-- Attachment #2: diff.patch --] [-- Type: text/x-patch, Size: 1811 bytes --] diff --git a/src/box/applier.cc b/src/box/applier.cc index 6239fcfd3..51c0a3c07 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -635,13 +635,19 @@ applier_txn_commit_cb(struct trigger *trigger, void *event) /** * Apply all rows in the rows queue as a single transaction. * - * Return 0 for success or -1 in case of an error. + * Return 0 when a transaction was applied and sent to wal + * 1 when a transaction was skipped + * -1 in case of an error. */ static int applier_apply_tx(struct stailq *rows) { struct xrow_header *first_row = &stailq_first_entry(rows, struct applier_tx_row, next)->row; + /* Check if transaction is a heartbeat message. */ + if (first_row->lsn == 0) + return 1; + struct replica *replica = replica_by_id(first_row->replica_id); /* * In a full mesh topology, the same set of changes @@ -655,7 +661,7 @@ applier_apply_tx(struct stailq *rows) if (vclock_get(&replicaset.applier.vclock, first_row->replica_id) >= first_row->lsn) { latch_unlock(latch); - return 0; + return 1; } /** @@ -902,15 +908,16 @@ applier_subscribe(struct applier *applier) applier_read_tx(applier, &rows); applier->last_row_time = ev_monotonic_now(loop()); + int apply_res = applier_apply_tx(&rows); + + if (apply_res < 0) + diag_raise(); /* - * In case of an heartbeat message wake a writer up - * and check applier state. + * In case of an heartbeat message or already applied + * transaction wake a writer up and check applier state. */ - if (stailq_first_entry(&rows, struct applier_tx_row, - next)->row.lsn == 0) + if (apply_res == 1) fiber_cond_signal(&applier->writer_cond); - else if (applier_apply_tx(&rows) != 0) - diag_raise(); if (ibuf_used(ibuf) == 0) ibuf_reset(ibuf);
next prev parent reply other threads:[~2019-09-06 7:20 UTC|newest] Thread overview: 3+ messages / expand[flat|nested] mbox.gz Atom feed top 2019-09-05 13:32 [tarantool-patches] " Roman Tokarev 2019-09-06 7:20 ` Georgy Kirichenko [this message] 2019-09-06 8:31 ` [tarantool-patches] " Georgy Kirichenko
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=1797608.vBFlj55HbS@home.lan \ --to=georgy@tarantool.org \ --cc=dmarc-noreply@freelists.org \ --cc=tarantool-patches@freelists.org \ --subject='[tarantool-patches] Re: [PATCH] relay: send heartbeats while skipping WALs already applied by replica' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox