Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH] relay: send heartbeats while skipping WALs already applied by replica
@ 2019-09-05 13:32 Roman Tokarev
  2019-09-06  7:20 ` [tarantool-patches] " Georgy Kirichenko
  0 siblings, 1 reply; 3+ messages in thread
From: Roman Tokarev @ 2019-09-05 13:32 UTC (permalink / raw)
  To: tarantool-patches

Currently, relay doesn't send heartbeat messages while skipping rows
already applied by a replica, because heartbeats are sending in the same
thread as relay reads WALs.

To allow heartbeats being sent this patch introduce the 'heartbeater' fiber
that sends them if no rows were sent to a replica during replication timeout.
To allow this fiber to be executed, the main fiber, that reads WALs yields
after skipped 10K rows.

Fixes #4461
---
 src/box/box.cc      |  3 ++
 src/box/recovery.cc | 34 +++++++++--------
 src/box/relay.cc    | 91 ++++++++++++++++++++++++++++++++-------------
 3 files changed, 88 insertions(+), 40 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ac10c21ad..c779044d5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -326,6 +326,9 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
 static void
 apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
+	if (row == NULL)
+		return;
+
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type != IPROTO_NOP) {
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index d122d618a..3de5d7053 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -256,23 +256,27 @@ recover_xlog(struct recovery *r, struct xstream *stream,
 		if (stop_vclock != NULL &&
 		    r->vclock.signature >= stop_vclock->signature)
 			return;
+		int rc;
 		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
 		if (row.lsn <= current_lsn)
-			continue; /* already applied, skip */
-
-		/*
-		 * All rows in xlog files have an assigned
-		 * replica id.
-		 */
-		assert(row.replica_id != 0);
-		/*
-		 * We can promote the vclock either before or
-		 * after xstream_write(): it only makes any impact
-		 * in case of forced recovery, when we skip the
-		 * failed row anyway.
-		 */
-		vclock_follow_xrow(&r->vclock, &row);
-		if (xstream_write(stream, &row) == 0) {
+			/* already applied, skip */
+			rc = xstream_write(stream, NULL);
+		else {
+			/*
+			 * All rows in xlog files have an assigned
+			 * replica id.
+			 */
+			assert(row.replica_id != 0);
+			/*
+			 * We can promote the vclock either before or
+			 * after xstream_write(): it only makes any impact
+			 * in case of forced recovery, when we skip the
+			 * failed row anyway.
+			 */
+			vclock_follow_xrow(&r->vclock, &row);
+			rc = xstream_write(stream, &row);
+		}
+		if (rc == 0) {
 			++row_count;
 			if (row_count % 100000 == 0)
 				say_info("%.1fM rows processed",
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a19abf6a9..baf694bd3 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -128,6 +128,8 @@ struct relay {
 	 * confirmation from the replica.
 	 */
 	struct stailq pending_gc;
+	/** Number or rows were processed. */
+	uint64_t processed;
 	/** Time when last row was sent to peer. */
 	double last_row_time;
 	/** Relay sync state. */
@@ -517,12 +519,48 @@ relay_send_heartbeat(struct relay *relay)
 {
 	struct xrow_header row;
 	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
+	relay_send(relay, &row);
+}
+
+/*
+ * Relay heartbeater fiber function.
+ * Send heartbeats to replica if no xrow has been sent during
+ *  replication_timeout.
+ */
+int
+relay_heartbeater_f(va_list ap)
+{
+	struct relay *relay = va_arg(ap, struct relay *);
+	struct fiber *relay_f = va_arg(ap, struct fiber *);
+
 	try {
-		relay_send(relay, &row);
+		/*
+		 * If the replica happens to be up to date on subscribe,
+		 * don't wait for timeout to happen - send a heartbeat
+		 * message right away to update the replication lag as
+		 * soon as possible.
+		 */
+		relay_send_heartbeat(relay);
+
+		while (!fiber_is_cancelled()) {
+			double timeout = replication_timeout;
+			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
+						    ERRINJ_DOUBLE);
+			if (inj != NULL && inj->dparam != 0)
+				timeout = inj->dparam;
+
+			/* Check for a heartbeat timeout. */
+			if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
+				relay_send_heartbeat(relay);
+
+			fiber_sleep(timeout);
+		}
 	} catch (Exception *e) {
 		relay_set_error(relay, e);
-		fiber_cancel(fiber());
+		fiber_cancel(relay_f);
 	}
+
+	return 0;
 }
 
 /**
@@ -551,10 +589,6 @@ relay_subscribe_f(va_list ap)
 	};
 	trigger_add(&r->on_close_log, &on_close_log);
 
-	/* Setup WAL watcher for sending new rows to the replica. */
-	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
-			relay_process_wal_event, cbus_process);
-
 	/* Start fiber for receiving replica acks. */
 	char name[FIBER_NAME_MAX];
 	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
@@ -562,27 +596,22 @@ relay_subscribe_f(va_list ap)
 	fiber_set_joinable(reader, true);
 	fiber_start(reader, relay, fiber());
 
-	/*
-	 * If the replica happens to be up to date on subscribe,
-	 * don't wait for timeout to happen - send a heartbeat
-	 * message right away to update the replication lag as
-	 * soon as possible.
-	 */
-	relay_send_heartbeat(relay);
+	/* Start fiber for sending heartbeats to replica. */
+	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "heartbeater");
+	struct fiber *heartbeater = fiber_new_xc(name, relay_heartbeater_f);
+	fiber_set_joinable(heartbeater, true);
+	fiber_start(heartbeater, relay, fiber());
+
+	/* Setup WAL watcher for sending new rows to the replica. */
+	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
+			relay_process_wal_event, cbus_process);
 
 	/*
 	 * Run the event loop until the connection is broken
 	 * or an error occurs.
 	 */
 	while (!fiber_is_cancelled()) {
-		double timeout = replication_timeout;
-		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
-					    ERRINJ_DOUBLE);
-		if (inj != NULL && inj->dparam != 0)
-			timeout = inj->dparam;
-
-		fiber_cond_wait_deadline(&relay->reader_cond,
-					 relay->last_row_time + timeout);
+		fiber_cond_wait(&relay->reader_cond);
 
 		/*
 		 * The fiber can be woken by IO cancel, by a timeout of
@@ -590,9 +619,6 @@ relay_subscribe_f(va_list ap)
 		 * Handle cbus messages first.
 		 */
 		cbus_process(&relay->endpoint);
-		/* Check for a heartbeat timeout. */
-		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
-			relay_send_heartbeat(relay);
 		/*
 		 * Check that the vclock has been updated and the previous
 		 * status message is delivered
@@ -631,9 +657,11 @@ relay_subscribe_f(va_list ap)
 	trigger_clear(&on_close_log);
 	wal_clear_watcher(&relay->wal_watcher, cbus_process);
 
-	/* Join ack reader fiber. */
+	/* Join ack reader & heartbeater fibers. */
 	fiber_cancel(reader);
+	fiber_cancel(heartbeater);
 	fiber_join(reader);
+	fiber_join(heartbeater);
 
 	/* Destroy cpipe to tx. */
 	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
@@ -716,6 +744,19 @@ static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
+
+	relay->processed++;
+	if (packet == NULL) {
+		if (relay->processed % 10000 != 0)
+			return;
+
+		ev_now_update(loop());
+		if (ev_monotonic_now(loop()) - relay->last_row_time > replication_timeout)
+			fiber_yield_timeout(0);
+
+		return;
+	}
+
 	assert(iproto_type_is_dml(packet->type));
 	/*
 	 * Transform replica local requests to IPROTO_NOP so as to
-- 
2.20.1

^ permalink raw reply	[flat|nested] 3+ messages in thread

* [tarantool-patches] Re: [PATCH] relay: send heartbeats while skipping WALs already applied by replica
  2019-09-05 13:32 [tarantool-patches] [PATCH] relay: send heartbeats while skipping WALs already applied by replica Roman Tokarev
@ 2019-09-06  7:20 ` Georgy Kirichenko
  2019-09-06  8:31   ` Georgy Kirichenko
  0 siblings, 1 reply; 3+ messages in thread
From: Georgy Kirichenko @ 2019-09-06  7:20 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Roman Tokarev

[-- Attachment #1: Type: text/plain, Size: 8583 bytes --]

Hi Roman!

Thanks you for your patch. I did an investigation in your issue and found that 
applier sends an ACK packet in two cases:
 if a transaction was committed
 if a heartbeat message was received

It means that the case when a relay sends already applied transactions which 
are going to be skipped by an applier there is no ACK packets from the applier 
to the relay. Worth nothing that a relay should not send any heartbeats is 
this case because even rows 'to be skipped' show - the relay is alive.

Based on this I would like to conclude that the best way to fix our issue is to 
signal an applier write condition after a transaction was skipped.

Please see attached diff. I would be grateful if you tested the diff against 
your case.

Have a nice day!



On Thursday, September 5, 2019 4:32:13 PM MSK Roman Tokarev wrote:
> Currently, relay doesn't send heartbeat messages while skipping rows
> already applied by a replica, because heartbeats are sending in the same
> thread as relay reads WALs.
> 
> To allow heartbeats being sent this patch introduce the 'heartbeater' fiber
> that sends them if no rows were sent to a replica during replication
> timeout. To allow this fiber to be executed, the main fiber, that reads
> WALs yields after skipped 10K rows.
> 
> Fixes #4461
> ---
>  src/box/box.cc      |  3 ++
>  src/box/recovery.cc | 34 +++++++++--------
>  src/box/relay.cc    | 91 ++++++++++++++++++++++++++++++++-------------
>  3 files changed, 88 insertions(+), 40 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index ac10c21ad..c779044d5 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -326,6 +326,9 @@ recovery_journal_create(struct recovery_journal
> *journal, struct vclock *v) static void
>  apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  {
> +	if (row == NULL)
> +		return;
> +
>  	struct request request;
>  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
>  	if (request.type != IPROTO_NOP) {
> diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> index d122d618a..3de5d7053 100644
> --- a/src/box/recovery.cc
> +++ b/src/box/recovery.cc
> @@ -256,23 +256,27 @@ recover_xlog(struct recovery *r, struct xstream
> *stream, if (stop_vclock != NULL &&
>  		    r->vclock.signature >= stop_vclock->signature)
>  			return;
> +		int rc;
>  		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
>  		if (row.lsn <= current_lsn)
> -			continue; /* already applied, skip */
> -
> -		/*
> -		 * All rows in xlog files have an assigned
> -		 * replica id.
> -		 */
> -		assert(row.replica_id != 0);
> -		/*
> -		 * We can promote the vclock either before or
> -		 * after xstream_write(): it only makes any impact
> -		 * in case of forced recovery, when we skip the
> -		 * failed row anyway.
> -		 */
> -		vclock_follow_xrow(&r->vclock, &row);
> -		if (xstream_write(stream, &row) == 0) {
> +			/* already applied, skip */
> +			rc = xstream_write(stream, NULL);
> +		else {
> +			/*
> +			 * All rows in xlog files have an assigned
> +			 * replica id.
> +			 */
> +			assert(row.replica_id != 0);
> +			/*
> +			 * We can promote the vclock either before or
> +			 * after xstream_write(): it only makes any impact
> +			 * in case of forced recovery, when we skip the
> +			 * failed row anyway.
> +			 */
> +			vclock_follow_xrow(&r->vclock, &row);
> +			rc = xstream_write(stream, &row);
> +		}
> +		if (rc == 0) {
>  			++row_count;
>  			if (row_count % 100000 == 0)
>  				say_info("%.1fM rows processed",
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index a19abf6a9..baf694bd3 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -128,6 +128,8 @@ struct relay {
>  	 * confirmation from the replica.
>  	 */
>  	struct stailq pending_gc;
> +	/** Number or rows were processed. */
> +	uint64_t processed;
>  	/** Time when last row was sent to peer. */
>  	double last_row_time;
>  	/** Relay sync state. */
> @@ -517,12 +519,48 @@ relay_send_heartbeat(struct relay *relay)
>  {
>  	struct xrow_header row;
>  	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
> +	relay_send(relay, &row);
> +}
> +
> +/*
> + * Relay heartbeater fiber function.
> + * Send heartbeats to replica if no xrow has been sent during
> + *  replication_timeout.
> + */
> +int
> +relay_heartbeater_f(va_list ap)
> +{
> +	struct relay *relay = va_arg(ap, struct relay *);
> +	struct fiber *relay_f = va_arg(ap, struct fiber *);
> +
>  	try {
> -		relay_send(relay, &row);
> +		/*
> +		 * If the replica happens to be up to date on subscribe,
> +		 * don't wait for timeout to happen - send a heartbeat
> +		 * message right away to update the replication lag as
> +		 * soon as possible.
> +		 */
> +		relay_send_heartbeat(relay);
> +
> +		while (!fiber_is_cancelled()) {
> +			double timeout = replication_timeout;
> +			struct errinj *inj = 
errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> +						    ERRINJ_DOUBLE);
> +			if (inj != NULL && inj->dparam != 0)
> +				timeout = inj->dparam;
> +
> +			/* Check for a heartbeat timeout. */
> +			if (ev_monotonic_now(loop()) - relay->last_row_time > 
timeout)
> +				relay_send_heartbeat(relay);
> +
> +			fiber_sleep(timeout);
> +		}
>  	} catch (Exception *e) {
>  		relay_set_error(relay, e);
> -		fiber_cancel(fiber());
> +		fiber_cancel(relay_f);
>  	}
> +
> +	return 0;
>  }
> 
>  /**
> @@ -551,10 +589,6 @@ relay_subscribe_f(va_list ap)
>  	};
>  	trigger_add(&r->on_close_log, &on_close_log);
> 
> -	/* Setup WAL watcher for sending new rows to the replica. */
> -	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> -			relay_process_wal_event, cbus_process);
> -
>  	/* Start fiber for receiving replica acks. */
>  	char name[FIBER_NAME_MAX];
>  	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
> @@ -562,27 +596,22 @@ relay_subscribe_f(va_list ap)
>  	fiber_set_joinable(reader, true);
>  	fiber_start(reader, relay, fiber());
> 
> -	/*
> -	 * If the replica happens to be up to date on subscribe,
> -	 * don't wait for timeout to happen - send a heartbeat
> -	 * message right away to update the replication lag as
> -	 * soon as possible.
> -	 */
> -	relay_send_heartbeat(relay);
> +	/* Start fiber for sending heartbeats to replica. */
> +	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "heartbeater");
> +	struct fiber *heartbeater = fiber_new_xc(name, relay_heartbeater_f);
> +	fiber_set_joinable(heartbeater, true);
> +	fiber_start(heartbeater, relay, fiber());
> +
> +	/* Setup WAL watcher for sending new rows to the replica. */
> +	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> +			relay_process_wal_event, cbus_process);
> 
>  	/*
>  	 * Run the event loop until the connection is broken
>  	 * or an error occurs.
>  	 */
>  	while (!fiber_is_cancelled()) {
> -		double timeout = replication_timeout;
> -		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> -					    ERRINJ_DOUBLE);
> -		if (inj != NULL && inj->dparam != 0)
> -			timeout = inj->dparam;
> -
> -		fiber_cond_wait_deadline(&relay->reader_cond,
> -					 relay->last_row_time + timeout);
> +		fiber_cond_wait(&relay->reader_cond);
> 
>  		/*
>  		 * The fiber can be woken by IO cancel, by a timeout of
> @@ -590,9 +619,6 @@ relay_subscribe_f(va_list ap)
>  		 * Handle cbus messages first.
>  		 */
>  		cbus_process(&relay->endpoint);
> -		/* Check for a heartbeat timeout. */
> -		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
> -			relay_send_heartbeat(relay);
>  		/*
>  		 * Check that the vclock has been updated and the previous
>  		 * status message is delivered
> @@ -631,9 +657,11 @@ relay_subscribe_f(va_list ap)
>  	trigger_clear(&on_close_log);
>  	wal_clear_watcher(&relay->wal_watcher, cbus_process);
> 
> -	/* Join ack reader fiber. */
> +	/* Join ack reader & heartbeater fibers. */
>  	fiber_cancel(reader);
> +	fiber_cancel(heartbeater);
>  	fiber_join(reader);
> +	fiber_join(heartbeater);
> 
>  	/* Destroy cpipe to tx. */
>  	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
> @@ -716,6 +744,19 @@ static void
>  relay_send_row(struct xstream *stream, struct xrow_header *packet)
>  {
>  	struct relay *relay = container_of(stream, struct relay, stream);
> +
> +	relay->processed++;
> +	if (packet == NULL) {
> +		if (relay->processed % 10000 != 0)
> +			return;
> +
> +		ev_now_update(loop());
> +		if (ev_monotonic_now(loop()) - relay->last_row_time >
> replication_timeout) +			fiber_yield_timeout(0);
> +
> +		return;
> +	}
> +
>  	assert(iproto_type_is_dml(packet->type));
>  	/*
>  	 * Transform replica local requests to IPROTO_NOP so as to


[-- Attachment #2: diff.patch --]
[-- Type: text/x-patch, Size: 1811 bytes --]

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6239fcfd3..51c0a3c07 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -635,13 +635,19 @@ applier_txn_commit_cb(struct trigger *trigger, void *event)
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
- * Return 0 for success or -1 in case of an error.
+ * Return 0 when a transaction was applied and sent to wal
+ *        1 when a transaction was skipped
+ *        -1 in case of an error.
  */
 static int
 applier_apply_tx(struct stailq *rows)
 {
 	struct xrow_header *first_row = &stailq_first_entry(rows,
 					struct applier_tx_row, next)->row;
+	/* Check if transaction is a heartbeat message. */
+	if (first_row->lsn == 0)
+		return 1;
+
 	struct replica *replica = replica_by_id(first_row->replica_id);
 	/*
 	 * In a full mesh topology, the same set of changes
@@ -655,7 +661,7 @@ applier_apply_tx(struct stailq *rows)
 	if (vclock_get(&replicaset.applier.vclock,
 		       first_row->replica_id) >= first_row->lsn) {
 		latch_unlock(latch);
-		return 0;
+		return 1;
 	}
 
 	/**
@@ -902,15 +908,16 @@ applier_subscribe(struct applier *applier)
 		applier_read_tx(applier, &rows);
 
 		applier->last_row_time = ev_monotonic_now(loop());
+		int apply_res = applier_apply_tx(&rows);
+
+		if (apply_res < 0)
+			diag_raise();
 		/*
-		 * In case of an heartbeat message wake a writer up
-		 * and check applier state.
+		 * In case of an heartbeat message or already applied
+		 * transaction  wake a writer up and check applier state.
 		 */
-		if (stailq_first_entry(&rows, struct applier_tx_row,
-				       next)->row.lsn == 0)
+		if (apply_res == 1)
 			fiber_cond_signal(&applier->writer_cond);
-		else if (applier_apply_tx(&rows) != 0)
-			diag_raise();
 
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);

^ permalink raw reply	[flat|nested] 3+ messages in thread

* [tarantool-patches] Re: [PATCH] relay: send heartbeats while skipping WALs already applied by replica
  2019-09-06  7:20 ` [tarantool-patches] " Georgy Kirichenko
@ 2019-09-06  8:31   ` Georgy Kirichenko
  0 siblings, 0 replies; 3+ messages in thread
From: Georgy Kirichenko @ 2019-09-06  8:31 UTC (permalink / raw)
  To: tarantool-patches

Roman,
sorry for misunderstanding the issue you found.

There is a problem with too long xlog_cursor positioning which happens without 
any heartbeats or even yields , so I need some more time to think about.

Regards

On Friday, September 6, 2019 10:20:40 AM MSK Georgy Kirichenko wrote:
> Hi Roman!
> 
> Thanks you for your patch. I did an investigation in your issue and found
> that applier sends an ACK packet in two cases:
>  if a transaction was committed
>  if a heartbeat message was received
> 
> It means that the case when a relay sends already applied transactions which
> are going to be skipped by an applier there is no ACK packets from the
> applier to the relay. Worth nothing that a relay should not send any
> heartbeats is this case because even rows 'to be skipped' show - the relay
> is alive.
> 
> Based on this I would like to conclude that the best way to fix our issue is
> to signal an applier write condition after a transaction was skipped.
> 
> Please see attached diff. I would be grateful if you tested the diff against
> your case.
> 
> Have a nice day!
> 
> On Thursday, September 5, 2019 4:32:13 PM MSK Roman Tokarev wrote:
> > Currently, relay doesn't send heartbeat messages while skipping rows
> > already applied by a replica, because heartbeats are sending in the same
> > thread as relay reads WALs.
> > 
> > To allow heartbeats being sent this patch introduce the 'heartbeater'
> > fiber
> > that sends them if no rows were sent to a replica during replication
> > timeout. To allow this fiber to be executed, the main fiber, that reads
> > WALs yields after skipped 10K rows.
> > 
> > Fixes #4461
> > ---
> > 
> >  src/box/box.cc      |  3 ++
> >  src/box/recovery.cc | 34 +++++++++--------
> >  src/box/relay.cc    | 91 ++++++++++++++++++++++++++++++++-------------
> >  3 files changed, 88 insertions(+), 40 deletions(-)
> > 
> > diff --git a/src/box/box.cc b/src/box/box.cc
> > index ac10c21ad..c779044d5 100644
> > --- a/src/box/box.cc
> > +++ b/src/box/box.cc
> > @@ -326,6 +326,9 @@ recovery_journal_create(struct recovery_journal
> > *journal, struct vclock *v) static void
> > 
> >  apply_wal_row(struct xstream *stream, struct xrow_header *row)
> >  {
> > 
> > +	if (row == NULL)
> > +		return;
> > +
> > 
> >  	struct request request;
> >  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> >  	if (request.type != IPROTO_NOP) {
> > 
> > diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> > index d122d618a..3de5d7053 100644
> > --- a/src/box/recovery.cc
> > +++ b/src/box/recovery.cc
> > @@ -256,23 +256,27 @@ recover_xlog(struct recovery *r, struct xstream
> > *stream, if (stop_vclock != NULL &&
> > 
> >  		    r->vclock.signature >= stop_vclock->signature)
> >  			
> >  			return;
> > 
> > +		int rc;
> > 
> >  		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
> >  		if (row.lsn <= current_lsn)
> > 
> > -			continue; /* already applied, skip */
> > -
> > -		/*
> > -		 * All rows in xlog files have an assigned
> > -		 * replica id.
> > -		 */
> > -		assert(row.replica_id != 0);
> > -		/*
> > -		 * We can promote the vclock either before or
> > -		 * after xstream_write(): it only makes any impact
> > -		 * in case of forced recovery, when we skip the
> > -		 * failed row anyway.
> > -		 */
> > -		vclock_follow_xrow(&r->vclock, &row);
> > -		if (xstream_write(stream, &row) == 0) {
> > +			/* already applied, skip */
> > +			rc = xstream_write(stream, NULL);
> > +		else {
> > +			/*
> > +			 * All rows in xlog files have an assigned
> > +			 * replica id.
> > +			 */
> > +			assert(row.replica_id != 0);
> > +			/*
> > +			 * We can promote the vclock either before or
> > +			 * after xstream_write(): it only makes any impact
> > +			 * in case of forced recovery, when we skip the
> > +			 * failed row anyway.
> > +			 */
> > +			vclock_follow_xrow(&r->vclock, &row);
> > +			rc = xstream_write(stream, &row);
> > +		}
> > +		if (rc == 0) {
> > 
> >  			++row_count;
> >  			if (row_count % 100000 == 0)
> >  			
> >  				say_info("%.1fM rows processed",
> > 
> > diff --git a/src/box/relay.cc b/src/box/relay.cc
> > index a19abf6a9..baf694bd3 100644
> > --- a/src/box/relay.cc
> > +++ b/src/box/relay.cc
> > @@ -128,6 +128,8 @@ struct relay {
> > 
> >  	 * confirmation from the replica.
> >  	 */
> >  	
> >  	struct stailq pending_gc;
> > 
> > +	/** Number or rows were processed. */
> > +	uint64_t processed;
> > 
> >  	/** Time when last row was sent to peer. */
> >  	double last_row_time;
> >  	/** Relay sync state. */
> > 
> > @@ -517,12 +519,48 @@ relay_send_heartbeat(struct relay *relay)
> > 
> >  {
> >  
> >  	struct xrow_header row;
> >  	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
> > 
> > +	relay_send(relay, &row);
> > +}
> > +
> > +/*
> > + * Relay heartbeater fiber function.
> > + * Send heartbeats to replica if no xrow has been sent during
> > + *  replication_timeout.
> > + */
> > +int
> > +relay_heartbeater_f(va_list ap)
> > +{
> > +	struct relay *relay = va_arg(ap, struct relay *);
> > +	struct fiber *relay_f = va_arg(ap, struct fiber *);
> > +
> > 
> >  	try {
> > 
> > -		relay_send(relay, &row);
> > +		/*
> > +		 * If the replica happens to be up to date on subscribe,
> > +		 * don't wait for timeout to happen - send a heartbeat
> > +		 * message right away to update the replication lag as
> > +		 * soon as possible.
> > +		 */
> > +		relay_send_heartbeat(relay);
> > +
> > +		while (!fiber_is_cancelled()) {
> > +			double timeout = replication_timeout;
> > +			struct errinj *inj =
> 
> errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> 
> > +						    ERRINJ_DOUBLE);
> > +			if (inj != NULL && inj->dparam != 0)
> > +				timeout = inj->dparam;
> > +
> > +			/* Check for a heartbeat timeout. */
> > +			if (ev_monotonic_now(loop()) - relay->last_row_time >
> 
> timeout)
> 
> > +				relay_send_heartbeat(relay);
> > +
> > +			fiber_sleep(timeout);
> > +		}
> > 
> >  	} catch (Exception *e) {
> >  	
> >  		relay_set_error(relay, e);
> > 
> > -		fiber_cancel(fiber());
> > +		fiber_cancel(relay_f);
> > 
> >  	}
> > 
> > +
> > +	return 0;
> > 
> >  }
> >  
> >  /**
> > 
> > @@ -551,10 +589,6 @@ relay_subscribe_f(va_list ap)
> > 
> >  	};
> >  	trigger_add(&r->on_close_log, &on_close_log);
> > 
> > -	/* Setup WAL watcher for sending new rows to the replica. */
> > -	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> > -			relay_process_wal_event, cbus_process);
> > -
> > 
> >  	/* Start fiber for receiving replica acks. */
> >  	char name[FIBER_NAME_MAX];
> >  	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
> > 
> > @@ -562,27 +596,22 @@ relay_subscribe_f(va_list ap)
> > 
> >  	fiber_set_joinable(reader, true);
> >  	fiber_start(reader, relay, fiber());
> > 
> > -	/*
> > -	 * If the replica happens to be up to date on subscribe,
> > -	 * don't wait for timeout to happen - send a heartbeat
> > -	 * message right away to update the replication lag as
> > -	 * soon as possible.
> > -	 */
> > -	relay_send_heartbeat(relay);
> > +	/* Start fiber for sending heartbeats to replica. */
> > +	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "heartbeater");
> > +	struct fiber *heartbeater = fiber_new_xc(name, relay_heartbeater_f);
> > +	fiber_set_joinable(heartbeater, true);
> > +	fiber_start(heartbeater, relay, fiber());
> > +
> > +	/* Setup WAL watcher for sending new rows to the replica. */
> > +	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> > +			relay_process_wal_event, cbus_process);
> > 
> >  	/*
> >  	
> >  	 * Run the event loop until the connection is broken
> >  	 * or an error occurs.
> >  	 */
> >  	
> >  	while (!fiber_is_cancelled()) {
> > 
> > -		double timeout = replication_timeout;
> > -		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> > -					    ERRINJ_DOUBLE);
> > -		if (inj != NULL && inj->dparam != 0)
> > -			timeout = inj->dparam;
> > -
> > -		fiber_cond_wait_deadline(&relay->reader_cond,
> > -					 relay->last_row_time + timeout);
> > +		fiber_cond_wait(&relay->reader_cond);
> > 
> >  		/*
> >  		
> >  		 * The fiber can be woken by IO cancel, by a timeout of
> > 
> > @@ -590,9 +619,6 @@ relay_subscribe_f(va_list ap)
> > 
> >  		 * Handle cbus messages first.
> >  		 */
> >  		
> >  		cbus_process(&relay->endpoint);
> > 
> > -		/* Check for a heartbeat timeout. */
> > -		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
> > -			relay_send_heartbeat(relay);
> > 
> >  		/*
> >  		
> >  		 * Check that the vclock has been updated and the previous
> >  		 * status message is delivered
> > 
> > @@ -631,9 +657,11 @@ relay_subscribe_f(va_list ap)
> > 
> >  	trigger_clear(&on_close_log);
> >  	wal_clear_watcher(&relay->wal_watcher, cbus_process);
> > 
> > -	/* Join ack reader fiber. */
> > +	/* Join ack reader & heartbeater fibers. */
> > 
> >  	fiber_cancel(reader);
> > 
> > +	fiber_cancel(heartbeater);
> > 
> >  	fiber_join(reader);
> > 
> > +	fiber_join(heartbeater);
> > 
> >  	/* Destroy cpipe to tx. */
> >  	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
> > 
> > @@ -716,6 +744,19 @@ static void
> > 
> >  relay_send_row(struct xstream *stream, struct xrow_header *packet)
> >  {
> >  
> >  	struct relay *relay = container_of(stream, struct relay, stream);
> > 
> > +
> > +	relay->processed++;
> > +	if (packet == NULL) {
> > +		if (relay->processed % 10000 != 0)
> > +			return;
> > +
> > +		ev_now_update(loop());
> > +		if (ev_monotonic_now(loop()) - relay->last_row_time >
> > replication_timeout) +			fiber_yield_timeout(0);
> > +
> > +		return;
> > +	}
> > +
> > 
> >  	assert(iproto_type_is_dml(packet->type));
> >  	/*
> >  	
> >  	 * Transform replica local requests to IPROTO_NOP so as to

^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2019-09-06  8:31 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-09-05 13:32 [tarantool-patches] [PATCH] relay: send heartbeats while skipping WALs already applied by replica Roman Tokarev
2019-09-06  7:20 ` [tarantool-patches] " Georgy Kirichenko
2019-09-06  8:31   ` Georgy Kirichenko

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox