[tarantool-patches] [PATCH 5/7] Replication: in memory replication

Vladimir Davydov vdavydov.dev at gmail.com
Wed Aug 21 16:52:02 MSK 2019


On Tue, Aug 13, 2019 at 09:27:43AM +0300, Georgy Kirichenko wrote:
> diff --git a/src/box/gc.h b/src/box/gc.h
> index 9b38a0c06..f28b716b5 100644
> --- a/src/box/gc.h
> +++ b/src/box/gc.h
> @@ -125,6 +125,8 @@ struct gc_state {
>  	/** True if log is opened. */
>  	bool log_opened;
>  	/** Registered consumers, linked by gc_consumer::node. */
> +	/** xdir to track wal files. */
> +	struct xdir xdir;

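AFAICS this xdir is only created and destroyed - it isn't used anywhere.
Also note that the new member is inserted between the comment for
'consumers' and the 'consumers' field itself, so that comment now
describes the wrong member.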

>  	gc_tree_t consumers;
>  	/** Fiber responsible for periodic checkpointing. */
>  	struct fiber *checkpoint_fiber;
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index a1b841291..05fc0f691 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -116,6 +114,9 @@ struct relay {
>  	/** Relay sync state. */
>  	enum relay_state state;
>  
> +	struct vclock relay_vclock;
> +	char *wal_dir;
> +

Comments, please. It would also be nice to update the comment on relay->r
to clarify when it can be NULL.

>  	struct {
>  		/* Align to prevent false-sharing with tx thread */
>  		alignas(CACHELINE_SIZE)
> @@ -434,6 +426,27 @@ relay_send_heartbeat(struct relay *relay)
>  	}
>  }
>  
> +static int
> +relay_send_cb(struct xrow_header *row, void *data)
> +{
> +	try {
> +		struct relay *relay = (struct relay *)data;
> +		relay_send_row(&relay->stream, row);
> +		return 0;
> +	} catch (Exception *e) {
> +		return -1;
> +	}
> +}
> +
> +static void
> +relay_endpoint_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
> +{
> +	(void) loop;
> +	(void) events;
> +	struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data;
> +	cbus_process(endpoint);
> +}

I guess we could define this helper somewhere in the cbus internals, just
like fiber_schedule_cb.

> diff --git a/src/box/wal.c b/src/box/wal.c
> index 6cdb0db15..0457f3d46 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -1163,7 +1166,9 @@ wal_cord_f(va_list ap)
>  {
>  	(void) ap;
>  	struct wal_writer *writer = &wal_writer_singleton;
> +	fiber_cond_create(&writer->wal_mem_cond);
>  	wal_mem_create(&writer->wal_mem);
> +	wal_mem_svp(&writer->wal_mem, &writer->vclock);

Looks like this belongs to the previous patch.

>  
>  	/** Initialize eio in this thread */
>  	coio_enable();
> @@ -1494,3 +1500,149 @@ wal_atfork()
>  	if (xlog_is_open(&vy_log_writer.xlog))
>  		xlog_atfork(&vy_log_writer.xlog);
>  }
> +
> +struct wal_relay_msg {
> +	struct cmsg base;
> +	struct cpipe wal_pipe;
> +	struct cpipe relay_pipe;
> +
> +	struct vclock *vclock;
> +	wal_relay_cb on_wal_relay;
> +	void *cb_data;
> +	struct fiber *fiber;
> +	struct cmsg cancel_msg;
> +	struct fiber_cond done_cond;
> +	bool done;
> +	int rc;
> +	struct diag diag;
> +};

Comments, comments... Please document struct wal_relay_msg and each of
its members.

> +
> +static void
> +wal_relay_done(struct cmsg *base)
> +{
> +	struct wal_relay_msg *msg =
> +		container_of(base, struct wal_relay_msg, base);
> +	msg->done = true;
> +	fiber_cond_signal(&msg->done_cond);
> +}
> +
> +static int
> +wal_relay_f(va_list ap)
> +{
> +	struct wal_writer *writer = &wal_writer_singleton;
> +	struct wal_relay_msg *msg = va_arg(ap, struct wal_relay_msg *);
> +	struct vclock *vclock = msg->vclock;
> +	wal_relay_cb on_wal_relay = msg->on_wal_relay;
> +	void *cb_data = msg->cb_data;
> +
> +	double last_row_time = ev_monotonic_now(loop());
> +
> +	struct wal_mem_cursor cursor;
> +	if (wal_mem_cursor_create(&writer->wal_mem, &cursor, vclock) != 0)
> +		goto done;
> +	while (!fiber_is_cancelled()) {
> +		struct xrow_header *row;
> +		void *data;
> +		size_t size;
> +		int rc = wal_mem_cursor_next(&writer->wal_mem, &cursor,
> +					     &row, &data, &size);
> +		if (rc < 0) {
> +			/* Outdated cursor. */
> +			break;
> +		}
> +		if (rc == 0 && vclock_get(vclock, row->replica_id) >= row->lsn)
> +			continue;
> +		if (rc > 0) {
> +			double timeout = replication_timeout;
> +			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> +						    ERRINJ_DOUBLE);
> +			if (inj != NULL && inj->dparam != 0)
> +				timeout = inj->dparam;
> +
> +			/*
> +			 * Nothing to send so wait for the next row
> +			 * and send a hearth beat if timeout exceeded.
> +			 */
> +			fiber_cond_wait_deadline(&writer->wal_mem_cond,
> +						 last_row_time + timeout);
> +			if (fiber_is_cancelled())
> +				break;
> +			if (ev_monotonic_now(loop()) - last_row_time >
> +			    timeout) {
> +				struct xrow_header hearth_beat;
> +				xrow_encode_timestamp(&hearth_beat, instance_id,

s/hearth/heart/ (the same typo also appears in the comment above:
"send a hearth beat").

> +						      ev_now(loop()));
> +				row = &hearth_beat;
> +			} else
> +				continue;
> +		}
> +		last_row_time = ev_monotonic_now(loop());
> +		if (on_wal_relay(row, cb_data) != 0) {
> +			diag_move(&fiber()->diag, &msg->diag);
> +			break;
> +		}
> +	}
> +	static struct cmsg_hop done_route[] = {
> +		{wal_relay_done, NULL}
> +	};
> +done:
> +	cmsg_init(&msg->base, done_route);
> +	cpipe_push(&msg->relay_pipe, &msg->base);
> +	msg->fiber = NULL;
> +	return 0;
> +}
> diff --git a/src/box/wal.h b/src/box/wal.h
> index 1a7156d97..bd298cebe 100644
> --- a/src/box/wal.h
> +++ b/src/box/wal.h
> @@ -235,6 +235,12 @@ wal_write_vy_log(struct journal_entry *req);
>  void
>  wal_rotate_vy_log();
>  
> +typedef int (*wal_relay_cb)(struct xrow_header *header, void *data);
> +
> +int
> +wal_relay(struct vclock *vclock, wal_relay_cb on_wal_relay, void *cb_data,
> +	  const char *endpoint_name);

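Comments, comments... Please document wal_relay() and the wal_relay_cb
typedef (what the arguments mean and what the return value signifies).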

> +
>  #if defined(__cplusplus)
>  } /* extern "C" */
>  #endif /* defined(__cplusplus) */
> diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c
> index fdfc6f93d..b01604d55 100644
> --- a/src/box/wal_mem.c
> +++ b/src/box/wal_mem.c
> @@ -247,14 +247,13 @@ wal_mem_cursor_next(struct wal_mem *wal_mem,
>  	}
>  
>  	struct wal_mem_buf *mem_buf;
> -	size_t last_row_index;
>  
>  next_buffer:
>  	mem_buf = wal_mem->buf +
> -		  wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
> -	last_row_index = ibuf_used(&mem_buf->rows) /
> -			 sizeof(struct wal_mem_buf_row);
> -	if (last_row_index == wal_mem_cursor->row_index) {
> +		  wal_mem_cursor->buf_index % WAL_MEM_BUF_COUNT;
> +	size_t buf_row_count = ibuf_used(&mem_buf->rows) /
> +			       sizeof(struct wal_mem_buf_row);
> +	if (buf_row_count == wal_mem_cursor->row_index) {
>  		/* No more rows in the current buffer. */
>  		if (wal_mem->last_buf_index == wal_mem_cursor->buf_index)
>  			/* No more rows in the memory. */
> @@ -269,5 +268,6 @@ next_buffer:
>  	*row = &buf_row->xrow;
>  	*data = buf_row->data;
>  	*size = buf_row->size;
> +	++wal_mem_cursor->row_index;
>  	return 0;
>  }

Again, this seems to belong to the previous patch.



More information about the Tarantool-patches mailing list