[tarantool-patches] [PATCH] Relay logs from wal thread

Georgy Kirichenko georgy at tarantool.org
Tue Jan 22 17:18:36 MSK 2019


Use the wal thread to relay rows. Relay writer and reader fibers live
in the wal thread. The writer fiber sends rows to the peer from wal
memory when possible, or spawns cords to recover rows from xlog files.

The wal writer stores rows in two memory buffers and swaps them
when the current buffer exceeds a size threshold. Each buffer is
split into chunks of rows with the same server id, bounded by a
chunk size threshold. To find a position in wal memory, all chunks
are indexed by the wal_mem_index array. Each wal_mem_index entry
holds the chunk's replica_id, the vclock just before the first row
in the chunk, the buffer number, and the chunk's position and size.

Closes: #3794

Issue: https://github.com/tarantool/tarantool/issues/3794
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-3794--relay-from-wal
---
 src/box/relay.cc                              | 340 +-------
 src/box/relay.h                               |   7 +
 src/box/wal.c                                 | 819 +++++++++++++++++-
 src/box/wal.h                                 |   7 +
 src/box/xrow_io.cc                            |  13 +
 src/box/xrow_io.h                             |   4 +
 src/errinj.h                                  |   1 +
 test/box/errinj.result                        |   2 +
 test/replication/force_recovery.result        |  11 +
 test/replication/force_recovery.test.lua      |   3 +
 test/replication/replica_rejoin.result        |   7 +
 test/replication/replica_rejoin.test.lua      |   2 +
 .../show_error_on_disconnect.result           |   7 +
 .../show_error_on_disconnect.test.lua         |   2 +
 test/vinyl/layout.result                      |  10 -
 test/xlog/panic_on_wal_error.result           |  12 +
 test/xlog/panic_on_wal_error.test.lua         |   4 +
 17 files changed, 919 insertions(+), 332 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2bb260e06..9a4e63405 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -73,23 +73,6 @@ struct relay_status_msg {
 	struct vclock vclock;
 };
 
-/**
- * Cbus message to update replica gc state in tx thread.
- */
-struct relay_gc_msg {
-	/** Parent */
-	struct cmsg msg;
-	/**
-	 * Link in the list of pending gc messages,
-	 * see relay::pending_gc.
-	 */
-	struct stailq_entry in_pending;
-	/** Relay instance */
-	struct relay *relay;
-	/** Vclock to advance to */
-	struct vclock vclock;
-};
-
 /** State of a replication relay. */
 struct relay {
 	/** The thread in which we relay data to the replica. */
@@ -121,20 +104,9 @@ struct relay {
 	 * dataset on the other side and send missing data rows if any.
 	 */
 	struct vclock local_vclock_at_subscribe;
+	/** Cached wal_dir cfg option. */
+	const char *wal_dir;
 
-	/** Relay endpoint */
-	struct cbus_endpoint endpoint;
-	/** A pipe from 'relay' thread to 'tx' */
-	struct cpipe tx_pipe;
-	/** A pipe from 'tx' thread to 'relay' */
-	struct cpipe relay_pipe;
-	/** Status message */
-	struct relay_status_msg status_msg;
-	/**
-	 * List of garbage collection messages awaiting
-	 * confirmation from the replica.
-	 */
-	struct stailq pending_gc;
 	/** Time when last row was sent to peer. */
 	double last_row_tm;
 	/** Relay sync state. */
@@ -187,7 +159,6 @@ relay_new(struct replica *replica)
 	relay->replica = replica;
 	fiber_cond_create(&relay->reader_cond);
 	diag_create(&relay->diag);
-	stailq_create(&relay->pending_gc);
 	relay->state = RELAY_OFF;
 	return relay;
 }
@@ -235,19 +206,14 @@ relay_exit(struct relay *relay)
 	 * cursor, which must be closed in the same thread
 	 * that opened it (it uses cord's slab allocator).
 	 */
-	recovery_delete(relay->r);
+	if (relay->r != NULL)
+		recovery_delete(relay->r);
 	relay->r = NULL;
 }
 
 static void
 relay_stop(struct relay *relay)
 {
-	struct relay_gc_msg *gc_msg, *next_gc_msg;
-	stailq_foreach_entry_safe(gc_msg, next_gc_msg,
-				  &relay->pending_gc, in_pending) {
-		free(gc_msg);
-	}
-	stailq_create(&relay->pending_gc);
 	if (relay->r != NULL)
 		recovery_delete(relay->r);
 	relay->r = NULL;
@@ -372,286 +338,44 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
- * The message which updated tx thread with a new vclock has returned back
- * to the relay.
+ * Propagate a vclock acknowledged by the replica: update the relay
+ * status vclock and advance the replica gc consumer if it lags.
+ * Both are tx thread state, so this must be called in the tx thread.
  */
-static void
-relay_status_update(struct cmsg *msg)
-{
-	msg->route = NULL;
-}
-
-/**
- * Deliver a fresh relay vclock to tx thread.
- */
-static void
-tx_status_update(struct cmsg *msg)
-{
-	struct relay_status_msg *status = (struct relay_status_msg *)msg;
-	vclock_copy(&status->relay->tx.vclock, &status->vclock);
-	static const struct cmsg_hop route[] = {
-		{relay_status_update, NULL}
-	};
-	cmsg_init(msg, route);
-	cpipe_push(&status->relay->relay_pipe, msg);
-}
-
-/**
- * Update replica gc state in tx thread.
- */
-static void
-tx_gc_advance(struct cmsg *msg)
-{
-	struct relay_gc_msg *m = (struct relay_gc_msg *)msg;
-	gc_consumer_advance(m->relay->replica->gc, &m->vclock);
-	free(m);
-}
-
-static void
-relay_on_close_log_f(struct trigger *trigger, void * /* event */)
-{
-	static const struct cmsg_hop route[] = {
-		{tx_gc_advance, NULL}
-	};
-	struct relay *relay = (struct relay *)trigger->data;
-	struct relay_gc_msg *m = (struct relay_gc_msg *)malloc(sizeof(*m));
-	if (m == NULL) {
-		say_warn("failed to allocate relay gc message");
-		return;
-	}
-	cmsg_init(&m->msg, route);
-	m->relay = relay;
-	vclock_copy(&m->vclock, &relay->r->vclock);
-	/*
-	 * Do not invoke garbage collection until the replica
-	 * confirms that it has received data stored in the
-	 * sent xlog.
-	 */
-	stailq_add_tail_entry(&relay->pending_gc, m, in_pending);
-}
-
-/**
- * Invoke pending garbage collection requests.
- *
- * This function schedules the most recent gc message whose
- * vclock is less than or equal to the given one. Older
- * messages are discarded as their job will be done by the
- * scheduled message anyway.
- */
-static inline void
-relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
-{
-	struct relay_gc_msg *curr, *next, *gc_msg = NULL;
-	stailq_foreach_entry_safe(curr, next, &relay->pending_gc, in_pending) {
-		if (vclock_sum(&curr->vclock) > vclock_sum(vclock))
-			break;
-		stailq_shift(&relay->pending_gc);
-		free(gc_msg);
-		gc_msg = curr;
-	}
-	if (gc_msg != NULL)
-		cpipe_push(&relay->tx_pipe, &gc_msg->msg);
-}
-
-static void
-relay_set_error(struct relay *relay, struct error *e)
-{
-	/* Don't override existing error. */
-	if (diag_is_empty(&relay->diag))
-		diag_add_error(&relay->diag, e);
-}
-
-static void
-relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
-{
-	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
-	if (fiber_is_cancelled()) {
-		/*
-		 * The relay is exiting. Rescanning the WAL at this
-		 * point would be pointless and even dangerous,
-		 * because the relay could have written a packet
-		 * fragment to the socket before being cancelled
-		 * so that writing another row to the socket would
-		 * lead to corrupted replication stream and, as
-		 * a result, permanent replication breakdown.
-		 */
-		return;
-	}
-	try {
-		recover_remaining_wals(relay->r, &relay->stream, NULL,
-				       (events & WAL_EVENT_ROTATE) != 0);
-		relay_flush(relay);
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
-		fiber_cancel(fiber());
-	}
-}
-
-/*
- * Relay reader fiber function.
- * Read xrow encoded vclocks sent by the replica.
- */
 int
-relay_reader_f(va_list ap)
+relay_status_update(struct replica *replica, struct vclock *vclock)
 {
-	struct relay *relay = va_arg(ap, struct relay *);
-	struct fiber *relay_f = va_arg(ap, struct fiber *);
-
-	struct ibuf ibuf;
-	struct ev_io io;
-	coio_create(&io, relay->io.fd);
-	ibuf_create(&ibuf, &cord()->slabc, 1024);
-	try {
-		while (!fiber_is_cancelled()) {
-			struct xrow_header xrow;
-			coio_read_xrow_timeout_xc(&io, &ibuf, &xrow,
-					replication_disconnect_timeout());
-			/* vclock is followed while decoding, zeroing it. */
-			vclock_create(&relay->recv_vclock);
-			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
-			fiber_cond_signal(&relay->reader_cond);
-		}
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
-		fiber_cancel(relay_f);
-	}
-	ibuf_destroy(&ibuf);
+	struct relay *relay = replica->relay;
+	vclock_copy(&relay->tx.vclock, vclock);
+	if (vclock_compare(vclock, &replica->gc->vclock) == 1)
+		gc_consumer_advance(replica->gc, vclock);
 	return 0;
 }
 
-/**
- * Send a heartbeat message over a connected relay.
- */
-static void
-relay_send_heartbeat(struct relay *relay)
-{
-	struct xrow_header row;
-	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
-	try {
-		relay_send(relay, &row);
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
-		fiber_cancel(fiber());
-	}
-}
-
-/**
- * A libev callback invoked when a relay client socket is ready
- * for read. This currently only happens when the client closes
- * its socket, and we get an EOF.
- */
-static int
-relay_subscribe_f(va_list ap)
+/**
+ * Read rows from the xlog files tracked by @a recovery, send them
+ * to the replica via relay->stream and flush the send buffer.
+ * Return 0 on success, -1 if an exception was raised.
+ */
+int
+relay_recover_wals(struct replica *replica, struct recovery *recovery)
 {
-	struct relay *relay = va_arg(ap, struct relay *);
-	struct recovery *r = relay->r;
+	struct relay *relay = replica->relay;
 	ibuf_create(&relay->send_buf, &cord()->slabc,
 		    2 * RELAY_BUFFER_SEND_THRESHOLD);
 
-	coio_enable();
-	relay_set_cord_name(relay->io.fd);
-
-	/* Create cpipe to tx for propagating vclock. */
-	cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay),
-			     fiber_schedule_cb, fiber());
-	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
-		  &relay->relay_pipe, NULL, NULL, cbus_process);
-
-	/* Setup garbage collection trigger. */
-	struct trigger on_close_log = {
-		RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
-	};
-	trigger_add(&r->on_close_log, &on_close_log);
-
-	/* Setup WAL watcher for sending new rows to the replica. */
-	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
-			relay_process_wal_event, cbus_process);
-
-	/* Start fiber for receiving replica acks. */
-	char name[FIBER_NAME_MAX];
-	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
-	struct fiber *reader = fiber_new_xc(name, relay_reader_f);
-	fiber_set_joinable(reader, true);
-	fiber_start(reader, relay, fiber());
-
-	/*
-	 * If the replica happens to be up to date on subscribe,
-	 * don't wait for timeout to happen - send a heartbeat
-	 * message right away to update the replication lag as
-	 * soon as possible.
-	 */
-	relay_send_heartbeat(relay);
-
-	/*
-	 * Run the event loop until the connection is broken
-	 * or an error occurs.
-	 */
-	while (!fiber_is_cancelled()) {
-		double timeout = replication_timeout;
-		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
-					    ERRINJ_DOUBLE);
-		if (inj != NULL && inj->dparam != 0)
-			timeout = inj->dparam;
-
-		fiber_cond_wait_deadline(&relay->reader_cond,
-					 relay->last_row_tm + timeout);
-
-		/*
-		 * The fiber can be woken by IO cancel, by a timeout of
-		 * status messaging or by an acknowledge to status message.
-		 * Handle cbus messages first.
-		 */
-		cbus_process(&relay->endpoint);
-		/* Check for a heartbeat timeout. */
-		if (ev_monotonic_now(loop()) - relay->last_row_tm > timeout)
-			relay_send_heartbeat(relay);
-		/*
-		 * Check that the vclock has been updated and the previous
-		 * status message is delivered
-		 */
-		if (relay->status_msg.msg.route != NULL)
-			continue;
-		struct vclock *send_vclock;
-		if (relay->version_id < version_id(1, 7, 4))
-			send_vclock = &r->vclock;
-		else
-			send_vclock = &relay->recv_vclock;
-		if (vclock_sum(&relay->status_msg.vclock) ==
-		    vclock_sum(send_vclock))
-			continue;
-		static const struct cmsg_hop route[] = {
-			{tx_status_update, NULL}
-		};
-		cmsg_init(&relay->status_msg.msg, route);
-		vclock_copy(&relay->status_msg.vclock, send_vclock);
-		relay->status_msg.relay = relay;
-		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
-		/* Collect xlog files received by the replica. */
-		relay_schedule_pending_gc(relay, send_vclock);
+	int res = 0;
+	try {
+		recover_remaining_wals(recovery, &relay->stream, NULL, true);
+		relay_flush(relay);
+	} catch (Exception *) {
+		/*
+		 * Exceptions are thrown by pointer: a by-reference
+		 * handler would never match, letting them escape into
+		 * the C caller and skipping ibuf_destroy() below.
+		 */
+		res = -1;
 	}
 
-	/*
-	 * Log the error that caused the relay to break the loop.
-	 * Don't clear the error for status reporting.
-	 */
-	assert(!diag_is_empty(&relay->diag));
-	diag_add_error(diag_get(), diag_last_error(&relay->diag));
-	diag_log();
-	say_crit("exiting the relay loop");
-
-	/* Clear garbage collector trigger and WAL watcher. */
-	trigger_clear(&on_close_log);
-	wal_clear_watcher(&relay->wal_watcher, cbus_process);
-
-	/* Join ack reader fiber. */
-	fiber_cancel(reader);
-	fiber_join(reader);
-
-	/* Destroy cpipe to tx. */
-	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
-		    NULL, NULL, cbus_process);
-	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
-
-	relay_exit(relay);
 	ibuf_destroy(&relay->send_buf);
-	return -1;
+	return res;
 }
 
 /** Replication acceptor fiber handler. */
@@ -676,21 +389,27 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 
 	relay_start(relay, fd, sync, relay_send_row);
 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
-	relay->r = recovery_new(cfg_gets("wal_dir"), false,
-			        replica_clock);
+	relay->wal_dir = cfg_gets("wal_dir");
+	relay->r = NULL;
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
-	int rc = cord_costart(&relay->cord, "subscribe",
-			      relay_subscribe_f, relay);
-	if (rc == 0)
-		rc = cord_cojoin(&relay->cord);
+	wal_relay(replica, replica_clock, fd);
 
+	relay_exit(relay);
 	relay_stop(relay);
 	replica_on_relay_stop(replica);
 
-	if (rc != 0)
+	if (!diag_is_empty(&fiber()->diag)) {
+		/*
+		 * Keep the error in the relay for status reporting,
+		 * unless the relay already holds one.
+		 */
+		if (diag_is_empty(&relay->diag))
+			diag_add_error(&relay->diag,
+				       diag_last_error(&fiber()->diag));
 		diag_raise();
+	}
 }
 
 static void
diff --git a/src/box/relay.h b/src/box/relay.h
index c0848899d..c2d97f5e6 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -41,6 +41,7 @@ struct relay;
 struct replica;
 struct tt_uuid;
 struct vclock;
+struct recovery;
 
 enum relay_state {
 	/**
@@ -85,6 +86,20 @@ relay_get_state(const struct relay *relay);
 const struct vclock *
 relay_vclock(const struct relay *relay);
 
+/**
+ * Propagate a vclock acknowledged by @a replica to its relay
+ * status and gc consumer. Currently always returns 0.
+ */
+int
+relay_status_update(struct replica *replica, struct vclock *vclock);
+
+/**
+ * Send rows read by @a recovery from xlog files to @a replica.
+ * Return 0 on success, -1 on error.
+ */
+int
+relay_recover_wals(struct replica *replica, struct recovery *recovery);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/wal.c b/src/box/wal.c
index 17ead08e7..6c8fcd08a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -36,13 +36,19 @@
 #include "errinj.h"
 #include "error.h"
 #include "exception.h"
+#include "sio.h"
+#include "coio.h"
 
 #include "xlog.h"
 #include "xrow.h"
+#include "xrow_io.h"
 #include "vy_log.h"
 #include "cbus.h"
 #include "coio_task.h"
 #include "replication.h"
+#include "recovery.h"
+#include "relay.h"
+#include "iproto_constants.h"
 
 enum {
 	/**
@@ -54,6 +60,10 @@ enum {
 	 * latency. 1 MB seems to be a well balanced choice.
 	 */
 	WAL_FALLOCATE_LEN = 1024 * 1024,
+	/** Buffer size after which wal memory buffers are swapped. */
+	WAL_MEMORY_THRESHOLD = 4 * 1024 * 1024,
+	/** Chunk size after which a new wal memory chunk is opened. */
+	WAL_MEM_CHUNK_THRESHOLD = 16 * 1024,
 };
 
 const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
@@ -77,6 +87,8 @@ struct wal_thread {
 	 * priority pipe and DOES NOT support yield.
 	 */
 	struct cpipe tx_prio_pipe;
+	/** Relays served by the wal thread (struct wal_relay::item). */
+	struct rlist relay;
 };
 
 /*
@@ -158,6 +169,26 @@ struct wal_writer
 	 * Used for replication relays.
 	 */
 	struct rlist watchers;
+	/**
+	 * Wal memory: the writer encodes rows into two memory buffers
+	 * and switches buffers once the current one grows beyond
+	 * WAL_MEMORY_THRESHOLD. Buffers are split into chunks; a new
+	 * chunk starts at a transaction boundary when the replica id
+	 * changes or the chunk exceeds WAL_MEM_CHUNK_THRESHOLD. Each
+	 * chunk is described by a wal_mem_index entry (replica id,
+	 * vclock just before its first row, buffer number, offset and
+	 * size) used to look up a position in wal memory. On a buffer
+	 * switch the chunks of the reused buffer are discarded and
+	 * wal_mem_discard_count grows so relays can adjust positions.
+	 */
+	/** Buffers holding encoded rows. */
+	struct ibuf wal_mem[2];
+	/** Array of struct wal_mem_index, one entry per chunk. */
+	struct ibuf wal_mem_index;
+	/** Count of chunks discarded on buffer switches. */
+	uint64_t wal_mem_discard_count;
+	/** Broadcast at the end of every wal_write_to_disk() call. */
+	struct fiber_cond memory_cond;
 };
 
 struct wal_msg {
@@ -175,6 +206,66 @@ struct wal_msg {
 	struct vclock vclock;
 };
 
+/**
+ * Descriptor of a wal memory chunk.
+ */
+struct wal_mem_index {
+	/** Replica id the chunk was opened for. */
+	uint32_t replica_id;
+	/** Vclock just before the first row in the chunk. */
+	struct vclock vclock;
+	/** Index of the writer->wal_mem buffer holding the chunk. */
+	uint8_t buf_no;
+	/** Offset of the chunk start within its buffer. */
+	uint64_t pos;
+	/** Chunk size in bytes. */
+	uint64_t size;
+};
+
+/**
+ * Saved end of wal memory, used to roll back rows on write failure.
+ */
+struct wal_mem_checkpoint {
+	/** Index of the last chunk relative to the first one. */
+	uint32_t count;
+	/** Size of the last chunk. */
+	uint32_t size;
+};
+
+/** Current relay position in wal memory. */
+struct wal_relay_mem_pos {
+	uint64_t wal_mem_discard_count;
+	uint32_t chunk_index;
+	uint32_t offset;
+};
+
+/**
+ * State of a relay served by the wal thread.
+ */
+struct wal_relay {
+	struct rlist item;
+	/** Writer. */
+	struct wal_writer *writer;
+	/** Sent vclock. */
+	struct vclock vclock;
+	/** Vclock when subscribed. */
+	struct vclock vclock_at_subscribe;
+	/** Socket to send data. */
+	int fd;
+	/** Peer replica. */
+	struct replica *replica;
+	/** Cord to recover and relay data from files. */
+	struct cord cord;
+	/** A diagnostic area. */
+	struct diag diag;
+	/** Current position. */
+	struct wal_relay_mem_pos pos;
+	/** Relay writer fiber. */
+	struct fiber *writer_fiber;
+	/** Relay reader fiber. */
+	struct fiber *reader_fiber;
+};
+
 /**
  * Vinyl metadata log writer.
  */
@@ -230,7 +321,6 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 	xlog_tx_begin(l);
 	struct xrow_header **row = entry->rows;
 	for (; row < entry->rows + entry->n_rows; row++) {
-		(*row)->tm = ev_now(loop());
 		struct errinj *inj = errinj(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT);
 		if (inj != NULL && inj->iparam == (*row)->lsn) {
 			(*row)->lsn = inj->iparam - 1;
@@ -339,6 +429,46 @@ tx_notify_checkpoint(struct cmsg *msg)
 	free(msg);
 }
 
+/**
+ * A message to initialize a writer in a wal thread.
+ */
+struct wal_writer_create_msg {
+	struct cbus_call_msg base;
+	struct wal_writer *writer;
+};
+
+/**
+ * Set up the writer's wal memory: row buffers and the chunk index
+ * with its first, empty chunk. Runs in the wal thread via cbus_call()
+ * because the buffers are created on that thread's slab cache.
+ */
+static int
+wal_writer_create_wal(struct cbus_call_msg *base)
+{
+	struct wal_writer_create_msg *msg =
+		container_of(base, struct wal_writer_create_msg, base);
+	struct wal_writer *writer = msg->writer;
+	ibuf_create(&writer->wal_mem[0], &cord()->slabc, 65536);
+	ibuf_create(&writer->wal_mem[1], &cord()->slabc, 65536);
+	ibuf_create(&writer->wal_mem_index, &cord()->slabc, 8192);
+	struct wal_mem_index *index;
+	index = ibuf_alloc(&writer->wal_mem_index, sizeof(struct wal_mem_index));
+	if (index == NULL) {
+		/* Could not initialize wal writer. */
+		panic("Could not create wal");
+		unreachable();
+	}
+	writer->wal_mem_discard_count = 0;
+	vclock_copy(&index->vclock, &writer->vclock);
+	index->replica_id = 0;
+	index->pos = 0;
+	index->size = 0;
+	index->buf_no = 0;
+	fiber_cond_create(&writer->memory_cond);
+
+	return 0;
+}
+
 /**
  * Initialize WAL writer context. Even though it's a singleton,
  * encapsulate the details just in case we may use
@@ -377,6 +504,11 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 
 	writer->on_garbage_collection = on_garbage_collection;
 	writer->on_checkpoint_threshold = on_checkpoint_threshold;
+	struct wal_writer_create_msg msg;
+	msg.writer = writer;
+	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+		  &msg.base, wal_writer_create_wal, NULL,
+		  TIMEOUT_INFINITY);
 }
 
 /** Destroy a WAL writer structure. */
@@ -920,9 +1052,9 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
 	}
 	if ((*begin)->replica_id != instance_id) {
 		/*
-		 * Move all local changes to the end of rows array and
+		 * Move all local changes to the end of rows array to form
 		 * a fake local transaction (like an autonomous transaction)
-		 * because we could not replicate the transaction back.
+		 * in order to be able to replicate local changes back.
 		 */
 		struct xrow_header **row = end - 1;
 		while (row >= begin) {
@@ -943,7 +1075,7 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
 		while (begin < end && begin[0]->replica_id != instance_id)
 			++begin;
 	}
-	/* Setup txn_id and tnx_replica_id for localy generated rows. */
+	/* Set up txn_id and txn_replica_id for locally generated rows. */
 	row = begin;
 	while (row < end) {
 		row[0]->txn_id = begin[0]->lsn;
@@ -953,6 +1085,223 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
 	}
 }
 
+/** Return the oldest chunk index still kept in wal memory. */
+static inline struct wal_mem_index *
+wal_mem_index_first(struct wal_writer *writer)
+{
+	return (struct wal_mem_index *)writer->wal_mem_index.rpos;
+}
+
+/** Return the most recent chunk index. */
+static inline struct wal_mem_index *
+wal_mem_index_last(struct wal_writer *writer)
+{
+	return (struct wal_mem_index *)writer->wal_mem_index.wpos - 1;
+}
+
+/**
+ * Open a new empty chunk at the current end of buffer @a buf_no
+ * with @a vclock as the vclock just before its first row.
+ * An empty last chunk is reused instead of allocating a new index
+ * entry, but its buffer number, offset and vclock are refreshed
+ * anyway: otherwise a chunk reused across a buffer switch would
+ * keep directing new rows into the old buffer.
+ * Return NULL and set diag on allocation failure.
+ */
+static inline struct wal_mem_index *
+wal_mem_index_new(struct wal_writer *writer, int buf_no, struct vclock *vclock)
+{
+	struct wal_mem_index *last = wal_mem_index_last(writer);
+	if (last->size != 0) {
+		last = (struct wal_mem_index *)
+			ibuf_alloc(&writer->wal_mem_index,
+				   sizeof(struct wal_mem_index));
+		if (last == NULL) {
+			diag_set(OutOfMemory, sizeof(struct wal_mem_index),
+				 "ibuf", "struct wal_mem_index");
+			return NULL;
+		}
+	}
+	last->buf_no = buf_no;
+	last->size = 0;
+	last->pos = ibuf_used(writer->wal_mem + buf_no);
+	vclock_copy(&last->vclock, vclock);
+	return last;
+}
+
+/**
+ * Save the current end of wal memory: the position of the last
+ * chunk relative to the first one and the size of the last chunk.
+ */
+static inline void
+wal_mem_get_checkpoint(struct wal_writer *writer,
+		       struct wal_mem_checkpoint *mem_checkpoint)
+{
+	mem_checkpoint->count = wal_mem_index_last(writer) -
+			      wal_mem_index_first(writer);
+	mem_checkpoint->size = wal_mem_index_last(writer)->size;
+}
+
+/**
+ * Roll wal memory back to a position saved by
+ * wal_mem_get_checkpoint(): drop the chunks opened after it and
+ * truncate the buffer to the saved size of its last chunk.
+ */
+static inline void
+wal_mem_set_checkpoint(struct wal_writer *writer,
+		       struct wal_mem_checkpoint *mem_checkpoint)
+{
+	struct wal_mem_index *index = wal_mem_index_first(writer) +
+				      mem_checkpoint->count;
+	assert(index->buf_no == wal_mem_index_last(writer)->buf_no);
+	index->size = mem_checkpoint->size;
+	/* Truncate buffers. */
+	writer->wal_mem_index.wpos = (char *)(index + 1);
+	struct ibuf *buf = writer->wal_mem + index->buf_no;
+	buf->wpos = buf->buf + index->pos + index->size;
+}
+
+/**
+ * Prepare wal memory to accept new rows. If the current buffer has
+ * grown beyond WAL_MEMORY_THRESHOLD, discard the chunks of the other
+ * buffer, reset it and open a new chunk there.
+ * Return 0 on success, -1 with diag set on allocation failure.
+ */
+static int
+wal_mem_prepare(struct wal_writer *writer)
+{
+	struct wal_mem_index *last = wal_mem_index_last(writer);
+	uint8_t buf_no = last->buf_no;
+	if (ibuf_used(&writer->wal_mem[buf_no]) > WAL_MEMORY_THRESHOLD) {
+		/* We are going to rotate buffers. */
+		struct wal_mem_index *first = wal_mem_index_first(writer);
+		while (first->buf_no == 1 - buf_no)
+			++first;
+		/* Discard all indexes on buffer to clear. */
+		writer->wal_mem_discard_count += first - wal_mem_index_first(writer);
+		writer->wal_mem_index.rpos = (char *)first;
+
+		buf_no = 1 - buf_no;
+		ibuf_reset(&writer->wal_mem[buf_no]);
+
+		last = wal_mem_index_new(writer, buf_no, &writer->vclock);
+		if (last == NULL)
+			return -1;
+	}
+	return 0;
+}
+
+/**
+ * Get the chunk to store rows of @a replica_id in. A new chunk is
+ * opened when the last one is over WAL_MEM_CHUNK_THRESHOLD or
+ * belongs to another replica id; @a vclock is the vclock just
+ * before the rows to store. Return NULL with diag set on failure.
+ */
+static struct wal_mem_index *
+wal_get_chunk(struct wal_writer *writer, uint32_t replica_id,
+	      struct vclock *vclock)
+{
+	struct wal_mem_index *last = wal_mem_index_last(writer);
+	int buf_no = last->buf_no;
+
+	if (last->size == 0)
+		last->replica_id = replica_id;
+
+	if (last->size > WAL_MEM_CHUNK_THRESHOLD ||
+	    last->replica_id != replica_id) {
+		/* Open new chunk. */
+		last = wal_mem_index_new(writer, buf_no, vclock);
+		if (last == NULL)
+			return NULL;
+		last->replica_id = replica_id;
+	}
+	return last;
+}
+
+/**
+ * Assign LSNs to the rows of @a entry (advancing @a vclock) and
+ * encode them into the current wal memory buffer. GROUP_LOCAL rows
+ * are replaced with IPROTO_NOP rows that keep the replica id, lsn
+ * and transaction attributes. Rows of one transaction are never
+ * split between chunks. Return 0 on success, -1 on error.
+ */
+static ssize_t
+wal_encode_entry(struct wal_writer *writer, struct journal_entry *entry,
+		 struct vclock *vclock)
+{
+	double tm = ev_now(loop());
+	/*
+	 * Zero the header: fields not assigned below (e.g. sync) would
+	 * otherwise reach xrow_to_iovec() uninitialized.
+	 */
+	struct xrow_header nop_row;
+	memset(&nop_row, 0, sizeof(nop_row));
+	nop_row.type = IPROTO_NOP;
+	nop_row.group_id = GROUP_DEFAULT;
+	nop_row.bodycnt = 0;
+	nop_row.tm = tm;
+
+	struct wal_mem_index *last = wal_mem_index_last(writer);
+	int buf_no = last->buf_no;
+	last = NULL;
+	struct ibuf *mem_buf = writer->wal_mem + buf_no;
+
+	/* Vclock just before the row being encoded. */
+	struct vclock chunk_vclock;
+	vclock_copy(&chunk_vclock, vclock);
+
+	wal_assign_lsn(vclock, entry->rows, entry->rows + entry->n_rows);
+	entry->res = vclock_sum(vclock);
+
+	struct xrow_header **row = entry->rows;
+	while (row < entry->rows + entry->n_rows) {
+		(*row)->tm = tm;
+		struct iovec iov[XROW_IOVMAX];
+		if ((*row)->group_id == GROUP_LOCAL) {
+			nop_row.replica_id = (*row)->replica_id;
+			nop_row.lsn = (*row)->lsn;
+			nop_row.txn_id = (*row)->txn_id;
+			nop_row.txn_replica_id = (*row)->txn_replica_id;
+			nop_row.txn_last = (*row)->txn_last;
+		}
+		struct xrow_header *send_row = (*row)->group_id != GROUP_LOCAL ?
+					        *row : &nop_row;
+		if (last == NULL) {
+			/* Choose a chunk once per transaction. */
+			last = wal_get_chunk(writer, send_row->replica_id,
+					     &chunk_vclock);
+		}
+		if (last == NULL)
+			goto error;
+		vclock_follow_xrow(&chunk_vclock, send_row);
+
+		int iovcnt = xrow_to_iovec(send_row, iov);
+		if (iovcnt < 0)
+			goto error;
+		uint64_t xrow_size = 0;
+		for (int i = 0; i < iovcnt; ++i)
+			xrow_size += iov[i].iov_len;
+		if (ibuf_reserve(mem_buf, xrow_size) == NULL) {
+			diag_set(OutOfMemory, xrow_size,
+				 "ibuf", "entry memory");
+			goto error;
+		}
+
+		for (int i = 0; i < iovcnt; ++i) {
+			memcpy(ibuf_alloc(mem_buf, iov[i].iov_len),
+			       iov[i].iov_base, iov[i].iov_len);
+			last->size += iov[i].iov_len;
+		}
+		if (row[0]->txn_last == 1)
+			last = NULL;
+		++row;
+	}
+	return 0;
+
+error:
+	return -1;
+}
+
 static void
 wal_write_to_disk(struct cmsg *msg)
 {
@@ -1016,22 +1328,35 @@ wal_write_to_disk(struct cmsg *msg)
 	int rc;
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
+	if (wal_mem_prepare(writer) != 0)
+		goto done;
+	struct wal_mem_checkpoint mem_checkpoint;
+	wal_mem_get_checkpoint(writer, &mem_checkpoint);
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
-		wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
-		entry->res = vclock_sum(&vclock);
+		rc = wal_encode_entry(writer, entry, &vclock);
+		if (rc != 0) {
+			/* Drop rows encoded after the last written entry. */
+			wal_mem_set_checkpoint(writer, &mem_checkpoint);
+			goto done;
+		}
 		rc = xlog_write_entry(l, entry);
-		if (rc < 0)
+		if (rc < 0) {
+			wal_mem_set_checkpoint(writer, &mem_checkpoint);
 			goto done;
+		}
 		if (rc > 0) {
 			writer->checkpoint_wal_size += rc;
 			last_committed = &entry->fifo;
 			vclock_copy(&writer->vclock, &vclock);
+			wal_mem_get_checkpoint(writer, &mem_checkpoint);
 		}
 		/* rc == 0: the write is buffered in xlog_tx */
 	}
 	rc = xlog_flush(l);
-	if (rc < 0)
+	if (rc < 0) {
+		wal_mem_set_checkpoint(writer, &mem_checkpoint);
 		goto done;
+	}
 
 	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
@@ -1087,6 +1409,8 @@ done:
 		wal_writer_begin_rollback(writer);
 	}
 	fiber_gc();
+	/* Wake up fibers waiting on wal memory for new rows. */
+	fiber_cond_broadcast(&writer->memory_cond);
 	wal_notify_watchers(writer, WAL_EVENT_WRITE);
 }
 
@@ -1095,6 +1418,8 @@ static int
 wal_thread_f(va_list ap)
 {
 	(void) ap;
+	struct wal_writer *writer = &wal_writer_singleton;
+	rlist_create(&wal_thread.relay);
 
 	/** Initialize eio in this thread */
 	coio_enable();
@@ -1110,7 +1435,18 @@ wal_thread_f(va_list ap)
 
 	cbus_loop(&endpoint);
 
-	struct wal_writer *writer = &wal_writer_singleton;
+	/*
+	 * Cancel and join the cords relays use to recover rows from
+	 * xlog files, so none is left running once the loop exits.
+	 */
+	struct wal_relay *relay;
+	rlist_foreach_entry(relay, &wal_thread.relay, item) {
+		if (relay->cord.id != 0) {
+			if (tt_pthread_cancel(relay->cord.id) == ESRCH)
+				continue;
+			tt_pthread_join(relay->cord.id, NULL);
+		}
+	}
 
 	/*
 	 * Create a new empty WAL on shutdown so that we don't
@@ -1396,6 +1728,475 @@ wal_notify_watchers(struct wal_writer *writer, unsigned events)
 		wal_watcher_notify(watcher, events);
 }
 
+/** Return true if a failed write() should be retried after coio_wait(). */
+static inline bool
+wal_relay_write_is_transient(int err)
+{
+	return err == EAGAIN || err == EWOULDBLOCK || err == EINTR;
+}
+
+/**
+ * Send data to peer.
+ * Assume that socket already in non blocking mode.
+ *
+ * Whatever the first write() cannot take is copied into the fiber
+ * region before waiting for the socket, so the caller's buffer is
+ * not read again after a yield. The fiber region is released with
+ * fiber_gc() before returning, on success and on error alike.
+ *
+ * Return:
+ * 0 data sent without yield
+ * 1 data sent with yield
+ * -1 an error occurred (diag is set)
+ */
+static int
+wal_relay_send(void *data, size_t to_write, int fd)
+{
+	const char *src = (const char *)data;
+	{
+		struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
+		if (inj != NULL && inj->bparam) {
+			/* Detach from the caller's buffer before sleeping. */
+			char *copy = (char *)region_alloc(&fiber()->gc, to_write);
+			if (copy == NULL) {
+				diag_set(OutOfMemory, to_write, "region", "relay pending data");
+				goto error;
+			}
+			memcpy(copy, src, to_write);
+			src = copy;
+			while (inj != NULL && inj->bparam)
+				fiber_sleep(0.01);
+		}
+	}
+
+	int rc = 0;
+	ssize_t written = write(fd, src, to_write);
+	if (written < 0) {
+		if (!wal_relay_write_is_transient(errno))
+			goto io_error;
+		written = 0;
+	}
+	if ((size_t)written == to_write)
+		goto done;
+
+	/* Preserve the unsent tail: the caller's buffer may change while we wait. */
+	to_write -= written;
+	char *pending = (char *)region_alloc(&fiber()->gc, to_write);
+	if (pending == NULL) {
+		diag_set(OutOfMemory, to_write, "region", "relay pending data");
+		goto error;
+	}
+	memcpy(pending, src + written, to_write);
+	while (to_write > 0) {
+		if (coio_wait(fd, COIO_WRITE, TIMEOUT_INFINITY) < 0)
+			goto error;
+		written = write(fd, pending, to_write);
+		if (written < 0) {
+			if (!wal_relay_write_is_transient(errno))
+				goto io_error;
+			written = 0;
+		}
+		pending += written;
+		to_write -= written;
+	}
+	rc = 1;
+
+done:
+	fiber_gc();
+	{
+		struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+		if (inj != NULL && inj->dparam > 0)
+			fiber_sleep(inj->dparam);
+	}
+	return rc;
+
+io_error:
+	diag_set(SocketError, sio_socketname(fd), "write");
+error:
+	fiber_gc();
+	return -1;
+}
+
+/**
+ * Send a heartbeat (timestamp row) to the peer.
+ *
+ * The encoded row is gathered into one contiguous region buffer and
+ * handed to wal_relay_send() in a single call. wal_relay_send() calls
+ * fiber_gc(), so sending the iovecs one by one could release memory
+ * that the remaining iovecs still point to if the encoder allocated
+ * them from the fiber region.
+ *
+ * Return 0 on success, -1 on error (diag is set).
+ */
+static int
+wal_relay_heartbeat(int fd)
+{
+	struct xrow_header row;
+	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
+	struct iovec iov[XROW_IOVMAX];
+	int iovcnt = xrow_to_iovec(&row, iov);
+	if (iovcnt < 0)
+		return -1;
+	if (iovcnt == 0)
+		return 0;
+	if (iovcnt == 1)
+		return wal_relay_send(iov[0].iov_base, iov[0].iov_len, fd) < 0 ?
+		       -1 : 0;
+	size_t size = 0;
+	for (int i = 0; i < iovcnt; ++i)
+		size += iov[i].iov_len;
+	char *buf = (char *)region_alloc(&fiber()->gc, size);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, size, "region", "relay heartbeat");
+		return -1;
+	}
+	char *pos = buf;
+	for (int i = 0; i < iovcnt; ++i) {
+		memcpy(pos, iov[i].iov_base, iov[i].iov_len);
+		pos += iov[i].iov_len;
+	}
+	return wal_relay_send(buf, size, fd) < 0 ? -1 : 0;
+}
+
+/**
+ * Send the not yet sent part of the current memory chunk to peer.
+ * Assume that socket already in non blocking mode.
+ *
+ * A chunk written by the subscribed replica itself is skipped (the
+ * position still advances) unless the chunk starts before the writer
+ * vclock captured at subscribe time.
+ *
+ * Return:
+ * 0 nothing to send, chunk skipped or sent without yield
+ * 1 chunk sent with yield
+ * -1 an error occurred
+ */
+static int
+wal_relay_send_chunk(struct wal_relay *wal_relay)
+{
+	struct wal_writer *writer = wal_relay->writer;
+	struct wal_relay_mem_pos *pos = &wal_relay->pos;
+
+	/*
+	 * Adjust position in case of rotation: the index shifts by the
+	 * number of chunks discarded since the position was recorded.
+	 */
+	assert(pos->chunk_index >= writer->wal_mem_discard_count -
+				 pos->wal_mem_discard_count);
+	pos->chunk_index -= writer->wal_mem_discard_count -
+			    pos->wal_mem_discard_count;
+	pos->wal_mem_discard_count = writer->wal_mem_discard_count;
+
+	struct wal_mem_index *first = wal_mem_index_first(writer);
+	struct wal_mem_index *last = wal_mem_index_last(writer);
+	struct wal_mem_index *chunk = first + pos->chunk_index;
+
+	if (chunk < last && chunk->size == pos->offset) {
+		/* Current chunk is done, use the next one. */
+		++pos->chunk_index;
+		pos->offset = 0;
+		++chunk;
+	}
+	assert(first <= chunk && last >= chunk);
+	assert(chunk->size >= pos->offset);
+	ssize_t to_write = chunk->size - pos->offset;
+	if (to_write == 0)
+		return 0;
+	/* Preserve the clock after the transfer because it might change. */
+	struct vclock last_vclock;
+	vclock_copy(&last_vclock,
+		    chunk == last ? &writer->vclock : &(chunk + 1)->vclock);
+	struct ibuf *buf = writer->wal_mem + chunk->buf_no;
+	void *data = buf->rpos + chunk->pos + pos->offset;
+	/* Do not echo the replica's own rows written after subscribe. */
+	bool need_send = wal_relay->replica->id != chunk->replica_id ||
+			 vclock_get(&chunk->vclock, chunk->replica_id) <
+			 vclock_get(&wal_relay->vclock_at_subscribe,
+				    chunk->replica_id);
+	int rc = 0;
+	if (need_send) {
+		rc = wal_relay_send(data, to_write, wal_relay->fd);
+		if (rc < 0)
+			return -1;
+	}
+	vclock_copy(&wal_relay->vclock, &last_vclock);
+	pos->offset += to_write;
+	return rc;
+}
+
+/**
+ * Check whether position @a vclock can be served from the writer's
+ * in-memory chunks, i.e. the first indexed chunk starts at or before
+ * it. Always false while ERRINJ_WAL_RELAY_DISABLE_MEM is set, which
+ * forces the relay to read xlog files instead.
+ */
+static inline bool
+wal_relay_in_mem(struct wal_writer *writer, struct vclock *vclock)
+{
+	struct errinj *disable = errinj(ERRINJ_WAL_RELAY_DISABLE_MEM,
+					ERRINJ_BOOL);
+	if (disable != NULL && disable->bparam)
+		return false;
+	switch (vclock_compare(&wal_mem_index_first(writer)->vclock, vclock)) {
+	case -1:
+	case 0:
+		return true;
+	default:
+		return false;
+	}
+}
+
+/**
+ * Setup relay position.
+ *
+ * Binary-searches the writer's chunk index for the last chunk whose
+ * starting vclock is not greater than the relay vclock, and points
+ * the relay at the beginning of that chunk (offset 0). Assumes the
+ * caller already checked wal_relay_in_mem(), i.e. the first chunk
+ * does not start after the relay vclock.
+ *
+ * NOTE(review): rows of the chosen chunk that precede the relay
+ * vclock are sent again because the offset restarts at 0; presumably
+ * the peer skips rows it already has - confirm.
+ */
+static void
+wal_relay_setup_chunk(struct wal_relay *wal_relay)
+{
+	struct wal_writer *writer = wal_relay->writer;
+	struct vclock *vclock = &wal_relay->vclock;
+	struct wal_mem_index *first = wal_mem_index_first(writer);
+	struct wal_mem_index *last = wal_mem_index_last(writer);
+	struct wal_mem_index *mid = NULL;
+	/* Narrow [first, last] down to two neighbouring chunks. */
+	while (last - first > 1) {
+		mid = first + (last - first + 1) / 2;
+		int cmp = vclock_compare(vclock, &mid->vclock);
+		if (cmp == 0) {
+			/* A chunk starts exactly at the relay position. */
+			first = last = mid;
+			break;
+		}
+		/*
+		 * NOTE(review): incomparable vclocks (neither -1, 0 nor 1)
+		 * fall into the "else" branch and move the search left.
+		 */
+		if (cmp == 1)
+			first = mid;
+		else
+			last = mid;
+	}
+	/* Prefer the right neighbour if it does not start after vclock. */
+	if (last != first) {
+		int cmp = vclock_compare(vclock, &last->vclock);
+		if (cmp == 0 || cmp == 1)
+			++first;
+	}
+	wal_relay->pos.chunk_index = first - wal_mem_index_first(writer);
+	wal_relay->pos.wal_mem_discard_count = writer->wal_mem_discard_count;
+	wal_relay->pos.offset = 0;
+}
+
+/**
+ * Recover wal memory from current position until the end.
+ *
+ * Streams chunks while the relay position stays in writer memory,
+ * waiting on writer->memory_cond for new rows and sending a heartbeat
+ * each time the wait times out.
+ *
+ * Return 0 when the position is not (or no longer) in memory and the
+ * caller should fall back to xlog files, -1 on error or when the
+ * fiber is cancelled.
+ */
+static int
+wal_relay_recover_mem(struct wal_relay *wal_relay)
+{
+	struct wal_writer *writer = wal_relay->writer;
+	struct vclock *vclock = &wal_relay->vclock;
+	if (!wal_relay_in_mem(writer, vclock))
+		return 0;
+	wal_relay_setup_chunk(wal_relay);
+
+	do {
+		if (wal_relay_send_chunk(wal_relay) < 0)
+			return -1;
+		if (vclock_compare(&writer->vclock, vclock) == 1) {
+			/* There are more data, send the next chunk. */
+			continue;
+		}
+		double timeout = replication_timeout;
+		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
+					    ERRINJ_DOUBLE);
+		if (inj != NULL && inj->dparam != 0)
+			timeout = inj->dparam;
+		/* Wait for new rows, sending a heartbeat on every timeout. */
+		while (fiber_cond_wait_timeout(&writer->memory_cond,
+					       timeout) == -1 &&
+		       !fiber_is_cancelled()) {
+			if (wal_relay_heartbeat(wal_relay->fd) < 0)
+				return -1;
+			/*
+			 * The timeout is expected and handled: drop any
+			 * error the timed-out wait may have left in the
+			 * diag so it cannot mask a later, real error.
+			 */
+			diag_clear(&fiber()->diag);
+			if (inj != NULL && inj->dparam != 0)
+				timeout = inj->dparam;
+		}
+	} while (!fiber_is_cancelled() &&
+		 wal_relay_in_mem(writer, vclock));
+
+	return fiber_is_cancelled() ? -1 : 0;
+}
+
+/**
+ * Cord function: relay rows from xlog files.
+ *
+ * Opens a recovery over the writer's WAL directory starting from the
+ * relay vclock, relays it with relay_recover_wals(), and stores the
+ * vclock the recovery reached back into the relay.
+ */
+static int
+wal_relay_recover_file_f(va_list ap)
+{
+	struct wal_relay *relay = va_arg(ap, struct wal_relay *);
+	struct vclock *pos = &relay->vclock;
+	struct recovery *rec = recovery_new(relay->writer->wal_dir.dirname,
+					    false, pos);
+	if (rec == NULL)
+		return -1;
+	int rc = relay_recover_wals(relay->replica, rec);
+	/* Publish progress: the writer fiber resumes from this vclock. */
+	vclock_copy(pos, &rec->vclock);
+	recovery_delete(rec);
+	return rc;
+}
+
+/**
+ * Create cord to recover and relay xlog files.
+ *
+ * Runs wal_relay_recover_file_f() in a new cord and waits for it with
+ * cord_cojoin(). Return the cord function result, or -1 if the cord
+ * could not be started.
+ */
+static int
+wal_relay_recover_file(struct wal_relay *msg)
+{
+	if (cord_costart(&msg->cord, "recovery wal files",
+			 wal_relay_recover_file_f, msg) != 0) {
+		/* No thread was created: nothing to join or cancel. */
+		msg->cord.id = 0;
+		return -1;
+	}
+	int res = cord_cojoin(&msg->cord);
+	/* The cord is finished; shutdown must not cancel/join it again. */
+	msg->cord.id = 0;
+	return res;
+}
+
+/**
+ * Message to inform relay about peer vclock changes.
+ *
+ * Sent by the wal thread reader fiber with cbus_call() and handled
+ * by tx_wal_relay_status().
+ */
+struct wal_relay_status_msg {
+	/** cbus call header, recovered with container_of(). */
+	struct cbus_call_msg base;
+	/** Replica. */
+	struct replica *replica;
+	/** Known replica vclock. */
+	struct vclock vclock;
+};
+
+/**
+ * cbus call handler run by the consumer of wal_thread.tx_prio_pipe:
+ * pass the vclock acknowledged by the peer to relay_status_update().
+ * Never fails.
+ */
+static int
+tx_wal_relay_status(struct cbus_call_msg *base)
+{
+	struct wal_relay_status_msg *status =
+		container_of(base, struct wal_relay_status_msg, base);
+	struct replica *replica = status->replica;
+	relay_status_update(replica, &status->vclock);
+	return 0;
+}
+
+/** Free callback for a status message abandoned by an interrupted cbus_call(). */
+static int
+wal_relay_status_msg_free(struct cbus_call_msg *base)
+{
+	free(container_of(base, struct wal_relay_status_msg, base));
+	return 0;
+}
+
+/**
+ * Peer vclock reader fiber function.
+ *
+ * Reads vclock acknowledgements from the peer socket and forwards
+ * each one to tx. On exit (read/decode error or cancellation) the
+ * error is stored in wal_relay->diag unless it already holds one,
+ * and the writer fiber is cancelled.
+ */
+static int
+wal_relay_reader_f(va_list ap)
+{
+	struct wal_relay *wal_relay = va_arg(ap, struct wal_relay *);
+	struct replica *replica = wal_relay->replica;
+
+	struct ibuf ibuf;
+	ibuf_create(&ibuf, &cord()->slabc, 1024);
+	while (!fiber_is_cancelled()) {
+		struct xrow_header xrow;
+		if (coio_read_xrow_timeout(wal_relay->fd, &ibuf, &xrow,
+					   replication_disconnect_timeout()) != 0)
+			break;
+		/*
+		 * Heap-allocated on purpose: cbus_call() may return on
+		 * timeout/cancellation while the message is still in
+		 * flight, so it must not live on this fiber's stack.
+		 */
+		struct wal_relay_status_msg *msg =
+			(struct wal_relay_status_msg *)malloc(sizeof(*msg));
+		if (msg == NULL) {
+			diag_set(OutOfMemory, sizeof(*msg), "malloc",
+				 "struct wal_relay_status_msg");
+			break;
+		}
+		/* vclock is followed while decoding, zeroing it. */
+		vclock_create(&msg->vclock);
+		if (xrow_decode_vclock(&xrow, &msg->vclock) != 0) {
+			/* Do not report a vclock that failed to decode. */
+			free(msg);
+			break;
+		}
+		msg->replica = replica;
+		if (cbus_call(&wal_thread.tx_prio_pipe, &wal_thread.wal_pipe,
+			      &msg->base, tx_wal_relay_status,
+			      wal_relay_status_msg_free, TIMEOUT_INFINITY) != 0) {
+			/*
+			 * tx_wal_relay_status() never fails, so the call
+			 * was interrupted: the message now belongs to
+			 * wal_relay_status_msg_free().
+			 */
+			break;
+		}
+		free(msg);
+	}
+	if (diag_is_empty(&wal_relay->diag))
+		diag_move(&fiber()->diag, &wal_relay->diag);
+	fiber_cancel(wal_relay->writer_fiber);
+	ibuf_destroy(&ibuf);
+	return 0;
+}
+
+/**
+ * Message to control wal relay.
+ *
+ * Lives on the stack of the fiber calling wal_relay(). It is routed
+ * to the wal thread to start the relay (wal_relay_start()) and back
+ * through tx_prio_pipe (wal_relay_done()) once the relay stops.
+ */
+struct wal_relay_start_msg {
+	/** cbus message header, recovered with container_of(). */
+	struct cmsg base;
+	/** Stop condition, signalled by wal_relay_done(). */
+	struct fiber_cond stop_cond;
+	/** Done status to protect against spurious wakeup. */
+	bool done;
+	/** Replica. */
+	struct replica *replica;
+	/**
+	 * Replica known vclock: the relay start position on input,
+	 * overwritten with the relay's final position when it stops.
+	 */
+	struct vclock *vclock;
+	/** Replica socket. */
+	int fd;
+	/** Diagnostic area: the error the relay stopped with, if any. */
+	struct diag diag;
+};
+
+/**
+ * Last hop of the relay-done route (consumer of tx_prio_pipe): mark
+ * the start message as done and wake the fiber waiting in
+ * wal_relay().
+ */
+static void
+wal_relay_done(struct cmsg *base)
+{
+	struct wal_relay_start_msg *start =
+		container_of(base, struct wal_relay_start_msg, base);
+	start->done = true;
+	fiber_cond_signal(&start->stop_cond);
+}
+
+/**
+ * Hand the start message back once the relay is over.
+ *
+ * Moves the current fiber's diag into the message and pushes it to
+ * tx_prio_pipe with wal_relay_done() as the only hop. Yields first
+ * (fiber_reschedule()).
+ */
+static void
+wal_relay_done_send(struct wal_relay_start_msg *msg)
+{
+	static struct cmsg_hop route[] = {
+		{wal_relay_done, NULL}
+	};
+	/*
+	 * Let cbus finish with the message first: because of cbus
+	 * routing and fiber_start() behavior it may still be in cbus
+	 * processing when we get here.
+	 */
+	fiber_reschedule();
+	diag_move(&fiber()->diag, &msg->diag);
+	cmsg_init(&msg->base, route);
+	cpipe_push(&wal_thread.tx_prio_pipe, &msg->base);
+}
+
+/**
+ * Wal relay writer fiber.
+ *
+ * Registers the relay in wal_thread.relay and starts the peer reader
+ * fiber. It then alternates between relaying from writer memory and
+ * from xlog files until cancelled or an error occurs. On exit the
+ * relay vclock is copied back into the start message and the message
+ * is sent back with wal_relay_done_send().
+ */
+static int
+wal_relay_f(va_list ap)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+
+	struct wal_relay_start_msg *msg = va_arg(ap, struct wal_relay_start_msg *);
+	struct wal_relay wal_relay;
+	memset(&wal_relay, 0, sizeof(wal_relay));
+	rlist_add(&wal_thread.relay, &wal_relay.item);
+	wal_relay.writer = writer;
+	wal_relay.replica = msg->replica;
+	wal_relay.fd = msg->fd;
+	vclock_copy(&wal_relay.vclock, msg->vclock);
+	vclock_copy(&wal_relay.vclock_at_subscribe, &writer->vclock);
+	diag_create(&wal_relay.diag);
+	wal_relay.writer_fiber = fiber();
+	wal_relay.reader_fiber = fiber_new("relay_status", wal_relay_reader_f);
+	if (wal_relay.reader_fiber == NULL)
+		goto done;
+	fiber_set_joinable(wal_relay.reader_fiber, true);
+	fiber_start(wal_relay.reader_fiber, &wal_relay);
+
+	/* Open a new chunk to separate logs before subscribe. */
+	if (wal_mem_index_new(writer, wal_mem_index_last(writer)->buf_no,
+			      &writer->vclock) == NULL)
+		goto done;
+
+	if (wal_relay_heartbeat(msg->fd) < 0)
+		goto done;
+
+	while (!fiber_is_cancelled()) {
+		if (wal_relay_recover_mem(&wal_relay) < 0)
+			break;
+		if (wal_relay_recover_file(&wal_relay) < 0)
+			break;
+	}
+
+done:
+	/* Prefer this fiber's own error, fall back to the reader's one. */
+	if (diag_is_empty(&fiber()->diag))
+		diag_move(&wal_relay.diag, &fiber()->diag);
+	/* fiber_new() may have failed: then there is no reader to stop. */
+	if (wal_relay.reader_fiber != NULL) {
+		fiber_cancel(wal_relay.reader_fiber);
+		fiber_join(wal_relay.reader_fiber);
+	}
+	/* Release a reader error that was not reported. */
+	diag_destroy(&wal_relay.diag);
+
+	rlist_del(&wal_relay.item);
+	vclock_copy(msg->vclock, &wal_relay.vclock);
+
+	wal_relay_done_send(msg);
+	return 0;
+}
+
+/**
+ * Start a wal relay in a wal thread.
+ *
+ * cbus handler for the start message. It spawns the writer fiber,
+ * which owns the message from then on. If the fiber cannot be
+ * created, the failure is reported back right away.
+ */
+static void
+wal_relay_start(struct cmsg *base)
+{
+	struct wal_relay_start_msg *msg;
+	msg = container_of(base, struct wal_relay_start_msg, base);
+
+	struct fiber *writer_fiber = fiber_new("wal_relay", wal_relay_f);
+	if (writer_fiber == NULL) {
+		/*
+		 * NOTE(review): wal_relay_done_send() calls
+		 * fiber_reschedule(), i.e. it yields inside this cbus
+		 * handler, which wal_relay() describes as not allowed.
+		 */
+		wal_relay_done_send(msg);
+		return;
+	}
+	fiber_start(writer_fiber, msg);
+}
+
+/**
+ * Start a wal relay.
+ *
+ * Runs the relay for @a replica in the wal thread, sending rows from
+ * @a vclock into @a fd, and blocks the calling fiber until it stops.
+ * On return @a vclock holds the relay's final position.
+ *
+ * Return 0 if the relay stopped without reporting an error, -1 with
+ * the relay error moved into the fiber diag otherwise.
+ */
+int
+wal_relay(struct replica *replica, struct vclock *vclock, int fd)
+{
+	/*
+	 * Send wal relay start to wal thread and then wait for a
+	 * finish condition.
+	 * We are not able to do that job in synchronous manner with
+	 * cbus_call and fiber join because wal thread has no fiber pool
+	 * and then cbus handler not allowed to yield.
+	 */
+	struct wal_relay_start_msg msg;
+	memset(&msg, 0, sizeof(msg));
+	msg.vclock = vclock;
+	msg.fd = fd;
+	msg.replica = replica;
+	fiber_cond_create(&msg.stop_cond);
+	diag_create(&msg.diag);
+	static struct cmsg_hop start_route[] = {
+		{wal_relay_start, NULL}};
+	cmsg_init(&msg.base, start_route);
+	cpipe_push(&wal_thread.wal_pipe, &msg.base);
+	/*
+	 * msg lives on this stack until the wal thread hands it back,
+	 * so keep waiting even if this fiber is cancelled.
+	 */
+	while (!msg.done)
+		fiber_cond_wait(&msg.stop_cond);
+	fiber_cond_destroy(&msg.stop_cond);
+	/*
+	 * Judge by the relay's own diag: a stale error already sitting
+	 * in the caller's diag must not turn a clean stop into failure.
+	 */
+	if (diag_is_empty(&msg.diag))
+		return 0;
+	diag_move(&msg.diag, &fiber()->diag);
+	return -1;
+}
 
 /**
  * After fork, the WAL writer thread disappears.
diff --git a/src/box/wal.h b/src/box/wal.h
index a9452f2bd..6e38da403 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -40,6 +40,7 @@
 struct fiber;
 struct wal_writer;
 struct tt_uuid;
+struct replica;
 
 enum wal_mode { WAL_NONE = 0, WAL_WRITE, WAL_FSYNC, WAL_MODE_MAX };
 
@@ -232,6 +233,12 @@ wal_write_vy_log(struct journal_entry *req);
 void
 wal_rotate_vy_log();
 
+/**
+ * Relay wal rows for @a replica, starting at @a vclock, into the
+ * socket @a fd. The relay runs in the WAL thread; the calling fiber
+ * blocks until it stops, after which @a vclock holds the relay's
+ * final position.
+ *
+ * Return 0 if the relay stopped without error, -1 otherwise
+ * (diag is set).
+ */
+int
+wal_relay(struct replica *replica, struct vclock *vclock, int fd);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index e9ee6b0c8..f7678ff53 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -92,6 +92,19 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
 	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len);
 }
 
+/**
+ * Exception-free variant of coio_read_xrow_timeout_xc() for a raw
+ * socket descriptor. It wraps @a fd into a temporary ev_io and
+ * returns 0 on success, or -1 if reading/decoding threw anything.
+ */
+int
+coio_read_xrow_timeout(int fd, struct ibuf *in,
+		       struct xrow_header *row, ev_tstamp timeout)
+{
+	struct ev_io io;
+	coio_create(&io, fd);
+	int rc = 0;
+	try {
+		coio_read_xrow_timeout_xc(&io, in, row, timeout);
+	} catch (...) {
+		rc = -1;
+	}
+	return rc;
+}
 
 void
 coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 0eb7a8ace..04990eec3 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -45,6 +45,10 @@ void
 coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
 			  struct xrow_header *row, double timeout);
 
+/**
+ * Non-throwing wrapper around coio_read_xrow_timeout_xc() that takes
+ * a raw socket descriptor. Return 0 on success, -1 on any error.
+ */
+int
+coio_read_xrow_timeout(int fd, struct ibuf *in,
+		       struct xrow_header *row, double timeout);
+
 void
 coio_write_xrow(struct ev_io *coio, const struct xrow_header *row);
 
diff --git a/src/errinj.h b/src/errinj.h
index 39de63d19..a2e8e95eb 100644
--- a/src/errinj.h
+++ b/src/errinj.h
@@ -123,6 +123,7 @@ struct errinj {
 	_(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT, {.iparam = -1}) \
 	_(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT, {.iparam = -1}) \
 	_(ERRINJ_VY_COMPACTION_DELAY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_WAL_RELAY_DISABLE_MEM, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 12303670e..5f025eea7 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -36,6 +36,8 @@ errinj.info()
     state: false
   ERRINJ_VYRUN_INDEX_GARBAGE:
     state: false
+  ERRINJ_WAL_RELAY_DISABLE_MEM:
+    state: false
   ERRINJ_VY_DELAY_PK_LOOKUP:
     state: false
   ERRINJ_VY_TASK_COMPLETE:
diff --git a/test/replication/force_recovery.result b/test/replication/force_recovery.result
index f50452858..fb9fac562 100644
--- a/test/replication/force_recovery.result
+++ b/test/replication/force_recovery.result
@@ -58,6 +58,13 @@ fio.unlink(xlog)
 ---
 - true
 ...
+errinj = box.error.injection
+---
+...
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
+---
+- ok
+...
 -- Check that even though box.cfg.force_recovery is set,
 -- replication will still fail due to LSN gap.
 box.cfg{force_recovery = true}
@@ -102,6 +109,10 @@ test_run:cmd("delete server test")
 test_run:cleanup_cluster()
 ---
 ...
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", false)
+---
+- ok
+...
 box.schema.user.revoke('guest', 'replication')
 ---
 ...
diff --git a/test/replication/force_recovery.test.lua b/test/replication/force_recovery.test.lua
index 54307814b..6c4a7f983 100644
--- a/test/replication/force_recovery.test.lua
+++ b/test/replication/force_recovery.test.lua
@@ -23,6 +23,8 @@ box.space.test:replace{1}
 box.snapshot()
 box.space.test:replace{2}
 fio.unlink(xlog)
+errinj = box.error.injection
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
 
 -- Check that even though box.cfg.force_recovery is set,
 -- replication will still fail due to LSN gap.
@@ -39,5 +41,6 @@ test_run:cmd("stop server test")
 test_run:cmd("cleanup server test")
 test_run:cmd("delete server test")
 test_run:cleanup_cluster()
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", false)
 box.schema.user.revoke('guest', 'replication')
 box.space.test:drop()
diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result
index 87d626e20..62684e47c 100644
--- a/test/replication/replica_rejoin.result
+++ b/test/replication/replica_rejoin.result
@@ -210,6 +210,13 @@ fio = require('fio')
 box.cfg{checkpoint_count = checkpoint_count}
 ---
 ...
+errinj = box.error.injection
+---
+...
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
+---
+- ok
+...
 test_run:cmd("start server replica")
 ---
 - true
diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua
index 9bf43eff8..40de0fc98 100644
--- a/test/replication/replica_rejoin.test.lua
+++ b/test/replication/replica_rejoin.test.lua
@@ -78,6 +78,8 @@ for i = 1, 3 do box.space.test:insert{i * 100} end
 fio = require('fio')
 #fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1
 box.cfg{checkpoint_count = checkpoint_count}
+errinj = box.error.injection
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
 test_run:cmd("start server replica")
 test_run:cmd("switch replica")
 box.info.status -- orphan
diff --git a/test/replication/show_error_on_disconnect.result b/test/replication/show_error_on_disconnect.result
index af082203b..072739fb0 100644
--- a/test/replication/show_error_on_disconnect.result
+++ b/test/replication/show_error_on_disconnect.result
@@ -30,6 +30,13 @@ test_run:cmd("switch master_quorum2")
 ---
 - true
 ...
+errinj = box.error.injection
+---
+...
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
+---
+- ok
+...
 box.space.test:insert{1}
 ---
 - [1]
diff --git a/test/replication/show_error_on_disconnect.test.lua b/test/replication/show_error_on_disconnect.test.lua
index 40e9dbc5e..cdae48416 100644
--- a/test/replication/show_error_on_disconnect.test.lua
+++ b/test/replication/show_error_on_disconnect.test.lua
@@ -13,6 +13,8 @@ test_run:cmd("switch master_quorum1")
 repl = box.cfg.replication
 box.cfg{replication = ""}
 test_run:cmd("switch master_quorum2")
+errinj = box.error.injection
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
 box.space.test:insert{1}
 box.snapshot()
 box.space.test:insert{2}
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index a6b577dbc..c920f0c3d 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -188,52 +188,42 @@ result
         BODY:
           tuple: [11, {}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [7, {2: 5}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [7, {2: 4}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [4, {0: 2, 2: 10}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [5, {0: 2, 2: 10, 9: 14}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [8, {1: 3, 2: 10, 8: 11}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [10, {0: 2, 9: 14}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [4, {2: 12}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [5, {2: 12, 9: 14}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [8, {1: 1, 2: 12, 8: 13}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [10, {9: 14}]
diff --git a/test/xlog/panic_on_wal_error.result b/test/xlog/panic_on_wal_error.result
index 22f14f912..e033c3046 100644
--- a/test/xlog/panic_on_wal_error.result
+++ b/test/xlog/panic_on_wal_error.result
@@ -121,6 +121,14 @@ box.cfg.force_recovery
 -- try to start the replica, ha-ha
 -- (replication should fail, some rows are missing)
 --
+--
+errinj = box.error.injection
+---
+...
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
+---
+- ok
+...
 test_run:cmd("start server replica")
 ---
 - true
@@ -161,6 +169,10 @@ test_run:cmd("cleanup server replica")
 ---
 - true
 ...
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", false)
+---
+- ok
+...
 --
 -- cleanup
 box.space.test:drop()
diff --git a/test/xlog/panic_on_wal_error.test.lua b/test/xlog/panic_on_wal_error.test.lua
index 2e95431c6..b1884fe22 100644
--- a/test/xlog/panic_on_wal_error.test.lua
+++ b/test/xlog/panic_on_wal_error.test.lua
@@ -57,6 +57,9 @@ box.cfg.force_recovery
 -- try to start the replica, ha-ha
 -- (replication should fail, some rows are missing)
 --
+--
+errinj = box.error.injection
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
 test_run:cmd("start server replica")
 test_run:cmd("switch replica")
 fiber = require('fiber')
@@ -69,6 +72,7 @@ box.space.test:select{}
 test_run:cmd("switch default")
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", false)
 --
 -- cleanup
 box.space.test:drop()
-- 
2.20.1





More information about the Tarantool-patches mailing list