[tarantool-patches] Re: [PATCH v2 4/4] replication: use wal memory buffer to fetch rows

Georgy Kirichenko georgy at tarantool.org
Mon Sep 23 12:35:54 MSK 2019


On Sunday, September 22, 2019 7:14:13 PM MSK Vladislav Shpilevoy wrote:
> Thanks for the patch!
> 
> On 18/09/2019 11:36, Georgy Kirichenko wrote:
> > A relay tries to create a wal memory cursor and follow them to relay
> > data to it's replica.
> 
> 'follow them to relay data to replica'? Sorry, can't parse. Please,
> rephrase.
> 
> > If a relay failed to attach to a wal memory buffer
> > or went out of the buffer
> > then the relay recovers xlogs from files and
> > the makes a new try to attach.
> 
> 'the makes'? Can't parse, please rephrase. Just 'makes'?
> Or 'then makes'?
> 
> > Closes: #3794
> 
> Please, no ':'.
Ok
> 
> Don't we need a docbot request for this feature? Is it documented
> somewhere, that our replication is from disk always?
Oh, I will ask whether this should be documented.
> 
> See 29 comments below.
> 
> > ---
> > 
> >  src/box/relay.cc                              | 169 +++++++++---------
> >  src/box/wal.c                                 | 155 ++++++++++++++++
> > 
> > +	relay->r = NULL;
> 
> 1. Why? 'relay' is allocated using 'calloc' a few lines
> above, 'r' is NULL already.
Accepted
> 
> >  	return relay;

> > +			if (relay->status_msg.msg.route != NULL)
> > +				continue;
> 
> 2. What is happening below? And how can msg.route be NULL?
We sometimes use this pattern (a NULL route) to mark a cbus message as
processed.
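
To illustrate the pattern, here is a minimal sketch. The toy_* types and
functions are hypothetical stand-ins, not the real cbus structures. The route
pointer doubles as an "in flight" flag: it is set when the message is pushed
and reset to NULL by the last hop, so the sender can avoid pushing the same
message object twice.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration; not the real cbus types. */
struct toy_hop {
	void (*handler)(void *msg);
};

struct toy_msg {
	/* NULL means the message is idle and may be pushed again. */
	const struct toy_hop *route;
};

static bool
toy_msg_is_in_flight(const struct toy_msg *msg)
{
	return msg->route != NULL;
}

/* Push the message unless the previous push is still being processed. */
static bool
toy_msg_try_push(struct toy_msg *msg, const struct toy_hop *route)
{
	assert(route != NULL);
	if (toy_msg_is_in_flight(msg))
		return false;
	msg->route = route;
	return true;
}

/* Called by the last hop: mark the message as processed. */
static void
toy_msg_complete(struct toy_msg *msg)
{
	msg->route = NULL;
}
```

In the patch, the `continue` on a non-NULL route plays the role of the early
return in toy_msg_try_push().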
> 
> > +			struct vclock *send_vclock;
> > +			if (relay->version_id < version_id(1, 7, 4))
> > +				send_vclock = &relay->r->vclock;
> 
> 3. I know, this is old code that existed before your patch, and it will
> be dropped, but why do you send relay->r->vclock and don't decode it
> anywhere? Above you decoded a received vclock into relay->recv_vclock.
> relay->r->vclock is not updated, and nonetheless you send it.
Before 1.7.4 we used the recovery vclock as the replica vclock, so recovery
does the work.
> 
> > +			else
> > +				send_vclock = &relay->recv_vclock;
> > +			if (vclock_sum(&relay->status_msg.vclock) ==
> > +			    vclock_sum(send_vclock))
> > +				continue;
> 
> 4. How is it possible, that a vclock you are going to send, and
> a previously sent vclock are the same?
For instance, if a relay receives an answer to a heartbeat message (and the
vclock has not changed).
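
A minimal sketch of that check follows. The toy_* vclock is a simplified,
hypothetical stand-in for the real structure. It assumes that vclock
components only grow, so two snapshots with equal sums are identical and the
status message can be skipped.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum { TOY_VCLOCK_MAX = 4 };

/* Simplified stand-in for a vclock: one LSN per replica id. */
struct toy_vclock {
	int64_t lsn[TOY_VCLOCK_MAX];
};

static int64_t
toy_vclock_sum(const struct toy_vclock *vclock)
{
	int64_t sum = 0;
	for (int i = 0; i < TOY_VCLOCK_MAX; i++)
		sum += vclock->lsn[i];
	return sum;
}

/*
 * Assumption: `current` never goes backwards relative to
 * `last_sent`, so equal sums mean nothing has advanced since
 * the previous status message.
 */
static bool
toy_status_needs_send(const struct toy_vclock *last_sent,
		      const struct toy_vclock *current)
{
	assert(toy_vclock_sum(current) >= toy_vclock_sum(last_sent));
	return toy_vclock_sum(current) != toy_vclock_sum(last_sent);
}
```

A heartbeat answer that carries an unchanged vclock makes
toy_status_needs_send() return false, which corresponds to the `continue` in
the quoted code.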
> 
> > +			static const struct cmsg_hop route[] = {
> > -	 * Run the event loop until the connection is broken
> > -	 * or an error occurs.
> > -	 */
> 
> 5. This comment is wrong now? Why did you drop it?
Sorry
> 
> >  	while (!fiber_is_cancelled()) {
> > 
> > +			relay_set_error(relay, diag_last_error(&fiber()->diag));
> > +			break;
> > 
> >  		}
> 
> 6. I just realized, that data appears in the buffer before it
> is written to disk. Can't it happen, that something is encoded to
> the buffer, sent to replica, and then failed to be written to
> WAL?
WAL does not yield between the memory write and the disk write, so it is not
possible to see non-flushed data here.
> 
> > -		cmsg_init(&relay->status_msg.msg, route);
> > -		vclock_copy(&relay->status_msg.vclock, send_vclock);
> > -		relay->status_msg.relay = relay;
> > -		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
> > +		/* Recover xlogs from files. */
> 
> 7. Please, describe here in a comment, when such switch happens.
> Why is it needed to recover both from memory and files.
Ok
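
As a starting point for that comment, here is a rough model of the switch as
described in the commit message. All names are hypothetical, and the attach
condition (the next row the replica needs is still retained in memory) is an
assumption made for illustration, not the actual check in wal.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical: the range of LSNs currently retained in memory. */
struct toy_mem_window {
	int64_t first_lsn;
	int64_t last_lsn;
};

/* The relay can attach if the next row it needs is still in memory. */
static bool
toy_can_attach(const struct toy_mem_window *w, int64_t replica_lsn)
{
	return replica_lsn + 1 >= w->first_lsn;
}

/*
 * Bring the replica up to the end of the memory window.
 * Returns how many times the relay had to fall back to files.
 */
static int
toy_relay_catch_up(const struct toy_mem_window *w, int64_t *replica_lsn)
{
	assert(w->first_lsn <= w->last_lsn);
	int file_recoveries = 0;
	while (*replica_lsn < w->last_lsn) {
		if (toy_can_attach(w, *replica_lsn)) {
			/* Follow the memory cursor to its end. */
			*replica_lsn = w->last_lsn;
		} else {
			/* Too far behind: recover xlogs from files, */
			/* then make a new attempt to attach. */
			*replica_lsn = w->first_lsn - 1;
			file_recoveries++;
		}
	}
	return file_recoveries;
}
```

A replica that is only slightly behind never touches the files, while one
that fell out of the window reads files once and then attaches.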
> 
> > +		try {
> > 
> > +	vclock_copy(&relay->relay_vclock, replica_clock);
> 
> 8. The comment above clearly is not about this line. Please, put it
> either above the comment, or after the 'if' below.
Ok
> 
> >  	if (replica->gc == NULL) {
> >  	
> >  		replica->gc = gc_consumer_register(replica_clock, "replica %s",
> >  		
> >  						   tt_uuid_str(&replica->uuid));
> > 
> > @@ -567,8 +554,13 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
> >  	});
> >  	
> >  	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
> > 
> > -	relay->r = recovery_new(cfg_gets("wal_dir"), false,
> > -			        replica_clock);
> > +	relay->r = NULL;
> 
> 9. Why? If relay->r is not null, then it is a leak. If it is null,
> then you don't need to nullify it again.
Ok, I will check it once more.
> 
> > +	relay->wal_dir = strdup(cfg_gets("wal_dir"));
> > +	if (relay->wal_dir == NULL) {
> > +		diag_set(OutOfMemory, strlen(cfg_gets("wal_dir")),
> 
> 10. I know, I am a fucking bore, but 'strlen() + 1', for terminating
> zero :)
))
> 
> > +			 "runtime", "wal_dir");
> 
> 11. There is a strict rule about which arguments we pass to diag_set
> for OutOfMemory: size, allocator function, variable name. The allocator
> function here is 'strdup', not 'runtime'.
Ok
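
Comments 10 and 11 together could be sketched like this. It is a standalone
illustration with a hypothetical toy_oom record in place of the diag area; it
only shows which values would be reported, not how Tarantool reports them.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical error record standing in for a diag entry. */
struct toy_oom {
	size_t size;
	const char *allocator;
	const char *object;
};

/* strdup() allocates the characters plus the terminating zero. */
static size_t
strdup_alloc_size(const char *str)
{
	return strlen(str) + 1;
}

/* Duplicate a string; on failure describe the failed allocation. */
static char *
dup_or_report(const char *str, const char *object, struct toy_oom *err)
{
	assert(str != NULL && err != NULL);
	char *copy = strdup(str);
	if (copy == NULL) {
		err->size = strdup_alloc_size(str);
		err->allocator = "strdup";
		err->object = object;
	}
	return copy;
}
```

The caller owns the returned copy and must free() it.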
> 
> > +		diag_raise();
> 
> 12. diag_set + diag_raise = tnt_raise. Please, replace.
I hope this will change to diag_set + return -1 in the near future
> 
> > +	}
> > 
> >  	vclock_copy(&relay->tx.vclock, replica_clock);
> >  	relay->version_id = replica_version_id;
> > 
> > @@ -612,7 +604,10 @@ static void
> > 
> >  relay_send_row(struct xstream *stream, struct xrow_header *packet)
> >  {
> >  
> >  	struct relay *relay = container_of(stream, struct relay, stream);
> > 
> > -	assert(iproto_type_is_dml(packet->type));
> > +	if (packet->type != IPROTO_OK) {
> 
> 13. How can it be an error? We don't write errors to WAL, and packet here
> is a row, written to WAL. No? What is more, if it is not 'OK', below
> you may treat it as 'NOP' accidentally.
IPROTO_OK means a heartbeat message. In the opposite case the row is a DML,
which is verified with an assert.

> 
> > +		assert(iproto_type_is_dml(packet->type));
> > +		vclock_follow_xrow(&relay->relay_vclock, packet);
> > +	}
> >  	/*
> > 
> >  	 * Transform replica local requests to IPROTO_NOP so as to
> >  	 * promote vclock on the replica without actually modifying
> > 
> > diff --git a/src/box/wal.c b/src/box/wal.c
> > index e77bd1ae1..0d900270c 100644
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -159,6 +159,8 @@ struct wal_writer
> > 
> >  	struct rlist watchers;
> >  	/** Xrow buffer. */
> >  	struct xrow_buf xrow_buf;
> > 
> > +	/* xrow buffer condition signaled when buffer write was done. */
> 
> 14. Please, use a capital letter to start a sentence, and
> keep 66 symbols line width for comments.
Ok
> 
> 15. Why do you need each write signaled?
To wake all relays up. And there is a mistake here: I broadcast the condition
only when a whole batch has been written out.
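
A minimal model of "one broadcast per batch" follows. The toy_* structures are
hypothetical, and the condition is modelled as a list of flags rather than a
real fiber_cond, which is enough for cooperative code where the waiters run
only after the writer yields.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum { TOY_MAX_ROWS = 64, TOY_MAX_WAITERS = 8 };

/* Hypothetical cooperative condition: each waiter is a flag. */
struct toy_cond {
	bool *waiters[TOY_MAX_WAITERS];
	size_t waiter_count;
	size_t broadcast_count;
};

struct toy_mem_buf {
	int rows[TOY_MAX_ROWS];
	size_t row_count;
	struct toy_cond write_cond;
};

/* Register a waiter (a relay); returns false if the list is full. */
static bool
toy_cond_wait_register(struct toy_cond *cond, bool *is_woken)
{
	assert(is_woken != NULL);
	if (cond->waiter_count == TOY_MAX_WAITERS)
		return false;
	*is_woken = false;
	cond->waiters[cond->waiter_count++] = is_woken;
	return true;
}

/* Wake every registered waiter and forget them. */
static void
toy_cond_broadcast(struct toy_cond *cond)
{
	for (size_t i = 0; i < cond->waiter_count; i++)
		*cond->waiters[i] = true;
	cond->waiter_count = 0;
	cond->broadcast_count++;
}

/* Append a whole batch, then wake all waiters exactly once. */
static size_t
toy_buf_write_batch(struct toy_mem_buf *buf, const int *rows, size_t count)
{
	size_t written = 0;
	while (written < count && buf->row_count < TOY_MAX_ROWS)
		buf->rows[buf->row_count++] = rows[written++];
	if (written > 0)
		toy_cond_broadcast(&buf->write_cond);
	return written;
}
```

Broadcasting once per batch keeps the number of wakeups independent of the
batch size, at the cost of relays seeing rows only at batch boundaries.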
> 
> 16. If it is signaled on write, then I guess the name could
> be more clear? For example, xrow_buf_write_cond?
Accepted
> 
> 17. If this is an xrow_buf's cond, then why is it out of
> the xrow_buf structure? Otherwise it is not
> 'xrow_buf_write_cond', but rather just 'write_cond'.
Because it is not the buffer's responsibility. WAL coordinates its memory
buffer and the relays. And I found that an xrow_buf on_commit trigger would be
too complicated.
> 
> > +	struct fiber_cond xrow_buf_cond;
> > 
> > +	fiber_cond_create(&writer->xrow_buf_cond);
> 
> 18. Where do you call fiber_cond_destroy()?
Accepted
> 
> >  	/** Initialize eio in this thread */
> >  	coio_enable();
> > +		}
> > +		/* Check if the row is already sent. */
> 
> 19. This comment says what you do, but does not say why. This is
> what is called a narrative comment, which should be avoided, because
> they never help. How can a row be already sent, if you've read it
> just now?
Ok
> 
> > +		if (rc == 0 && vclock_get(vclock, row->replica_id) >= row->lsn)
> > +				row = &hearth_beat;
> 
> 20. Unfortunately, it is not safe to keep a pointer to an
> object outside of its scope. The heartbeat's memory may be reused
> later.
Definitely yes, it is my fault. 
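
One possible shape of the fix, as a sketch with hypothetical toy_* types: the
heartbeat row is declared at function scope, so a pointer to it stays valid
for every later use inside the loop. The "already sent" test is reduced to a
single LSN comparison for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum { TOY_ROW_HEARTBEAT = 0, TOY_ROW_DML = 1 };

/* Hypothetical stand-in for a row header. */
struct toy_row {
	uint32_t replica_id;
	int64_t lsn;
	int type;
};

/*
 * Record the type of the row that would be sent for each input
 * row. Rows the replica already has are replaced by a heartbeat.
 */
static void
toy_relay_rows(const struct toy_row *rows, size_t count,
	       int64_t sent_lsn, int *sent_types)
{
	assert(rows != NULL && sent_types != NULL);
	/* Function scope: outlives every iteration of the loop. */
	const struct toy_row heartbeat = {
		.replica_id = 0, .lsn = 0, .type = TOY_ROW_HEARTBEAT,
	};
	for (size_t i = 0; i < count; i++) {
		const struct toy_row *row = &rows[i];
		if (sent_lsn >= row->lsn)
			row = &heartbeat;
		sent_types[i] = row->type;
	}
}
```

Declaring the object inside the `if` block and using the pointer after the
block ends would be undefined behavior, which is what comment 20 points out.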
> 
> > +			} else
> > +				continue;
> > 
> > +typedef int (*wal_relay_cb)(struct xrow_header *header, void *data);
> > +
> > +/**
> > + * Struct wal memory relay structure. */
> 
> 21. Writing good comments may be harder than writing
> code, but it does not mean that it should not be done.
> Please, write a normal comment.
Ok
> 
> 22. We write closing */ on a separate line. The only
> exception is that if a whole comment is in one line,
> including /**.
Ok
> 
> > +struct wal_relay {
> > +	struct cmsg base;
> > +	/* Cbus pipe to wal cord. */
> > +	struct cpipe wal_pipe;
> > +	/* Cbus pipe from wal cord. */
> > +	struct cpipe relay_pipe;
> 
> 23. What is a purpose of wal_pipe, what is a purpose
> of relay_pipe, what messages do they deliver?
> I think, it should be written here in a comment.
Ok
> 
> > +
> > +	/* Vclock to start relaying. */
> > +	struct vclock *vclock;
> > +	/* Callback to call for each relaying row. */
> > +	wal_relay_cb on_wal_relay;
> 
> 24. Why is that callback needed?
> I think, it should be written here in a comment.
Ok
> 
> > +	/* Data pointer to pass to callback. */
> > +	void *cb_data;
> > +	/* Relaying fiber. */
> > +	struct fiber *fiber;
> > +	/* Message to cancel relaying fiber. */
> > +	struct cmsg cancel_msg;
> > +	/* Fiber condition to wait until relaying was stopped. */
> > +	struct fiber_cond done_cond;
> > +	/* Turns to true when relaying was stopped. */
> > +	bool done;
> 
> 25. Then I guess, it is 'is_stopped', no? 'Done' word is
> too common, it means nothing. What is 'done'? Also we use
> 'is_' prefix for flags (or another more suitable verb like
> 'has_', 'are_', 'was_', etc).
Ok
> 
> > +	/* Return code. */
> > +	int rc;
> > +	/* Diagnostic area. */
> > +	struct diag diag;
> > +};
> > +
> > +
> 
> 26. Please, keep one empty line between declarations.
Ok
> 
> > +int
> > +wal_relay(struct wal_relay *wal_relay, struct vclock *vclock,
> > +	  wal_relay_cb on_wal_relay, void *cb_data, const char *endpoint_name);
> 
> 27. This function definitely is not a one-liner for which a
> comment could be omitted. Please, write what this function does,
> why, which non-trivial arguments does it take, etc.
Ok
> 
> > +
> > 
> >  #if defined(__cplusplus)
> >  } /* extern "C" */
> >  #endif /* defined(__cplusplus) */
> > 
> > diff --git a/src/lib/core/cbus.c b/src/lib/core/cbus.c
> > index b3b1280e7..b7e6d769b 100644
> > --- a/src/lib/core/cbus.c
> > +++ b/src/lib/core/cbus.c
> > @@ -284,6 +284,9 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
> >  	/* Trigger task processing when the queue becomes non-empty. */
> >  	bool output_was_empty;
> > 
> > +	int old_cancel_state;
> > +	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
> 
> 28. Why?
Sorry, this is an artifact
> 
> > +
> > 
> >  	tt_pthread_mutex_lock(&endpoint->mutex);
> >  	output_was_empty = stailq_empty(&endpoint->output);
> >  	/** Flush input */
> > 
> > @@ -297,6 +300,7 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
> >  		ev_async_send(endpoint->consumer, &endpoint->async);
> >  	
> >  	}
> > 
> > +	pthread_setcancelstate(old_cancel_state, NULL);
> > 
> >  }
> 
> 29. I will review the tests later, when I fully understand the
> code. But so few tests for such a serious feature do not look
> like enough. Could you test it more rigorously?
The issue is that this patchset does not change Tarantool's behavior, so there
are no visible changes to test. However, I will try to implement some hacks
and an errinj test to show that in-memory replication is working.
