[tarantool-patches] Re: [PATCH] relay: send heartbeats while skipping WALs already applied by replica

Georgy Kirichenko georgy at tarantool.org
Fri Sep 6 11:31:49 MSK 2019


Roman,
sorry for misunderstanding the issue you found.

There is a problem with overly long xlog_cursor positioning, which happens
without any heartbeats or even yields, so I need some more time to think
about it.

Regards

On Friday, September 6, 2019 10:20:40 AM MSK Georgy Kirichenko wrote:
> Hi Roman!
> 
> Thank you for your patch. I investigated your issue and found
> that the applier sends an ACK packet in two cases:
>  if a transaction was committed
>  if a heartbeat message was received
> 
> It means that when a relay sends already applied transactions, which
> are going to be skipped by an applier, there are no ACK packets from the
> applier to the relay. It is worth noting that a relay should not send any
> heartbeats in this case, because even the rows 'to be skipped' show that
> the relay is alive.
> 
> Based on this I would like to conclude that the best way to fix our issue is
> to signal an applier write condition after a transaction was skipped.
> 
> Please see attached diff. I would be grateful if you tested the diff against
> your case.
> 
> Have a nice day!
> 
> On Thursday, September 5, 2019 4:32:13 PM MSK Roman Tokarev wrote:
> > Currently, relay doesn't send heartbeat messages while skipping rows
> > already applied by a replica, because heartbeats are sent in the same
> > thread in which the relay reads WALs.
> > 
> > To allow heartbeats to be sent, this patch introduces the 'heartbeater'
> > fiber that sends them if no rows were sent to a replica during the
> > replication timeout. To allow this fiber to be executed, the main fiber,
> > which reads WALs, yields after every 10K skipped rows.
> > 
> > Fixes #4461
> > ---
> > 
> >  src/box/box.cc      |  3 ++
> >  src/box/recovery.cc | 34 +++++++++--------
> >  src/box/relay.cc    | 91 ++++++++++++++++++++++++++++++++-------------
> >  3 files changed, 88 insertions(+), 40 deletions(-)
> > 
> > diff --git a/src/box/box.cc b/src/box/box.cc
> > index ac10c21ad..c779044d5 100644
> > --- a/src/box/box.cc
> > +++ b/src/box/box.cc
> > @@ -326,6 +326,9 @@ recovery_journal_create(struct recovery_journal
> > *journal, struct vclock *v) static void
> > 
> >  apply_wal_row(struct xstream *stream, struct xrow_header *row)
> >  {
> > 
> > +	if (row == NULL)
> > +		return;
> > +
> > 
> >  	struct request request;
> >  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> >  	if (request.type != IPROTO_NOP) {
> > 
> > diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> > index d122d618a..3de5d7053 100644
> > --- a/src/box/recovery.cc
> > +++ b/src/box/recovery.cc
> > @@ -256,23 +256,27 @@ recover_xlog(struct recovery *r, struct xstream
> > *stream, if (stop_vclock != NULL &&
> > 
> >  		    r->vclock.signature >= stop_vclock->signature)
> >  			
> >  			return;
> > 
> > +		int rc;
> > 
> >  		int64_t current_lsn = vclock_get(&r->vclock, row.replica_id);
> >  		if (row.lsn <= current_lsn)
> > 
> > -			continue; /* already applied, skip */
> > -
> > -		/*
> > -		 * All rows in xlog files have an assigned
> > -		 * replica id.
> > -		 */
> > -		assert(row.replica_id != 0);
> > -		/*
> > -		 * We can promote the vclock either before or
> > -		 * after xstream_write(): it only makes any impact
> > -		 * in case of forced recovery, when we skip the
> > -		 * failed row anyway.
> > -		 */
> > -		vclock_follow_xrow(&r->vclock, &row);
> > -		if (xstream_write(stream, &row) == 0) {
> > +			/* already applied, skip */
> > +			rc = xstream_write(stream, NULL);
> > +		else {
> > +			/*
> > +			 * All rows in xlog files have an assigned
> > +			 * replica id.
> > +			 */
> > +			assert(row.replica_id != 0);
> > +			/*
> > +			 * We can promote the vclock either before or
> > +			 * after xstream_write(): it only makes any impact
> > +			 * in case of forced recovery, when we skip the
> > +			 * failed row anyway.
> > +			 */
> > +			vclock_follow_xrow(&r->vclock, &row);
> > +			rc = xstream_write(stream, &row);
> > +		}
> > +		if (rc == 0) {
> > 
> >  			++row_count;
> >  			if (row_count % 100000 == 0)
> >  			
> >  				say_info("%.1fM rows processed",
> > 
> > diff --git a/src/box/relay.cc b/src/box/relay.cc
> > index a19abf6a9..baf694bd3 100644
> > --- a/src/box/relay.cc
> > +++ b/src/box/relay.cc
> > @@ -128,6 +128,8 @@ struct relay {
> > 
> >  	 * confirmation from the replica.
> >  	 */
> >  	
> >  	struct stailq pending_gc;
> > 
> > +	/** Number or rows were processed. */
> > +	uint64_t processed;
> > 
> >  	/** Time when last row was sent to peer. */
> >  	double last_row_time;
> >  	/** Relay sync state. */
> > 
> > @@ -517,12 +519,48 @@ relay_send_heartbeat(struct relay *relay)
> > 
> >  {
> >  
> >  	struct xrow_header row;
> >  	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
> > 
> > +	relay_send(relay, &row);
> > +}
> > +
> > +/*
> > + * Relay heartbeater fiber function.
> > + * Send heartbeats to replica if no xrow has been sent during
> > + *  replication_timeout.
> > + */
> > +int
> > +relay_heartbeater_f(va_list ap)
> > +{
> > +	struct relay *relay = va_arg(ap, struct relay *);
> > +	struct fiber *relay_f = va_arg(ap, struct fiber *);
> > +
> > 
> >  	try {
> > 
> > -		relay_send(relay, &row);
> > +		/*
> > +		 * If the replica happens to be up to date on subscribe,
> > +		 * don't wait for timeout to happen - send a heartbeat
> > +		 * message right away to update the replication lag as
> > +		 * soon as possible.
> > +		 */
> > +		relay_send_heartbeat(relay);
> > +
> > +		while (!fiber_is_cancelled()) {
> > +			double timeout = replication_timeout;
> > +			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> > +						    ERRINJ_DOUBLE);
> > +			if (inj != NULL && inj->dparam != 0)
> > +				timeout = inj->dparam;
> > +
> > +			/* Check for a heartbeat timeout. */
> > +			if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
> > +				relay_send_heartbeat(relay);
> > +
> > +			fiber_sleep(timeout);
> > +		}
> > 
> >  	} catch (Exception *e) {
> >  	
> >  		relay_set_error(relay, e);
> > 
> > -		fiber_cancel(fiber());
> > +		fiber_cancel(relay_f);
> > 
> >  	}
> > 
> > +
> > +	return 0;
> > 
> >  }
> >  
> >  /**
> > 
> > @@ -551,10 +589,6 @@ relay_subscribe_f(va_list ap)
> > 
> >  	};
> >  	trigger_add(&r->on_close_log, &on_close_log);
> > 
> > -	/* Setup WAL watcher for sending new rows to the replica. */
> > -	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> > -			relay_process_wal_event, cbus_process);
> > -
> > 
> >  	/* Start fiber for receiving replica acks. */
> >  	char name[FIBER_NAME_MAX];
> >  	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
> > 
> > @@ -562,27 +596,22 @@ relay_subscribe_f(va_list ap)
> > 
> >  	fiber_set_joinable(reader, true);
> >  	fiber_start(reader, relay, fiber());
> > 
> > -	/*
> > -	 * If the replica happens to be up to date on subscribe,
> > -	 * don't wait for timeout to happen - send a heartbeat
> > -	 * message right away to update the replication lag as
> > -	 * soon as possible.
> > -	 */
> > -	relay_send_heartbeat(relay);
> > +	/* Start fiber for sending heartbeats to replica. */
> > +	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "heartbeater");
> > +	struct fiber *heartbeater = fiber_new_xc(name, relay_heartbeater_f);
> > +	fiber_set_joinable(heartbeater, true);
> > +	fiber_start(heartbeater, relay, fiber());
> > +
> > +	/* Setup WAL watcher for sending new rows to the replica. */
> > +	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
> > +			relay_process_wal_event, cbus_process);
> > 
> >  	/*
> >  	
> >  	 * Run the event loop until the connection is broken
> >  	 * or an error occurs.
> >  	 */
> >  	
> >  	while (!fiber_is_cancelled()) {
> > 
> > -		double timeout = replication_timeout;
> > -		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> > -					    ERRINJ_DOUBLE);
> > -		if (inj != NULL && inj->dparam != 0)
> > -			timeout = inj->dparam;
> > -
> > -		fiber_cond_wait_deadline(&relay->reader_cond,
> > -					 relay->last_row_time + timeout);
> > +		fiber_cond_wait(&relay->reader_cond);
> > 
> >  		/*
> >  		
> >  		 * The fiber can be woken by IO cancel, by a timeout of
> > 
> > @@ -590,9 +619,6 @@ relay_subscribe_f(va_list ap)
> > 
> >  		 * Handle cbus messages first.
> >  		 */
> >  		
> >  		cbus_process(&relay->endpoint);
> > 
> > -		/* Check for a heartbeat timeout. */
> > -		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
> > -			relay_send_heartbeat(relay);
> > 
> >  		/*
> >  		
> >  		 * Check that the vclock has been updated and the previous
> >  		 * status message is delivered
> > 
> > @@ -631,9 +657,11 @@ relay_subscribe_f(va_list ap)
> > 
> >  	trigger_clear(&on_close_log);
> >  	wal_clear_watcher(&relay->wal_watcher, cbus_process);
> > 
> > -	/* Join ack reader fiber. */
> > +	/* Join ack reader & heartbeater fibers. */
> > 
> >  	fiber_cancel(reader);
> > 
> > +	fiber_cancel(heartbeater);
> > 
> >  	fiber_join(reader);
> > 
> > +	fiber_join(heartbeater);
> > 
> >  	/* Destroy cpipe to tx. */
> >  	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
> > 
> > @@ -716,6 +744,19 @@ static void
> > 
> >  relay_send_row(struct xstream *stream, struct xrow_header *packet)
> >  {
> >  
> >  	struct relay *relay = container_of(stream, struct relay, stream);
> > 
> > +
> > +	relay->processed++;
> > +	if (packet == NULL) {
> > +		if (relay->processed % 10000 != 0)
> > +			return;
> > +
> > +		ev_now_update(loop());
> > +		if (ev_monotonic_now(loop()) - relay->last_row_time > replication_timeout)
> > +			fiber_yield_timeout(0);
> > +
> > +		return;
> > +	}
> > +
> > 
> >  	assert(iproto_type_is_dml(packet->type));
> >  	/*
> >  	
> >  	 * Transform replica local requests to IPROTO_NOP so as to








More information about the Tarantool-patches mailing list