Tarantool development patches archive
From: Georgy Kirichenko <georgy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: Roman Tokarev <dmarc-noreply@freelists.org>
Subject: [tarantool-patches] Re: [PATCH] relay: send heartbeats while skipping WALs already applied by replica
Date: Fri, 06 Sep 2019 10:20:40 +0300
Message-ID: <1797608.vBFlj55HbS@home.lan>
In-Reply-To: <20190905133213.45918-1-rtokarev@corp.mail.ru>

[-- Attachment #1: Type: text/plain, Size: 8583 bytes --]

Hi Roman!

Thank you for your patch. I investigated your issue and found that the
applier sends an ACK packet in two cases:
 - if a transaction was committed
 - if a heartbeat message was received

This means that when a relay sends already applied transactions, which the
applier is going to skip, the applier sends no ACK packets to the relay.
It is worth noting that the relay should not send any heartbeats in this
case, because even rows that are 'to be skipped' show that the relay is alive.
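
To illustrate the point above, here is a minimal sketch of the relay-side
timeout rule under a simplified model. The names `relay_model`, `on_row_sent`
and `heartbeat_due` are made up for illustration and are not Tarantool API;
only the comparison itself mirrors the check in the quoted patch.

```cpp
#include <cassert>

// Hypothetical, simplified model: the relay only remembers when it
// last sent a row to the replica (not the real struct relay).
struct relay_model {
	double last_row_time;
};

// Every row sent to the replica refreshes the timestamp, including
// rows that the applier is going to skip.
static void
on_row_sent(relay_model &relay, double now)
{
	relay.last_row_time = now;
}

// A heartbeat is due only if nothing was sent for longer than timeout.
static bool
heartbeat_due(const relay_model &relay, double now, double timeout)
{
	return now - relay.last_row_time > timeout;
}
```

In this model, `heartbeat_due()` stays false for as long as rows keep
flowing, which is why the relay has no reason to send heartbeats while the
applier is skipping rows.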

Based on this, I would like to conclude that the best way to fix our issue is
to signal the applier's writer condition after a transaction was skipped.
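
The idea can be sketched as follows. This is a standalone model, not the
applier code itself: `apply_result`, `classify_tx` and `must_signal_writer`
are hypothetical names, and the return codes only follow the 0 / 1 / -1
convention used in the attached diff.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical outcome codes following the convention of the attached diff:
// 0 = applied and sent to WAL, 1 = skipped, -1 = error.
enum apply_result {
	APPLY_ERROR = -1,
	APPLY_COMMITTED = 0,
	APPLY_SKIPPED = 1,
};

// Classify a transaction by the LSN of its first row.
// row_lsn == 0 marks a heartbeat; applied_lsn is what the local vclock
// already holds for the originating replica.
static apply_result
classify_tx(int64_t row_lsn, int64_t applied_lsn)
{
	if (row_lsn == 0)
		return APPLY_SKIPPED;	/* heartbeat, nothing to apply */
	if (applied_lsn >= row_lsn)
		return APPLY_SKIPPED;	/* already applied */
	return APPLY_COMMITTED;		/* the commit itself leads to an ACK */
}

// The writer has to be woken explicitly only when nothing was committed.
static bool
must_signal_writer(apply_result res)
{
	return res == APPLY_SKIPPED;
}
```

With this split, a heartbeat and an already applied transaction take the same
path: both wake the writer, so the relay gets an ACK in either case.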

Please see attached diff. I would be grateful if you tested the diff against 
your case.

Have a nice day!



On Thursday, September 5, 2019 4:32:13 PM MSK Roman Tokarev wrote:
> Currently, relay doesn't send heartbeat messages while skipping rows
> already applied by a replica, because heartbeats are sent in the same
> thread in which the relay reads WALs.
> 
> To allow heartbeats to be sent, this patch introduces a 'heartbeater' fiber
> that sends them if no rows were sent to a replica during the replication
> timeout. To allow this fiber to be executed, the main fiber, which reads
> WALs, yields after every 10K skipped rows.
> 
> Fixes #4461
> ---
>  src/box/box.cc      |  3 ++
>  src/box/recovery.cc | 34 +++++++++--------
>  src/box/relay.cc    | 91 ++++++++++++++++++++++++++++++++-------------
>  3 files changed, 88 insertions(+), 40 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index ac10c21ad..c779044d5 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -326,6 +326,9 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
>  static void
>  apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  {
> +	if (row == NULL)
> +		return;
> +
>  	struct request request;
>  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
>  	if (request.type != IPROTO_NOP) {
> diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> index d122d618a..3de5d7053 100644
> --- a/src/box/recovery.cc
> +++ b/src/box/recovery.cc
> @@ -256,23 +256,27 @@ recover_xlog(struct recovery *r, struct xstream *stream,
>  		if (stop_vclock != NULL &&
>  		    r->vclock.signature >= stop_vclock->signature)
>  			return;
> +		int rc;
>  		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
>  		if (row.lsn <= current_lsn)
> -			continue; /* already applied, skip */
> -
> -		/*
> -		 * All rows in xlog files have an assigned
> -		 * replica id.
> -		 */
> -		assert(row.replica_id != 0);
> -		/*
> -		 * We can promote the vclock either before or
> -		 * after xstream_write(): it only makes any impact
> -		 * in case of forced recovery, when we skip the
> -		 * failed row anyway.
> -		 */
> -		vclock_follow_xrow(&r->vclock, &row);
> -		if (xstream_write(stream, &row) == 0) {
> +			/* already applied, skip */
> +			rc = xstream_write(stream, NULL);
> +		else {
> +			/*
> +			 * All rows in xlog files have an assigned
> +			 * replica id.
> +			 */
> +			assert(row.replica_id != 0);
> +			/*
> +			 * We can promote the vclock either before or
> +			 * after xstream_write(): it only makes any impact
> +			 * in case of forced recovery, when we skip the
> +			 * failed row anyway.
> +			 */
> +			vclock_follow_xrow(&r->vclock, &row);
> +			rc = xstream_write(stream, &row);
> +		}
> +		if (rc == 0) {
>  			++row_count;
>  			if (row_count % 100000 == 0)
>  				say_info("%.1fM rows processed",
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index a19abf6a9..baf694bd3 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -128,6 +128,8 @@ struct relay {
>  	 * confirmation from the replica.
>  	 */
>  	struct stailq pending_gc;
> +	/** Number of rows processed. */
> +	uint64_t processed;
>  	/** Time when last row was sent to peer. */
>  	double last_row_time;
>  	/** Relay sync state. */
> @@ -517,12 +519,48 @@ relay_send_heartbeat(struct relay *relay)
>  {
>  	struct xrow_header row;
>  	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
> +	relay_send(relay, &row);
> +}
> +
> +/*
> + * Relay heartbeater fiber function.
> + * Send heartbeats to replica if no xrow has been sent during
> + *  replication_timeout.
> + */
> +int
> +relay_heartbeater_f(va_list ap)
> +{
> +	struct relay *relay = va_arg(ap, struct relay *);
> +	struct fiber *relay_f = va_arg(ap, struct fiber *);
> +
>  	try {
> -		relay_send(relay, &row);
> +		/*
> +		 * If the replica happens to be up to date on subscribe,
> +		 * don't wait for timeout to happen - send a heartbeat
> +		 * message right away to update the replication lag as
> +		 * soon as possible.
> +		 */
> +		relay_send_heartbeat(relay);
> +
> +		while (!fiber_is_cancelled()) {
> +			double timeout = replication_timeout;
> +			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> +						    ERRINJ_DOUBLE);
> +			if (inj != NULL && inj->dparam != 0)
> +				timeout = inj->dparam;
> +
> +			/* Check for a heartbeat timeout. */
> +			if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
> +				relay_send_heartbeat(relay);
> +
> +			fiber_sleep(timeout);
> +		}
>  	} catch (Exception *e) {
>  		relay_set_error(relay, e);
> -		fiber_cancel(fiber());
> +		fiber_cancel(relay_f);
>  	}
> +
> +	return 0;
>  }
> 
>  /**
> @@ -551,10 +589,6 @@ relay_subscribe_f(va_list ap)
>  	};
>  	trigger_add(&r->on_close_log, &on_close_log);
> 
> -	/* Setup WAL watcher for sending new rows to the replica. */
> -	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> -			relay_process_wal_event, cbus_process);
> -
>  	/* Start fiber for receiving replica acks. */
>  	char name[FIBER_NAME_MAX];
>  	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
> @@ -562,27 +596,22 @@ relay_subscribe_f(va_list ap)
>  	fiber_set_joinable(reader, true);
>  	fiber_start(reader, relay, fiber());
> 
> -	/*
> -	 * If the replica happens to be up to date on subscribe,
> -	 * don't wait for timeout to happen - send a heartbeat
> -	 * message right away to update the replication lag as
> -	 * soon as possible.
> -	 */
> -	relay_send_heartbeat(relay);
> +	/* Start fiber for sending heartbeats to replica. */
> +	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "heartbeater");
> +	struct fiber *heartbeater = fiber_new_xc(name, relay_heartbeater_f);
> +	fiber_set_joinable(heartbeater, true);
> +	fiber_start(heartbeater, relay, fiber());
> +
> +	/* Setup WAL watcher for sending new rows to the replica. */
> +	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> +			relay_process_wal_event, cbus_process);
> 
>  	/*
>  	 * Run the event loop until the connection is broken
>  	 * or an error occurs.
>  	 */
>  	while (!fiber_is_cancelled()) {
> -		double timeout = replication_timeout;
> -		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> -					    ERRINJ_DOUBLE);
> -		if (inj != NULL && inj->dparam != 0)
> -			timeout = inj->dparam;
> -
> -		fiber_cond_wait_deadline(&relay->reader_cond,
> -					 relay->last_row_time + timeout);
> +		fiber_cond_wait(&relay->reader_cond);
> 
>  		/*
>  		 * The fiber can be woken by IO cancel, by a timeout of
> @@ -590,9 +619,6 @@ relay_subscribe_f(va_list ap)
>  		 * Handle cbus messages first.
>  		 */
>  		cbus_process(&relay->endpoint);
> -		/* Check for a heartbeat timeout. */
> -		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
> -			relay_send_heartbeat(relay);
>  		/*
>  		 * Check that the vclock has been updated and the previous
>  		 * status message is delivered
> @@ -631,9 +657,11 @@ relay_subscribe_f(va_list ap)
>  	trigger_clear(&on_close_log);
>  	wal_clear_watcher(&relay->wal_watcher, cbus_process);
> 
> -	/* Join ack reader fiber. */
> +	/* Join ack reader & heartbeater fibers. */
>  	fiber_cancel(reader);
> +	fiber_cancel(heartbeater);
>  	fiber_join(reader);
> +	fiber_join(heartbeater);
> 
>  	/* Destroy cpipe to tx. */
>  	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
> @@ -716,6 +744,19 @@ static void
>  relay_send_row(struct xstream *stream, struct xrow_header *packet)
>  {
>  	struct relay *relay = container_of(stream, struct relay, stream);
> +
> +	relay->processed++;
> +	if (packet == NULL) {
> +		if (relay->processed % 10000 != 0)
> +			return;
> +
> +		ev_now_update(loop());
> +		if (ev_monotonic_now(loop()) - relay->last_row_time > replication_timeout)
> +			fiber_yield_timeout(0);
> +
> +		return;
> +	}
> +
>  	assert(iproto_type_is_dml(packet->type));
>  	/*
>  	 * Transform replica local requests to IPROTO_NOP so as to


[-- Attachment #2: diff.patch --]
[-- Type: text/x-patch, Size: 1811 bytes --]

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6239fcfd3..51c0a3c07 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -635,13 +635,19 @@ applier_txn_commit_cb(struct trigger *trigger, void *event)
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
- * Return 0 for success or -1 in case of an error.
+ * Return 0 when a transaction was applied and sent to wal
+ *        1 when a transaction was skipped
+ *        -1 in case of an error.
  */
 static int
 applier_apply_tx(struct stailq *rows)
 {
 	struct xrow_header *first_row = &stailq_first_entry(rows,
 					struct applier_tx_row, next)->row;
+	/* Check if transaction is a heartbeat message. */
+	if (first_row->lsn == 0)
+		return 1;
+
 	struct replica *replica = replica_by_id(first_row->replica_id);
 	/*
 	 * In a full mesh topology, the same set of changes
@@ -655,7 +661,7 @@ applier_apply_tx(struct stailq *rows)
 	if (vclock_get(&replicaset.applier.vclock,
 		       first_row->replica_id) >= first_row->lsn) {
 		latch_unlock(latch);
-		return 0;
+		return 1;
 	}
 
 	/**
@@ -902,15 +908,16 @@ applier_subscribe(struct applier *applier)
 		applier_read_tx(applier, &rows);
 
 		applier->last_row_time = ev_monotonic_now(loop());
+		int apply_res = applier_apply_tx(&rows);
+
+		if (apply_res < 0)
+			diag_raise();
 		/*
-		 * In case of an heartbeat message wake a writer up
-		 * and check applier state.
+		 * In case of a heartbeat message or an already applied
+		 * transaction, wake the writer up and check applier state.
 		 */
-		if (stailq_first_entry(&rows, struct applier_tx_row,
-				       next)->row.lsn == 0)
+		if (apply_res == 1)
 			fiber_cond_signal(&applier->writer_cond);
-		else if (applier_apply_tx(&rows) != 0)
-			diag_raise();
 
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);

