[tarantool-patches] Re: [PATCH v2 4/4] replication: use wal memory buffer to fetch rows

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sun Sep 22 19:14:13 MSK 2019


Thanks for the patch!

On 18/09/2019 11:36, Georgy Kirichenko wrote:
> A relay tries to create a wal memory cursor and follow them to relay
> data to it's replica.

'follow them to relay data to replica'? Sorry, can't parse. Please,
rephrase.

> If a relay failed to attach to a wal memory buffer
> or went out of the buffer
> then the relay recovers xlogs from files and
> the makes a new try to attach.

'the makes'? Can't parse, please rephrase. Just 'makes'?
Or 'then makes'?

> 
> Closes: #3794

Please, no ':'.

Don't we need a docbot request for this feature? Is it documented
somewhere, that our replication is from disk always?

See 29 comments below.

> ---
>  src/box/relay.cc                              | 169 +++++++++---------
>  src/box/wal.c                                 | 155 ++++++++++++++++
>  src/box/wal.h                                 |  36 ++++
>  src/lib/core/cbus.c                           |   4 +
>  src/lib/core/errinj.h                         |   1 +
>  test/box/errinj.result                        |   2 +
>  test/replication/force_recovery.result        |   8 +
>  test/replication/force_recovery.test.lua      |   2 +
>  test/replication/replica_rejoin.result        |   8 +
>  test/replication/replica_rejoin.test.lua      |   2 +
>  .../show_error_on_disconnect.result           |   8 +
>  .../show_error_on_disconnect.test.lua         |   2 +
>  test/replication/suite.ini                    |   2 +-
>  test/xlog/panic_on_wal_error.result           |  12 ++
>  test/xlog/panic_on_wal_error.test.lua         |   3 +
>  test/xlog/suite.ini                           |   2 +-
>  16 files changed, 327 insertions(+), 89 deletions(-)
> 
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 21674119d..1e65d6d56 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -161,9 +163,9 @@ relay_new(struct replica *replica)
>  	}
>  	relay->replica = replica;
>  	relay->last_row_time = ev_monotonic_now(loop());
> -	fiber_cond_create(&relay->reader_cond);
>  	diag_create(&relay->diag);
>  	relay->state = RELAY_OFF;
> +	relay->r = NULL;

1. Why? 'relay' is allocated using 'calloc' a few lines
above, 'r' is NULL already.

>  	return relay;
>  }
>  
> @@ -404,7 +382,24 @@ relay_reader_f(va_list ap)
>  			/* vclock is followed while decoding, zeroing it. */
>  			vclock_create(&relay->recv_vclock);
>  			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
> -			fiber_cond_signal(&relay->reader_cond);
> +			if (relay->status_msg.msg.route != NULL)
> +				continue;

2. What is happening below? And how can msg.route be NULL?

> +			struct vclock *send_vclock;
> +			if (relay->version_id < version_id(1, 7, 4))
> +				send_vclock = &relay->r->vclock;

3. I know, this is old code existed before your patch, and it will
be dropped, but why do you send relay->r->vclock and don't decode it
anywhere? Above you decoded a received vclock into relay->recv_vclock.
relay->r->vclock is not updated, and nonetheless you send it.

> +			else
> +				send_vclock = &relay->recv_vclock;
> +			if (vclock_sum(&relay->status_msg.vclock) ==
> +			    vclock_sum(send_vclock))
> +				continue;

4. How is it possible, that a vclock you are going to send, and
a previously sent vclock are the same?

> +			static const struct cmsg_hop route[] = {
> +				{tx_status_update, NULL}
> +			};
> +			cmsg_init(&relay->status_msg.msg, route);
> +			vclock_copy(&relay->status_msg.vclock,
> +				    send_vclock);
> +			relay->status_msg.relay = relay;
> +			cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
>  		}
>  	} catch (Exception *e) {
>  		relay_set_error(relay, e);
> @@ -469,50 +480,28 @@ relay_subscribe_f(va_list ap)
>  	 */
>  	relay_send_heartbeat(relay);
>  
> -	/*
> -	 * Run the event loop until the connection is broken
> -	 * or an error occurs.
> -	 */

5. This comment is wrong now? Why did you drop it?

>  	while (!fiber_is_cancelled()) {
> -		double timeout = replication_timeout;
> -		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> -					    ERRINJ_DOUBLE);
> -		if (inj != NULL && inj->dparam != 0)
> -			timeout = inj->dparam;
> -
> -		fiber_cond_wait_deadline(&relay->reader_cond,
> -					 relay->last_row_time + timeout);
> -
> -		/*
> -		 * The fiber can be woken by IO cancel, by a timeout of
> -		 * status messaging or by an acknowledge to status message.
> -		 * Handle cbus messages first.
> -		 */
> -		cbus_process(&relay->endpoint);
> -		/* Check for a heartbeat timeout. */
> -		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
> -			relay_send_heartbeat(relay);
> -		/*
> -		 * Check that the vclock has been updated and the previous
> -		 * status message is delivered
> -		 */
> -		if (relay->status_msg.msg.route != NULL)
> -			continue;
> -		struct vclock *send_vclock;
> -		if (relay->version_id < version_id(1, 7, 4))
> -			send_vclock = &r->vclock;
> -		else
> -			send_vclock = &relay->recv_vclock;
> -		if (vclock_sum(&relay->status_msg.vclock) ==
> -		    vclock_sum(send_vclock))
> -			continue;
> -		static const struct cmsg_hop route[] = {
> -			{tx_status_update, NULL}
> +		/* Try to relay direct from wal memory buffer. */
> +		if (wal_relay(&relay->wal_relay, &relay->relay_vclock,
> +			      relay_send_cb, relay,
> +			      tt_sprintf("relay_%p", relay)) != 0) {
> +			relay_set_error(relay, diag_last_error(&fiber()->diag));
> +			break;
>  		}
6. I just realized, that data appears in the buffer before it
is written to disk. Can't it happen, that something is encoded to
the buffer, sent to replica, and then failed to be written to
WAL?

> -		cmsg_init(&relay->status_msg.msg, route);
> -		vclock_copy(&relay->status_msg.vclock, send_vclock);
> -		relay->status_msg.relay = relay;
> -		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
> +		/* Recover xlogs from files. */

7. Please, describe here in a comment, when such switch happens.
Why is it needed to recover both from memory and files.

> +		try {
> +			relay->r = recovery_new(relay->wal_dir, false,
> +					        &relay->relay_vclock);
> +			auto relay_guard = make_scoped_guard([&] {
> +				recovery_delete(relay->r);
> +				relay->r = NULL;
> +			});
> +			recover_remaining_wals(relay->r, &relay->stream,
> +					       NULL, true);
> +		} catch (Exception *e) {
> +			relay_set_error(relay, e);
> +			break;
> +		}
>  	}
>  
>  	/*
> @@ -553,6 +539,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>  	 * unless it has already been registered by initial
>  	 * join.
>  	 */
> +	vclock_copy(&relay->relay_vclock, replica_clock);

8. The comment above clearly is not about this line. Please, put it
either above the comment, or after the 'if' below.

>  	if (replica->gc == NULL) {
>  		replica->gc = gc_consumer_register(replica_clock, "replica %s",
>  						   tt_uuid_str(&replica->uuid));
> @@ -567,8 +554,13 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>  	});
>  
>  	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
> -	relay->r = recovery_new(cfg_gets("wal_dir"), false,
> -			        replica_clock);
> +	relay->r = NULL;

9. Why? If relay->r is not null, then it is a leak. If it is null,
then you don't need to nullify it again.

> +	relay->wal_dir = strdup(cfg_gets("wal_dir"));
> +	if (relay->wal_dir == NULL) {
> +		diag_set(OutOfMemory, strlen(cfg_gets("wal_dir")),

10. I know, I am a fucking bore, but 'strlen() + 1', for terminating
zero :)

> +			 "runtime", "wal_dir");

11. There is a strict rule what arguments we pass to diag_raise -
it is size, allocator function, variable name. Allocator function
here is 'strdup', not 'runtime'.

> +		diag_raise();

12. diag_set + diag_raise = tnt_raise. Please, replace.

> +	}
>  	vclock_copy(&relay->tx.vclock, replica_clock);
>  	relay->version_id = replica_version_id;
>  
> @@ -612,7 +604,10 @@ static void
>  relay_send_row(struct xstream *stream, struct xrow_header *packet)
>  {
>  	struct relay *relay = container_of(stream, struct relay, stream);
> -	assert(iproto_type_is_dml(packet->type));
> +	if (packet->type != IPROTO_OK) {

13. How can it be an error? We don't write errors to WAL, and packet here
is a row, written to WAL. No? What is more, if it is not 'OK', below
you may treat it as 'NOP' accidentally.

> +		assert(iproto_type_is_dml(packet->type));
> +		vclock_follow_xrow(&relay->relay_vclock, packet);
> +	}>  	/*
>  	 * Transform replica local requests to IPROTO_NOP so as to
>  	 * promote vclock on the replica without actually modifying
> diff --git a/src/box/wal.c b/src/box/wal.c
> index e77bd1ae1..0d900270c 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -159,6 +159,8 @@ struct wal_writer
>  	struct rlist watchers;
>  	/** Xrow buffer. */
>  	struct xrow_buf xrow_buf;
> +	/* xrow buffer condition signaled when buffer write was done. */

14. Please, use a capital letter to start a sentence, and
keep 66 symbols line width for comments.

15. Why do you need each write signaled?

16. If it is signaled on write, then I guess the name could
be more clear? For example, xrow_buf_write_cond?

17. If this is an xrow_buf's cond, then why is it out of
the xrow_buf structure? Otherwise it is not
'xrow_buf_write_cond', but rather just 'write_cond'.

> +	struct fiber_cond xrow_buf_cond;
>  };
>  
>  struct wal_msg {
> @@ -1131,6 +1134,7 @@ wal_writer_f(va_list ap)
>  	(void) ap;
>  	struct wal_writer *writer = &wal_writer_singleton;
>  	xrow_buf_create(&writer->xrow_buf);
> +	fiber_cond_create(&writer->xrow_buf_cond);

18. Where do you call fiber_cond_destroy()?

>  
>  	/** Initialize eio in this thread */
>  	coio_enable();
> @@ -1451,3 +1455,154 @@ wal_atfork()
>  	if (xlog_is_open(&vy_log_writer.xlog))
>  		xlog_atfork(&vy_log_writer.xlog);
>  }
> +
> +/* Wake relay when wal_relay exited. */
> +static void
> +wal_relay_done(struct cmsg *base)
> +{
> +	struct wal_relay *msg =
> +		container_of(base, struct wal_relay, base);
> +	msg->done = true;
> +	fiber_cond_signal(&msg->done_cond);
> +}
> +
> +/* Wal relay fiber function. */
> +static int
> +wal_relay_f(va_list ap)
> +{
> +	struct wal_writer *writer = &wal_writer_singleton;
> +	struct wal_relay *msg = va_arg(ap, struct wal_relay *);
> +	struct vclock *vclock = msg->vclock;
> +	wal_relay_cb on_wal_relay = msg->on_wal_relay;
> +	void *cb_data = msg->cb_data;
> +
> +	double last_row_time = ev_monotonic_now(loop());
> +
> +	struct xrow_buf_cursor cursor;
> +	/* Attach to a wal memory. */
> +	if (xrow_buf_cursor_create(&writer->xrow_buf, &cursor, vclock) != 0)
> +		goto done;
> +	while (!fiber_is_cancelled()) {
> +		struct xrow_header *row;
> +		void *data;
> +		size_t size;
> +		/* Fetch the next row. */
> +		int rc = xrow_buf_cursor_next(&writer->xrow_buf, &cursor,
> +					     &row, &data, &size);
> +		if (rc < 0) {
> +			/*
> +			 * Wal memory buffer was rotated and we are not in
> +			 * memory.
> +			 */
> +			goto done;
> +		}
> +		/* Check if the row is already sent. */

19. This comment says what you do, but does not say why. This is
what is called a narrative comment, which should be avoided, because
they never help. How can a row be already sent, if you've read it
just now?

> +		if (rc == 0 && vclock_get(vclock, row->replica_id) >= row->lsn)
> +			continue;
> +		ERROR_INJECT(ERRINJ_WAL_MEM_IGNORE, goto done; );
> +		if (rc > 0) {
> +			/* All wal memory was relayed, wait for new data. */
> +			double timeout = replication_timeout;
> +			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> +						    ERRINJ_DOUBLE);
> +			if (inj != NULL && inj->dparam != 0)
> +				timeout = inj->dparam;
> +
> +			/*
> +			 * Nothing to send so wait for the next row
> +			 * and send a hearth beat if timeout exceeded.
> +			 */
> +			fiber_cond_wait_deadline(&writer->xrow_buf_cond,
> +						 last_row_time + timeout);
> +			if (ev_monotonic_now(loop()) - last_row_time >
> +			    timeout) {
> +				/*
> +				 * There were no new rows so we are going
> +				 * to send a heartbeat.
> +				 */
> +				struct xrow_header hearth_beat;
> +				xrow_encode_timestamp(&hearth_beat, instance_id,
> +						      ev_now(loop()));
> +				row = &hearth_beat;

20. Unfortunately, it is not safe to keep a pointer at an
object out of its scope. Heartbeath's memory may be reused
further.

> +			} else
> +				continue;
> +		}
> +		last_row_time = ev_monotonic_now(loop());
> +		/* Invoke handler callback. */
> +		if (on_wal_relay(row, cb_data) != 0) {
> +			diag_move(&fiber()->diag, &msg->diag);
> +			goto done;
> +		}
> +	}
> +	static struct cmsg_hop done_route[] = {
> +		{wal_relay_done, NULL}
> +	};
> +done:
> +	/* Signal wal relay was finished. */
> +	cmsg_init(&msg->base, done_route);
> +	cpipe_push(&msg->relay_pipe, &msg->base);
> +	msg->fiber = NULL;
> +	return 0;
> +}
> +
> +static void
> +wal_relay_attach(void *data)
> +{
> +	struct wal_relay *msg = (struct wal_relay *)data;
> +	msg->fiber = fiber_new("wal relay fiber", wal_relay_f);
> +	fiber_start(msg->fiber, msg);
> +}
> +
> +static void
> +wal_relay_cancel(struct cmsg *base)
> +{
> +	struct wal_relay *msg = container_of(base, struct wal_relay,
> +						 cancel_msg);
> +	if (msg->fiber != NULL)
> +		fiber_cancel(msg->fiber);
> +}
> +
> +int
> +wal_relay(struct wal_relay *wal_relay, struct vclock *vclock,
> +	  wal_relay_cb on_wal_relay, void *cb_data, const char *endpoint_name)
> +{
> +	wal_relay->vclock = vclock;
> +	wal_relay->on_wal_relay = on_wal_relay;
> +	wal_relay->cb_data = cb_data;
> +	diag_create(&wal_relay->diag);
> +	wal_relay->cancel_msg.route = NULL;
> +
> +	fiber_cond_create(&wal_relay->done_cond);
> +	wal_relay->done = false;
> +
> +	/* Establish a connection with wal thread. */
> +	cbus_pair("wal", endpoint_name, &wal_relay->wal_pipe,
> +		  &wal_relay->relay_pipe,
> +		  wal_relay_attach, wal_relay, cbus_process);
> +
> +	while (!wal_relay->done) {
> +		if (fiber_is_cancelled() &&
> +		    wal_relay->cancel_msg.route == NULL) {
> +			/* Send a cancel message to a wal relay fiber. */
> +			static struct cmsg_hop cancel_route[]= {
> +				{wal_relay_cancel, NULL}};
> +			cmsg_init(&wal_relay->cancel_msg, cancel_route);
> +			cpipe_push(&wal_relay->wal_pipe, &wal_relay->cancel_msg);
> +		}
> +		fiber_cond_wait(&wal_relay->done_cond);
> +	}
> +
> +	/* Disconnect from wal thread. */
> +	cbus_unpair(&wal_relay->wal_pipe, &wal_relay->relay_pipe,
> +		    NULL, NULL, cbus_process);
> +
> +	if (!diag_is_empty(&wal_relay->diag)) {
> +		diag_move(&wal_relay->diag, &fiber()->diag);
> +		return -1;
> +	}
> +	if (fiber_is_cancelled()) {
> +		diag_set(FiberIsCancelled);
> +		return -1;
> +	}
> +	return 0;
> +}
> diff --git a/src/box/wal.h b/src/box/wal.h
> index 6725f26d3..7446fb6da 100644
> --- a/src/box/wal.h
> +++ b/src/box/wal.h
> @@ -241,6 +241,42 @@ wal_write_vy_log(struct journal_entry *req);
>  void
>  wal_rotate_vy_log();
>  
> +typedef int (*wal_relay_cb)(struct xrow_header *header, void *data);
> +
> +/**
> + * Struct wal memory relay structure. */

21. Writing of good comments may be harder that writing
code, but it does not mean, that it should not be done.
Please, write a normal comment.

22. We write closing */ on a separate line. The only
exception is that if a whole comment is in one line,
including /**.

> +struct wal_relay {
> +	struct cmsg base;
> +	/* Cbus pipe to wal cord. */
> +	struct cpipe wal_pipe;
> +	/* Cbus pipe from wal cord. */
> +	struct cpipe relay_pipe;

23. What is a purpose of wal_pipe, what is a purpose
of relay_pipe, what messages do they deliver?
I think, it should written here in a comment.

> +
> +	/* Vclock to start relaying. */
> +	struct vclock *vclock;
> +	/* Callback to call for each relaying row. */
> +	wal_relay_cb on_wal_relay;

24. Why is that callback needed?
I think, it should written here in a comment.

> +	/* Data pointer to pass to callback. */
> +	void *cb_data;
> +	/* Relaying fiber. */
> +	struct fiber *fiber;
> +	/* Message to cancel relaying fiber. */
> +	struct cmsg cancel_msg;
> +	/* Fiber condition to wait until relaying was stopped. */
> +	struct fiber_cond done_cond;
> +	/* Turns to true when relaying was stopped. */
> +	bool done;

25. Then I guess, it is 'is_stopped', no? 'Done' word is
too common, it means nothing. What is 'done'? Also we use
'is_' prefix for flags (or another more sutable verb like
'has_', 'are_', 'was_', etc). 

> +	/* Return code. */
> +	int rc;
> +	/* Diagnostic area. */
> +	struct diag diag;
> +};
> +
> +

26. Please, keep one empty line between declarations.

> +int
> +wal_relay(struct wal_relay *wal_relay, struct vclock *vclock,
> +	  wal_relay_cb on_wal_relay, void *cb_data, const char *endpoint_name);

27. This function definitely is not a one-liner for which a
comment could be omitted. Please, write what this function does,
why, which non-trivial arguments does it take, etc.

> +
>  #if defined(__cplusplus)
>  } /* extern "C" */
>  #endif /* defined(__cplusplus) */
> diff --git a/src/lib/core/cbus.c b/src/lib/core/cbus.c
> index b3b1280e7..b7e6d769b 100644
> --- a/src/lib/core/cbus.c
> +++ b/src/lib/core/cbus.c
> @@ -284,6 +284,9 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
>  	/* Trigger task processing when the queue becomes non-empty. */
>  	bool output_was_empty;
>  
> +	int old_cancel_state;
> +	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);

28. Why?

> +
>  	tt_pthread_mutex_lock(&endpoint->mutex);
>  	output_was_empty = stailq_empty(&endpoint->output);
>  	/** Flush input */
> @@ -297,6 +300,7 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
>  
>  		ev_async_send(endpoint->consumer, &endpoint->async);
>  	}
> +	pthread_setcancelstate(old_cancel_state, NULL);
>  }

29. I will review the tests later, when I will fully understand the
code. But so few tests for such a serious feature looks not enough.
Could you test it more rigorous?




More information about the Tarantool-patches mailing list