[tarantool-patches] Re: [PATCH] relay: send heartbeats while skipping WALs already applied by replica
Georgy Kirichenko
georgy at tarantool.org
Fri Sep 6 10:20:40 MSK 2019
Hi Roman!
Thank you for your patch. I investigated your issue and found that the
applier sends an ACK packet in only two cases:
if a transaction was committed
if a heartbeat message was received
This means that when a relay sends already-applied transactions, which the
applier is going to skip, the applier sends no ACK packets back to the relay.
It is worth noting that the relay should not need to send any heartbeats in
this case, because even rows that are "to be skipped" show that the relay is
alive.
Based on this, I conclude that the best way to fix the issue is to signal the
applier's write condition after a transaction is skipped.
Please see the attached diff. I would be grateful if you could test it against
your case.
Have a nice day!
On Thursday, September 5, 2019 4:32:13 PM MSK Roman Tokarev wrote:
> Currently, the relay doesn't send heartbeat messages while skipping rows
> already applied by a replica, because heartbeats are sent from the same
> thread that reads WALs.
>
> To allow heartbeats to be sent, this patch introduces a 'heartbeater' fiber
> that sends them if no rows were sent to the replica during the replication
> timeout. To let this fiber run, the main fiber, which reads WALs, yields
> after every 10K skipped rows.
>
> Fixes #4461
> ---
> src/box/box.cc | 3 ++
> src/box/recovery.cc | 34 +++++++++--------
> src/box/relay.cc | 91 ++++++++++++++++++++++++++++++++-------------
> 3 files changed, 88 insertions(+), 40 deletions(-)
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index ac10c21ad..c779044d5 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -326,6 +326,9 @@ recovery_journal_create(struct recovery_journal
> *journal, struct vclock *v) static void
> apply_wal_row(struct xstream *stream, struct xrow_header *row)
> {
> + if (row == NULL)
> + return;
> +
> struct request request;
> xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> if (request.type != IPROTO_NOP) {
> diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> index d122d618a..3de5d7053 100644
> --- a/src/box/recovery.cc
> +++ b/src/box/recovery.cc
> @@ -256,23 +256,27 @@ recover_xlog(struct recovery *r, struct xstream
> *stream, if (stop_vclock != NULL &&
> r->vclock.signature >= stop_vclock->signature)
> return;
> + int rc;
> int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
> if (row.lsn <= current_lsn)
> - continue; /* already applied, skip */
> -
> - /*
> - * All rows in xlog files have an assigned
> - * replica id.
> - */
> - assert(row.replica_id != 0);
> - /*
> - * We can promote the vclock either before or
> - * after xstream_write(): it only makes any impact
> - * in case of forced recovery, when we skip the
> - * failed row anyway.
> - */
> - vclock_follow_xrow(&r->vclock, &row);
> - if (xstream_write(stream, &row) == 0) {
> + /* already applied, skip */
> + rc = xstream_write(stream, NULL);
> + else {
> + /*
> + * All rows in xlog files have an assigned
> + * replica id.
> + */
> + assert(row.replica_id != 0);
> + /*
> + * We can promote the vclock either before or
> + * after xstream_write(): it only makes any impact
> + * in case of forced recovery, when we skip the
> + * failed row anyway.
> + */
> + vclock_follow_xrow(&r->vclock, &row);
> + rc = xstream_write(stream, &row);
> + }
> + if (rc == 0) {
> ++row_count;
> if (row_count % 100000 == 0)
> say_info("%.1fM rows processed",
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index a19abf6a9..baf694bd3 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -128,6 +128,8 @@ struct relay {
> * confirmation from the replica.
> */
> struct stailq pending_gc;
> + /** Number or rows were processed. */
> + uint64_t processed;
> /** Time when last row was sent to peer. */
> double last_row_time;
> /** Relay sync state. */
> @@ -517,12 +519,48 @@ relay_send_heartbeat(struct relay *relay)
> {
> struct xrow_header row;
> xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
> + relay_send(relay, &row);
> +}
> +
> +/*
> + * Relay heartbeater fiber function.
> + * Send heartbeats to replica if no xrow has been sent during
> + * replication_timeout.
> + */
> +int
> +relay_heartbeater_f(va_list ap)
> +{
> + struct relay *relay = va_arg(ap, struct relay *);
> + struct fiber *relay_f = va_arg(ap, struct fiber *);
> +
> try {
> - relay_send(relay, &row);
> + /*
> + * If the replica happens to be up to date on subscribe,
> + * don't wait for timeout to happen - send a heartbeat
> + * message right away to update the replication lag as
> + * soon as possible.
> + */
> + relay_send_heartbeat(relay);
> +
> + while (!fiber_is_cancelled()) {
> + double timeout = replication_timeout;
> + struct errinj *inj =
errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> + ERRINJ_DOUBLE);
> + if (inj != NULL && inj->dparam != 0)
> + timeout = inj->dparam;
> +
> + /* Check for a heartbeat timeout. */
> + if (ev_monotonic_now(loop()) - relay->last_row_time >
timeout)
> + relay_send_heartbeat(relay);
> +
> + fiber_sleep(timeout);
> + }
> } catch (Exception *e) {
> relay_set_error(relay, e);
> - fiber_cancel(fiber());
> + fiber_cancel(relay_f);
> }
> +
> + return 0;
> }
>
> /**
> @@ -551,10 +589,6 @@ relay_subscribe_f(va_list ap)
> };
> trigger_add(&r->on_close_log, &on_close_log);
>
> - /* Setup WAL watcher for sending new rows to the replica. */
> - wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> - relay_process_wal_event, cbus_process);
> -
> /* Start fiber for receiving replica acks. */
> char name[FIBER_NAME_MAX];
> snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
> @@ -562,27 +596,22 @@ relay_subscribe_f(va_list ap)
> fiber_set_joinable(reader, true);
> fiber_start(reader, relay, fiber());
>
> - /*
> - * If the replica happens to be up to date on subscribe,
> - * don't wait for timeout to happen - send a heartbeat
> - * message right away to update the replication lag as
> - * soon as possible.
> - */
> - relay_send_heartbeat(relay);
> + /* Start fiber for sending heartbeats to replica. */
> + snprintf(name, sizeof(name), "%s:%s", fiber()->name, "heartbeater");
> + struct fiber *heartbeater = fiber_new_xc(name, relay_heartbeater_f);
> + fiber_set_joinable(heartbeater, true);
> + fiber_start(heartbeater, relay, fiber());
> +
> + /* Setup WAL watcher for sending new rows to the replica. */
> + wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> + relay_process_wal_event, cbus_process);
>
> /*
> * Run the event loop until the connection is broken
> * or an error occurs.
> */
> while (!fiber_is_cancelled()) {
> - double timeout = replication_timeout;
> - struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> - ERRINJ_DOUBLE);
> - if (inj != NULL && inj->dparam != 0)
> - timeout = inj->dparam;
> -
> - fiber_cond_wait_deadline(&relay->reader_cond,
> - relay->last_row_time + timeout);
> + fiber_cond_wait(&relay->reader_cond);
>
> /*
> * The fiber can be woken by IO cancel, by a timeout of
> @@ -590,9 +619,6 @@ relay_subscribe_f(va_list ap)
> * Handle cbus messages first.
> */
> cbus_process(&relay->endpoint);
> - /* Check for a heartbeat timeout. */
> - if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
> - relay_send_heartbeat(relay);
> /*
> * Check that the vclock has been updated and the previous
> * status message is delivered
> @@ -631,9 +657,11 @@ relay_subscribe_f(va_list ap)
> trigger_clear(&on_close_log);
> wal_clear_watcher(&relay->wal_watcher, cbus_process);
>
> - /* Join ack reader fiber. */
> + /* Join ack reader & heartbeater fibers. */
> fiber_cancel(reader);
> + fiber_cancel(heartbeater);
> fiber_join(reader);
> + fiber_join(heartbeater);
>
> /* Destroy cpipe to tx. */
> cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
> @@ -716,6 +744,19 @@ static void
> relay_send_row(struct xstream *stream, struct xrow_header *packet)
> {
> struct relay *relay = container_of(stream, struct relay, stream);
> +
> + relay->processed++;
> + if (packet == NULL) {
> + if (relay->processed % 10000 != 0)
> + return;
> +
> + ev_now_update(loop());
> + if (ev_monotonic_now(loop()) - relay->last_row_time >
> replication_timeout) + fiber_yield_timeout(0);
> +
> + return;
> + }
> +
> assert(iproto_type_is_dml(packet->type));
> /*
> * Transform replica local requests to IPROTO_NOP so as to
-------------- next part --------------
A non-text attachment was scrubbed...
Name: diff.patch
Type: text/x-patch
Size: 1811 bytes
Desc: not available
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20190906/6ee57449/attachment.bin>
More information about the Tarantool-patches
mailing list