Tarantool development patches archive
* [tarantool-patches] [PATCH] Relay logs from wal thread
@ 2019-01-22 14:18 Georgy Kirichenko
  2019-01-29  8:53 ` Vladimir Davydov
  0 siblings, 1 reply; 7+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 14:18 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Use the wal thread to relay rows. The relay writer and reader fibers
live in the wal thread. The writer fiber uses wal memory to send rows
to the peer when possible, or spawns cords to recover rows from a file.

The wal writer stores rows into two memory buffers, swapped when a
buffer threshold is reached. Memory buffers are split into chunks of
rows with the same server id, bounded by a chunk threshold. In order
to search for a position in wal memory, all chunks are indexed by the
wal_mem_index array. Each wal_mem_index entry contains the
corresponding replica_id and the vclock just before the first row in
the chunk, as well as the chunk's buffer number, position and size in
memory.
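
For illustration only (not part of the patch), here is a simplified,
self-contained sketch of the chunk index lookup, with the vclock
reduced to a single scalar signature; the real code compares full
vclocks:

	#include <stdint.h>
	#include <stddef.h>

	/* Simplified stand-in for struct wal_mem_index: the real entry
	 * also carries replica_id, buffer number and chunk offset/size. */
	struct mem_chunk {
		uint64_t vclock_sum; /* vclock signature before 1st row */
	};

	/* Find the chunk a subscriber positioned at 'peer_sum' should
	 * resume from. Entries are ordered by vclock, so an upper-bound
	 * binary search applies. */
	static size_t
	find_resume_chunk(const struct mem_chunk *idx, size_t count,
			  uint64_t peer_sum)
	{
		size_t lo = 0, hi = count;
		while (lo < hi) {
			size_t mid = lo + (hi - lo) / 2;
			if (idx[mid].vclock_sum <= peer_sum)
				lo = mid + 1;
			else
				hi = mid;
		}
		return lo > 0 ? lo - 1 : 0;
	}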

Closes: #3794

Issue: https://github.com/tarantool/tarantool/issues/3794
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-3794--relay-from-wal
---
 src/box/relay.cc                              | 340 +-------
 src/box/relay.h                               |   7 +
 src/box/wal.c                                 | 819 +++++++++++++++++-
 src/box/wal.h                                 |   7 +
 src/box/xrow_io.cc                            |  13 +
 src/box/xrow_io.h                             |   4 +
 src/errinj.h                                  |   1 +
 test/box/errinj.result                        |   2 +
 test/replication/force_recovery.result        |  11 +
 test/replication/force_recovery.test.lua      |   3 +
 test/replication/replica_rejoin.result        |   7 +
 test/replication/replica_rejoin.test.lua      |   2 +
 .../show_error_on_disconnect.result           |   7 +
 .../show_error_on_disconnect.test.lua         |   2 +
 test/vinyl/layout.result                      |  10 -
 test/xlog/panic_on_wal_error.result           |  12 +
 test/xlog/panic_on_wal_error.test.lua         |   4 +
 17 files changed, 919 insertions(+), 332 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2bb260e06..9a4e63405 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -73,23 +73,6 @@ struct relay_status_msg {
 	struct vclock vclock;
 };
 
-/**
- * Cbus message to update replica gc state in tx thread.
- */
-struct relay_gc_msg {
-	/** Parent */
-	struct cmsg msg;
-	/**
-	 * Link in the list of pending gc messages,
-	 * see relay::pending_gc.
-	 */
-	struct stailq_entry in_pending;
-	/** Relay instance */
-	struct relay *relay;
-	/** Vclock to advance to */
-	struct vclock vclock;
-};
-
 /** State of a replication relay. */
 struct relay {
 	/** The thread in which we relay data to the replica. */
@@ -121,20 +104,9 @@ struct relay {
 	 * dataset on the other side and send missing data rows if any.
 	 */
 	struct vclock local_vclock_at_subscribe;
+	/** Cached wal_dir cfg option. */
+	const char *wal_dir;
 
-	/** Relay endpoint */
-	struct cbus_endpoint endpoint;
-	/** A pipe from 'relay' thread to 'tx' */
-	struct cpipe tx_pipe;
-	/** A pipe from 'tx' thread to 'relay' */
-	struct cpipe relay_pipe;
-	/** Status message */
-	struct relay_status_msg status_msg;
-	/**
-	 * List of garbage collection messages awaiting
-	 * confirmation from the replica.
-	 */
-	struct stailq pending_gc;
 	/** Time when last row was sent to peer. */
 	double last_row_tm;
 	/** Relay sync state. */
@@ -187,7 +159,6 @@ relay_new(struct replica *replica)
 	relay->replica = replica;
 	fiber_cond_create(&relay->reader_cond);
 	diag_create(&relay->diag);
-	stailq_create(&relay->pending_gc);
 	relay->state = RELAY_OFF;
 	return relay;
 }
@@ -235,19 +206,14 @@ relay_exit(struct relay *relay)
 	 * cursor, which must be closed in the same thread
 	 * that opened it (it uses cord's slab allocator).
 	 */
-	recovery_delete(relay->r);
+	if (relay->r)
+		recovery_delete(relay->r);
 	relay->r = NULL;
 }
 
 static void
 relay_stop(struct relay *relay)
 {
-	struct relay_gc_msg *gc_msg, *next_gc_msg;
-	stailq_foreach_entry_safe(gc_msg, next_gc_msg,
-				  &relay->pending_gc, in_pending) {
-		free(gc_msg);
-	}
-	stailq_create(&relay->pending_gc);
 	if (relay->r != NULL)
 		recovery_delete(relay->r);
 	relay->r = NULL;
@@ -372,286 +338,33 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
  * The message which updated tx thread with a new vclock has returned back
  * to the relay.
  */
-static void
-relay_status_update(struct cmsg *msg)
-{
-	msg->route = NULL;
-}
-
-/**
- * Deliver a fresh relay vclock to tx thread.
- */
-static void
-tx_status_update(struct cmsg *msg)
-{
-	struct relay_status_msg *status = (struct relay_status_msg *)msg;
-	vclock_copy(&status->relay->tx.vclock, &status->vclock);
-	static const struct cmsg_hop route[] = {
-		{relay_status_update, NULL}
-	};
-	cmsg_init(msg, route);
-	cpipe_push(&status->relay->relay_pipe, msg);
-}
-
-/**
- * Update replica gc state in tx thread.
- */
-static void
-tx_gc_advance(struct cmsg *msg)
-{
-	struct relay_gc_msg *m = (struct relay_gc_msg *)msg;
-	gc_consumer_advance(m->relay->replica->gc, &m->vclock);
-	free(m);
-}
-
-static void
-relay_on_close_log_f(struct trigger *trigger, void * /* event */)
-{
-	static const struct cmsg_hop route[] = {
-		{tx_gc_advance, NULL}
-	};
-	struct relay *relay = (struct relay *)trigger->data;
-	struct relay_gc_msg *m = (struct relay_gc_msg *)malloc(sizeof(*m));
-	if (m == NULL) {
-		say_warn("failed to allocate relay gc message");
-		return;
-	}
-	cmsg_init(&m->msg, route);
-	m->relay = relay;
-	vclock_copy(&m->vclock, &relay->r->vclock);
-	/*
-	 * Do not invoke garbage collection until the replica
-	 * confirms that it has received data stored in the
-	 * sent xlog.
-	 */
-	stailq_add_tail_entry(&relay->pending_gc, m, in_pending);
-}
-
-/**
- * Invoke pending garbage collection requests.
- *
- * This function schedules the most recent gc message whose
- * vclock is less than or equal to the given one. Older
- * messages are discarded as their job will be done by the
- * scheduled message anyway.
- */
-static inline void
-relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
-{
-	struct relay_gc_msg *curr, *next, *gc_msg = NULL;
-	stailq_foreach_entry_safe(curr, next, &relay->pending_gc, in_pending) {
-		if (vclock_sum(&curr->vclock) > vclock_sum(vclock))
-			break;
-		stailq_shift(&relay->pending_gc);
-		free(gc_msg);
-		gc_msg = curr;
-	}
-	if (gc_msg != NULL)
-		cpipe_push(&relay->tx_pipe, &gc_msg->msg);
-}
-
-static void
-relay_set_error(struct relay *relay, struct error *e)
-{
-	/* Don't override existing error. */
-	if (diag_is_empty(&relay->diag))
-		diag_add_error(&relay->diag, e);
-}
-
-static void
-relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
-{
-	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
-	if (fiber_is_cancelled()) {
-		/*
-		 * The relay is exiting. Rescanning the WAL at this
-		 * point would be pointless and even dangerous,
-		 * because the relay could have written a packet
-		 * fragment to the socket before being cancelled
-		 * so that writing another row to the socket would
-		 * lead to corrupted replication stream and, as
-		 * a result, permanent replication breakdown.
-		 */
-		return;
-	}
-	try {
-		recover_remaining_wals(relay->r, &relay->stream, NULL,
-				       (events & WAL_EVENT_ROTATE) != 0);
-		relay_flush(relay);
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
-		fiber_cancel(fiber());
-	}
-}
-
-/*
- * Relay reader fiber function.
- * Read xrow encoded vclocks sent by the replica.
- */
 int
-relay_reader_f(va_list ap)
+relay_status_update(struct replica *replica, struct vclock *vclock)
 {
-	struct relay *relay = va_arg(ap, struct relay *);
-	struct fiber *relay_f = va_arg(ap, struct fiber *);
-
-	struct ibuf ibuf;
-	struct ev_io io;
-	coio_create(&io, relay->io.fd);
-	ibuf_create(&ibuf, &cord()->slabc, 1024);
-	try {
-		while (!fiber_is_cancelled()) {
-			struct xrow_header xrow;
-			coio_read_xrow_timeout_xc(&io, &ibuf, &xrow,
-					replication_disconnect_timeout());
-			/* vclock is followed while decoding, zeroing it. */
-			vclock_create(&relay->recv_vclock);
-			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
-			fiber_cond_signal(&relay->reader_cond);
-		}
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
-		fiber_cancel(relay_f);
-	}
-	ibuf_destroy(&ibuf);
+	struct relay *relay = replica->relay;
+	vclock_copy(&relay->tx.vclock, vclock);
+	if (vclock_compare(vclock, &replica->gc->vclock) == 1)
+		gc_consumer_advance(replica->gc, vclock);
 	return 0;
 }
 
-/**
- * Send a heartbeat message over a connected relay.
- */
-static void
-relay_send_heartbeat(struct relay *relay)
-{
-	struct xrow_header row;
-	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
-	try {
-		relay_send(relay, &row);
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
-		fiber_cancel(fiber());
-	}
-}
-
-/**
- * A libev callback invoked when a relay client socket is ready
- * for read. This currently only happens when the client closes
- * its socket, and we get an EOF.
- */
-static int
-relay_subscribe_f(va_list ap)
+int
+relay_recover_wals(struct replica *replica, struct recovery *recovery)
 {
-	struct relay *relay = va_arg(ap, struct relay *);
-	struct recovery *r = relay->r;
+	struct relay *relay = replica->relay;
 	ibuf_create(&relay->send_buf, &cord()->slabc,
 		    2 * RELAY_BUFFER_SEND_THRESHOLD);
 
-	coio_enable();
-	relay_set_cord_name(relay->io.fd);
-
-	/* Create cpipe to tx for propagating vclock. */
-	cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay),
-			     fiber_schedule_cb, fiber());
-	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
-		  &relay->relay_pipe, NULL, NULL, cbus_process);
-
-	/* Setup garbage collection trigger. */
-	struct trigger on_close_log = {
-		RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
-	};
-	trigger_add(&r->on_close_log, &on_close_log);
-
-	/* Setup WAL watcher for sending new rows to the replica. */
-	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
-			relay_process_wal_event, cbus_process);
-
-	/* Start fiber for receiving replica acks. */
-	char name[FIBER_NAME_MAX];
-	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
-	struct fiber *reader = fiber_new_xc(name, relay_reader_f);
-	fiber_set_joinable(reader, true);
-	fiber_start(reader, relay, fiber());
-
-	/*
-	 * If the replica happens to be up to date on subscribe,
-	 * don't wait for timeout to happen - send a heartbeat
-	 * message right away to update the replication lag as
-	 * soon as possible.
-	 */
-	relay_send_heartbeat(relay);
-
-	/*
-	 * Run the event loop until the connection is broken
-	 * or an error occurs.
-	 */
-	while (!fiber_is_cancelled()) {
-		double timeout = replication_timeout;
-		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
-					    ERRINJ_DOUBLE);
-		if (inj != NULL && inj->dparam != 0)
-			timeout = inj->dparam;
-
-		fiber_cond_wait_deadline(&relay->reader_cond,
-					 relay->last_row_tm + timeout);
-
-		/*
-		 * The fiber can be woken by IO cancel, by a timeout of
-		 * status messaging or by an acknowledge to status message.
-		 * Handle cbus messages first.
-		 */
-		cbus_process(&relay->endpoint);
-		/* Check for a heartbeat timeout. */
-		if (ev_monotonic_now(loop()) - relay->last_row_tm > timeout)
-			relay_send_heartbeat(relay);
-		/*
-		 * Check that the vclock has been updated and the previous
-		 * status message is delivered
-		 */
-		if (relay->status_msg.msg.route != NULL)
-			continue;
-		struct vclock *send_vclock;
-		if (relay->version_id < version_id(1, 7, 4))
-			send_vclock = &r->vclock;
-		else
-			send_vclock = &relay->recv_vclock;
-		if (vclock_sum(&relay->status_msg.vclock) ==
-		    vclock_sum(send_vclock))
-			continue;
-		static const struct cmsg_hop route[] = {
-			{tx_status_update, NULL}
-		};
-		cmsg_init(&relay->status_msg.msg, route);
-		vclock_copy(&relay->status_msg.vclock, send_vclock);
-		relay->status_msg.relay = relay;
-		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
-		/* Collect xlog files received by the replica. */
-		relay_schedule_pending_gc(relay, send_vclock);
+	int res = 0;
+	try {
+		recover_remaining_wals(recovery, &relay->stream, NULL, true);
+		relay_flush(relay);
+	} catch (Exception &ex) {
+		res = -1;
 	}
 
-	/*
-	 * Log the error that caused the relay to break the loop.
-	 * Don't clear the error for status reporting.
-	 */
-	assert(!diag_is_empty(&relay->diag));
-	diag_add_error(diag_get(), diag_last_error(&relay->diag));
-	diag_log();
-	say_crit("exiting the relay loop");
-
-	/* Clear garbage collector trigger and WAL watcher. */
-	trigger_clear(&on_close_log);
-	wal_clear_watcher(&relay->wal_watcher, cbus_process);
-
-	/* Join ack reader fiber. */
-	fiber_cancel(reader);
-	fiber_join(reader);
-
-	/* Destroy cpipe to tx. */
-	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
-		    NULL, NULL, cbus_process);
-	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
-
-	relay_exit(relay);
 	ibuf_destroy(&relay->send_buf);
-	return -1;
+	return res;
 }
 
 /** Replication acceptor fiber handler. */
@@ -676,21 +389,22 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 
 	relay_start(relay, fd, sync, relay_send_row);
 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
-	relay->r = recovery_new(cfg_gets("wal_dir"), false,
-			        replica_clock);
+	relay->wal_dir = cfg_gets("wal_dir");
+	relay->r = NULL;
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
-	int rc = cord_costart(&relay->cord, "subscribe",
-			      relay_subscribe_f, relay);
-	if (rc == 0)
-		rc = cord_cojoin(&relay->cord);
+	wal_relay(replica, replica_clock, fd);
 
+	relay_exit(relay);
 	relay_stop(relay);
 	replica_on_relay_stop(replica);
 
-	if (rc != 0)
+	if (!diag_is_empty(&fiber()->diag)) {
+		if (diag_is_empty(&relay->diag))
+			diag_add_error(&relay->diag, diag_last_error(&fiber()->diag));
 		diag_raise();
+	}
 }
 
 static void
diff --git a/src/box/relay.h b/src/box/relay.h
index c0848899d..c2d97f5e6 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -41,6 +41,7 @@ struct relay;
 struct replica;
 struct tt_uuid;
 struct vclock;
+struct recovery;
 
 enum relay_state {
 	/**
@@ -85,6 +86,12 @@ relay_get_state(const struct relay *relay);
 const struct vclock *
 relay_vclock(const struct relay *relay);
 
+int
+relay_status_update(struct replica *replica, struct vclock *vclock);
+
+int
+relay_recover_wals(struct replica *replica, struct recovery *recovery);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/wal.c b/src/box/wal.c
index 17ead08e7..6c8fcd08a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -36,13 +36,19 @@
 #include "errinj.h"
 #include "error.h"
 #include "exception.h"
+#include "sio.h"
+#include "coio.h"
 
 #include "xlog.h"
 #include "xrow.h"
+#include "xrow_io.h"
 #include "vy_log.h"
 #include "cbus.h"
 #include "coio_task.h"
 #include "replication.h"
+#include "recovery.h"
+#include "relay.h"
+#include "iproto_constants.h"
 
 enum {
 	/**
@@ -54,6 +60,10 @@ enum {
 	 * latency. 1 MB seems to be a well balanced choice.
 	 */
 	WAL_FALLOCATE_LEN = 1024 * 1024,
+	/** Wal memory threshold. */
+	WAL_MEMORY_THRESHOLD = 4 * 1024 * 1024,
+	/** Wal memory chunk threshold. */
+	WAL_MEM_CHUNK_THRESHOLD = 16 * 1024,
 };
 
 const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
@@ -77,6 +87,7 @@ struct wal_thread {
 	 * priority pipe and DOES NOT support yield.
 	 */
 	struct cpipe tx_prio_pipe;
+	struct rlist relay;
 };
 
 /*
@@ -158,6 +169,26 @@ struct wal_writer
 	 * Used for replication relays.
 	 */
 	struct rlist watchers;
+	/**
+	 * Wal memory buffer routines.
+	 * The writer stores rows into two memory buffers, swapped when
+	 * a buffer threshold is reached. Buffers are split into chunks
+	 * of rows with the same server id, bounded by a chunk threshold.
+	 * In order to search for a position in wal memory, all chunks
+	 * are indexed by the wal_mem_index array. Each entry contains the
+	 * corresponding replica_id and the vclock just before the first
+	 * row in the chunk, as well as the chunk's buffer number, position
+	 * and size in memory. When a buffer is swapped, all its chunks are
+	 * discarded and wal_mem_discard_count grows to adjust relay positions.
+	 */
+	/** Row buffers. */
+	struct ibuf wal_mem[2];
+	/** Index buffer. */
+	struct ibuf wal_mem_index;
+	/** Count of discarded mem chunks. */
+	uint64_t wal_mem_discard_count;
+	/** Condition to signal if there are new rows. */
+	struct fiber_cond memory_cond;
 };
 
 struct wal_msg {
@@ -175,6 +206,66 @@ struct wal_msg {
 	struct vclock vclock;
 };
 
+/**
+ * Wal memory chunk index.
+ */
+struct wal_mem_index {
+	/** replica id. */
+	uint32_t replica_id;
+	/** vclock just before first row in the chunk. */
+	struct vclock vclock;
+	/** Buffer number. */
+	uint8_t buf_no;
+	/** Chunk starting offset. */
+	uint64_t pos;
+	/** Chunk size. */
+	uint64_t size;
+};
+
+/**
+ * Wal memory position checkpoint.
+ */
+struct wal_mem_checkpoint {
+	/** Chunks count. */
+	uint32_t count;
+	/** Chunk size. */
+	uint32_t size;
+};
+
+/** Current relaying position. */
+struct wal_relay_mem_pos {
+	uint64_t wal_mem_discard_count;
+	uint32_t chunk_index;
+	uint32_t offset;
+};
+
+/**
+ * Wal relay structure.
+ */
+struct wal_relay {
+	struct rlist item;
+	/** Writer. */
+	struct wal_writer *writer;
+	/** Sent vclock. */
+	struct vclock vclock;
+	/** Vclock when subscribed. */
+	struct vclock vclock_at_subscribe;
+	/** Socket to send data. */
+	int fd;
+	/** Peer replica. */
+	struct replica *replica;
+	/** Cord to recover and relay data from files. */
+	struct cord cord;
+	/** A diagnostic area. */
+	struct diag diag;
+	/** Current position. */
+	struct wal_relay_mem_pos pos;
+	/** Relay writer fiber. */
+	struct fiber *writer_fiber;
+	/** Relay reader fiber. */
+	struct fiber *reader_fiber;
+};
+
 /**
  * Vinyl metadata log writer.
  */
@@ -230,7 +321,6 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 	xlog_tx_begin(l);
 	struct xrow_header **row = entry->rows;
 	for (; row < entry->rows + entry->n_rows; row++) {
-		(*row)->tm = ev_now(loop());
 		struct errinj *inj = errinj(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT);
 		if (inj != NULL && inj->iparam == (*row)->lsn) {
 			(*row)->lsn = inj->iparam - 1;
@@ -339,6 +429,43 @@ tx_notify_checkpoint(struct cmsg *msg)
 	free(msg);
 }
 
+/**
+ * A message to initialize a writer in a wal thread.
+ */
+struct wal_writer_create_msg {
+	struct cbus_call_msg base;
+	struct wal_writer *writer;
+};
+
+/**
+ * Writer initialization to do in a wal thread.
+ */
+static int
+wal_writer_create_wal(struct cbus_call_msg *base)
+{
+	struct wal_writer_create_msg *msg =
+		container_of(base, struct wal_writer_create_msg, base);
+	struct wal_writer *writer = msg->writer;
+	ibuf_create(&writer->wal_mem[0], &cord()->slabc, 65536);
+	ibuf_create(&writer->wal_mem[1], &cord()->slabc, 65536);
+	ibuf_create(&writer->wal_mem_index, &cord()->slabc, 8192);
+	struct wal_mem_index *index;
+	index = ibuf_alloc(&writer->wal_mem_index, sizeof(struct wal_mem_index));
+	if (index == NULL) {
+		/* Could not initialize wal writer. */
+		panic("Could not create wal");
+		unreachable();
+	}
+	writer->wal_mem_discard_count = 0;
+	vclock_copy(&index->vclock, &writer->vclock);
+	index->pos = 0;
+	index->size = 0;
+	index->buf_no = 0;
+	fiber_cond_create(&writer->memory_cond);
+
+	return 0;
+}
+
 /**
  * Initialize WAL writer context. Even though it's a singleton,
  * encapsulate the details just in case we may use
@@ -377,6 +504,11 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 
 	writer->on_garbage_collection = on_garbage_collection;
 	writer->on_checkpoint_threshold = on_checkpoint_threshold;
+	struct wal_writer_create_msg msg;
+	msg.writer = writer;
+	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+			   &msg.base, wal_writer_create_wal, NULL,
+			   TIMEOUT_INFINITY);
 }
 
 /** Destroy a WAL writer structure. */
@@ -920,9 +1052,9 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
 	}
 	if ((*begin)->replica_id != instance_id) {
 		/*
-		 * Move all local changes to the end of rows array and
+		 * Move all local changes to the end of rows array to form
 		 * a fake local transaction (like an autonomous transaction)
-		 * because we could not replicate the transaction back.
+		 * in order to be able to replicate local changes back.
 		 */
 		struct xrow_header **row = end - 1;
 		while (row >= begin) {
@@ -943,7 +1075,7 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
 		while (begin < end && begin[0]->replica_id != instance_id)
 			++begin;
 	}
-	/* Setup txn_id and tnx_replica_id for localy generated rows. */
+	/* Setup txn_id and txn_replica_id for locally generated rows. */
 	row = begin;
 	while (row < end) {
 		row[0]->txn_id = begin[0]->lsn;
@@ -953,6 +1085,186 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
 	}
 }
 
+static inline struct wal_mem_index *
+wal_mem_index_first(struct wal_writer *writer)
+{
+	return (struct wal_mem_index *)writer->wal_mem_index.rpos;
+}
+
+static inline struct wal_mem_index *
+wal_mem_index_last(struct wal_writer *writer)
+{
+	return (struct wal_mem_index *)writer->wal_mem_index.wpos - 1;
+}
+
+static inline struct wal_mem_index *
+wal_mem_index_new(struct wal_writer *writer, int buf_no, struct vclock *vclock)
+{
+	struct wal_mem_index *last = wal_mem_index_last(writer);
+	if (last->size == 0)
+		return last;
+	last = (struct wal_mem_index *)ibuf_alloc(&writer->wal_mem_index,
+						  sizeof(struct wal_mem_index));
+	if (last == NULL) {
+		diag_set(OutOfMemory, sizeof(struct wal_mem_index),
+			 "region", "struct wal_mem_index");
+		return NULL;
+	}
+	last->buf_no = buf_no;
+	last->size = 0;
+	last->pos = ibuf_used(writer->wal_mem + buf_no);
+	vclock_copy(&last->vclock, vclock);
+	return last;
+}
+
+/** Save current memory position. */
+static inline void
+wal_mem_get_checkpoint(struct wal_writer *writer,
+		       struct wal_mem_checkpoint *mem_checkpoint)
+{
+	mem_checkpoint->count = wal_mem_index_last(writer) -
+			      wal_mem_index_first(writer);
+	mem_checkpoint->size = wal_mem_index_last(writer)->size;
+}
+
+/** Restore memory position. */
+static inline void
+wal_mem_set_checkpoint(struct wal_writer *writer,
+		       struct wal_mem_checkpoint *mem_checkpoint)
+{
+	struct wal_mem_index *index = wal_mem_index_first(writer) +
+				      mem_checkpoint->count;
+	assert(index->buf_no == wal_mem_index_last(writer)->buf_no);
+	index->size = mem_checkpoint->size;
+	/* Truncate buffers. */
+	writer->wal_mem_index.wpos = (char *)(index + 1);
+	struct ibuf *buf = writer->wal_mem + index->buf_no;
+	buf->wpos = buf->buf + index->pos + index->size;
+}
+
+/**
+ * Prepare wal memory to accept new rows and rotate the buffer if needed.
+ */
+static int
+wal_mem_prepare(struct wal_writer *writer)
+{
+	struct wal_mem_index *last = wal_mem_index_last(writer);
+	uint8_t buf_no = last->buf_no;
+	if (ibuf_used(&writer->wal_mem[buf_no]) > WAL_MEMORY_THRESHOLD) {
+		/* We are going to rotate buffers. */
+		struct wal_mem_index *first = wal_mem_index_first(writer);
+		while (first->buf_no == 1 - buf_no)
+			++first;
+		/* Discard all indexes on buffer to clear. */
+		writer->wal_mem_discard_count += first - wal_mem_index_first(writer);
+		writer->wal_mem_index.rpos = (char *)first;
+
+		buf_no = 1 - buf_no;
+		ibuf_reset(&writer->wal_mem[buf_no]);
+
+		last = wal_mem_index_new(writer, buf_no, &writer->vclock);
+		if (last == NULL)
+			return -1;
+	}
+	return 0;
+}
+
+/** Get a chunk to store rows data. */
+static struct wal_mem_index *
+wal_get_chunk(struct wal_writer *writer, uint32_t replica_id,
+	      struct vclock *vclock)
+{
+	struct wal_mem_index *last = wal_mem_index_last(writer);
+	int buf_no = last->buf_no;
+
+	if (last->size == 0)
+		last->replica_id = replica_id;
+
+	if (last->size > WAL_MEM_CHUNK_THRESHOLD ||
+	    last->replica_id != replica_id) {
+		/* Open new chunk. */
+		last = wal_mem_index_new(writer, buf_no, vclock);
+		if (last == NULL)
+			return NULL;
+		last->replica_id = replica_id;
+	}
+	return last;
+}
+
+/**
+ * Encode an entry into a wal memory buffer.
+ */
+static ssize_t
+wal_encode_entry(struct wal_writer *writer, struct journal_entry *entry,
+		 struct vclock *vclock)
+{
+	double tm = ev_now(loop());
+	struct xrow_header nop_row;
+	nop_row.type = IPROTO_NOP;
+	nop_row.group_id = GROUP_DEFAULT;
+	nop_row.bodycnt = 0;
+	nop_row.tm = tm;
+
+	struct wal_mem_index *last = wal_mem_index_last(writer);
+	int buf_no = last->buf_no;
+	last = NULL;
+	struct ibuf *mem_buf = writer->wal_mem + buf_no;
+
+	/* vclock to track encoding row. */
+	struct vclock chunk_vclock;
+	vclock_copy(&chunk_vclock, vclock);
+
+	wal_assign_lsn(vclock, entry->rows, entry->rows + entry->n_rows);
+	entry->res = vclock_sum(vclock);
+
+	struct xrow_header **row = entry->rows;
+	while (row < entry->rows + entry->n_rows) {
+		(*row)->tm = tm;
+		struct iovec iov[XROW_IOVMAX];
+		if ((*row)->group_id == GROUP_LOCAL) {
+			nop_row.replica_id = (*row)->replica_id;
+			nop_row.lsn = (*row)->lsn;
+			nop_row.txn_id = (*row)->txn_id;
+			nop_row.txn_replica_id = (*row)->txn_replica_id;
+		}
+		struct xrow_header *send_row = (*row)->group_id != GROUP_LOCAL ?
+					        *row : &nop_row;
+		if (last == NULL) {
+			/* Do not allow splitting a transaction into chunks. */
+			last = wal_get_chunk(writer, send_row->replica_id,
+					     &chunk_vclock);
+		}
+		if (last == NULL)
+			goto error;
+		vclock_follow_xrow(&chunk_vclock, send_row);
+
+		int iovcnt = xrow_to_iovec(send_row, iov);
+		if (iovcnt < 0)
+			goto error;
+		uint64_t xrow_size = 0;
+		for (int i = 0; i < iovcnt; ++i)
+			xrow_size += iov[i].iov_len;
+		if (ibuf_reserve(mem_buf, xrow_size) == NULL) {
+			diag_set(OutOfMemory, xrow_size,
+				 "region", "entry memory");
+			goto error;
+		}
+
+		for (int i = 0; i < iovcnt; ++i) {
+			memcpy(ibuf_alloc(mem_buf, iov[i].iov_len),
+			       iov[i].iov_base, iov[i].iov_len);
+			last->size += iov[i].iov_len;
+		}
+		if (row[0]->txn_last == 1)
+			last = NULL;
+		++row;
+	}
+	return 0;
+
+error:
+	return -1;
+}
+
 static void
 wal_write_to_disk(struct cmsg *msg)
 {
@@ -1016,22 +1328,32 @@ wal_write_to_disk(struct cmsg *msg)
 	int rc;
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
+	if (wal_mem_prepare(writer) != 0)
+		goto done;
+	struct wal_mem_checkpoint mem_checkpoint;
+	wal_mem_get_checkpoint(writer, &mem_checkpoint);
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
-		wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
-		entry->res = vclock_sum(&vclock);
+		rc = wal_encode_entry(writer, entry, &vclock);
+		if (rc != 0)
+			goto done;
 		rc = xlog_write_entry(l, entry);
-		if (rc < 0)
+		if (rc < 0) {
+			wal_mem_set_checkpoint(writer, &mem_checkpoint);
 			goto done;
+		}
 		if (rc > 0) {
 			writer->checkpoint_wal_size += rc;
 			last_committed = &entry->fifo;
 			vclock_copy(&writer->vclock, &vclock);
+			wal_mem_get_checkpoint(writer, &mem_checkpoint);
 		}
 		/* rc == 0: the write is buffered in xlog_tx */
 	}
 	rc = xlog_flush(l);
-	if (rc < 0)
+	if (rc < 0) {
+		wal_mem_set_checkpoint(writer, &mem_checkpoint);
 		goto done;
+	}
 
 	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
@@ -1087,6 +1409,7 @@ done:
 		wal_writer_begin_rollback(writer);
 	}
 	fiber_gc();
+	fiber_cond_broadcast(&writer->memory_cond);
 	wal_notify_watchers(writer, WAL_EVENT_WRITE);
 }
 
@@ -1095,6 +1418,8 @@ static int
 wal_thread_f(va_list ap)
 {
 	(void) ap;
+	struct wal_writer *writer = &wal_writer_singleton;
+	rlist_create(&wal_thread.relay);
 
 	/** Initialize eio in this thread */
 	coio_enable();
@@ -1110,7 +1435,14 @@ wal_thread_f(va_list ap)
 
 	cbus_loop(&endpoint);
 
-	struct wal_writer *writer = &wal_writer_singleton;
+	struct wal_relay *msg;
+	rlist_foreach_entry(msg, &wal_thread.relay, item) {
+		if (msg->cord.id != 0) {
+			if (tt_pthread_cancel(msg->cord.id) == ESRCH)
+				continue;
+			tt_pthread_join(msg->cord.id, NULL);
+		}
+	}
 
 	/*
 	 * Create a new empty WAL on shutdown so that we don't
@@ -1396,6 +1728,475 @@ wal_notify_watchers(struct wal_writer *writer, unsigned events)
 		wal_watcher_notify(watcher, events);
 }
 
+/**
+ * Send data to the peer.
+ * Assume the socket is already in non-blocking mode.
+ *
+ * Return:
+ * 0 data sent without yielding
+ * 1 data sent with yielding
+ * -1 an error occurred
+ */
+static int
+wal_relay_send(void *data, size_t to_write, int fd)
+{
+	{
+		struct errinj *inj = errinj(ERRINJ_RELAY_SEND_DELAY, ERRINJ_BOOL);
+		if (inj != NULL && inj->bparam) {
+			void *errinj_data = region_alloc(&fiber()->gc, to_write);
+			if (errinj_data == NULL) {
+				diag_set(OutOfMemory, to_write, "region", "relay pending data");
+				return -1;
+			}
+			memcpy(errinj_data, data, to_write);
+			data = errinj_data;
+			while (inj != NULL && inj->bparam)
+				fiber_sleep(0.01);
+		}
+	}
+
+	int rc = 0;
+	ssize_t written = write(fd, data, to_write);
+	if (written < 0 && ! (errno == EAGAIN || errno == EWOULDBLOCK))
+		goto io_error;
+	if (written == (ssize_t)to_write)
+		goto done;
+
+	/* Preserve data to send. */
+	if (written < 0)
+		written = 0;
+	to_write -= written;
+	void *pending_data = region_alloc(&fiber()->gc, to_write);
+	if (pending_data == NULL) {
+		diag_set(OutOfMemory, to_write, "region", "relay pending data");
+		return -1;
+	}
+	data += written;
+	memcpy(pending_data, data, to_write);
+	while (to_write > 0) {
+		if (coio_wait(fd, COIO_WRITE, TIMEOUT_INFINITY) < 0)
+			goto error;
+		written = write(fd, pending_data, to_write);
+		if (written < 0 && ! (errno == EAGAIN || errno == EWOULDBLOCK))
+			goto io_error;
+		if (written < 0)
+			written = 0;
+		pending_data += written;
+		to_write -= written;
+	}
+	rc = 1;
+
+done:
+	fiber_gc();
+	{
+		struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
+		if (inj != NULL && inj->dparam > 0)
+			fiber_sleep(inj->dparam);
+	}
+	return rc;
+
+io_error:
+	diag_set(SocketError, sio_socketname(fd), "write");
+error:
+	fiber_gc();
+	return -1;
+}
+
+/** Send heartbeat to the peer. */
+static int
+wal_relay_heartbeat(int fd)
+{
+	struct xrow_header row;
+	xrow_encode_timestamp(&row, instance_id, ev_now(loop()));
+	struct iovec iov[XROW_IOVMAX];
+	int iovcnt = xrow_to_iovec(&row, iov);
+	if (iovcnt < 0)
+		return -1;
+	for (int i = 0; i < iovcnt; ++i)
+		if (wal_relay_send(iov[i].iov_base, iov[i].iov_len, fd) < 0)
+			return -1;
+	return 0;
+}
+
+/**
+ * Send a memory chunk to the peer.
+ * Assume the socket is already in non-blocking mode.
+ *
+ * Return:
+ * 0 the chunk was sent or there was nothing to send
+ * -1 an error occurred
+ */
+static int
+wal_relay_send_chunk(struct wal_relay *wal_relay)
+{
+	struct wal_writer *writer = wal_relay->writer;
+	struct wal_relay_mem_pos *pos = &wal_relay->pos;
+
+	/* Adjust position in case of rotation. */
+	assert(pos->chunk_index >= writer->wal_mem_discard_count -
+				 pos->wal_mem_discard_count);
+	pos->chunk_index -= writer->wal_mem_discard_count -
+			    pos->wal_mem_discard_count;
+	pos->wal_mem_discard_count = writer->wal_mem_discard_count;
+
+	struct wal_mem_index *first = wal_mem_index_first(writer);
+	struct wal_mem_index *last = wal_mem_index_last(writer);
+	struct wal_mem_index *chunk = first + pos->chunk_index;
+
+	if (chunk < last && chunk->size == pos->offset) {
+		/* Current chunk is done, use the next one. */
+		++pos->chunk_index;
+		pos->offset = 0;
+		++chunk;
+	}
+	assert(first <= chunk && last >= chunk);
+	assert(chunk->size >= pos->offset);
+	ssize_t to_write = chunk->size - pos->offset;
+	if (to_write == 0)
+		return 0;
+	/* Preserve the clock after the transfer because it might change. */
+	struct vclock last_vclock;
+	vclock_copy(&last_vclock,
+		    chunk == last ? &writer->vclock : &(chunk + 1)->vclock);
+	struct ibuf *buf = writer->wal_mem + chunk->buf_no;
+	void *data = buf->rpos + chunk->pos + pos->offset;
+	if ((wal_relay->replica->id != chunk->replica_id ||
+	     vclock_get(&chunk->vclock, chunk->replica_id) <
+	     vclock_get(&wal_relay->vclock_at_subscribe, chunk->replica_id)) &&
+	    wal_relay_send(data, to_write, wal_relay->fd) < 0)
+		return -1;
+	vclock_copy(&wal_relay->vclock, &last_vclock);
+	pos->offset += to_write;
+	return 0;
+}
+
+/** Test if the vclock position is in writer memory. */
+static inline bool
+wal_relay_in_mem(struct wal_writer *writer, struct vclock *vclock)
+{
+	struct errinj *inj = errinj(ERRINJ_WAL_RELAY_DISABLE_MEM,
+				    ERRINJ_BOOL);
+	if (inj != NULL && inj->bparam) {
+		return false;
+	}
+	struct wal_mem_index *first = wal_mem_index_first(writer);
+	int cmp = vclock_compare(&first->vclock, vclock);
+	return cmp == -1 || cmp == 0;
+}
+
+/** Setup relay position. */
+static void
+wal_relay_setup_chunk(struct wal_relay *wal_relay)
+{
+	struct wal_writer *writer = wal_relay->writer;
+	struct vclock *vclock = &wal_relay->vclock;
+	struct wal_mem_index *first = wal_mem_index_first(writer);
+	struct wal_mem_index *last = wal_mem_index_last(writer);
+	struct wal_mem_index *mid = NULL;
+	while (last - first > 1) {
+		mid = first + (last - first + 1) / 2;
+		int cmp = vclock_compare(vclock, &mid->vclock);
+		if (cmp == 0) {
+			first = last = mid;
+			break;
+		}
+		if (cmp == 1)
+			first = mid;
+		else
+			last = mid;
+	}
+	if (last != first) {
+		int cmp = vclock_compare(vclock, &last->vclock);
+		if (cmp == 0 || cmp == 1)
+			++first;
+	}
+	wal_relay->pos.chunk_index = first - wal_mem_index_first(writer);
+	wal_relay->pos.wal_mem_discard_count = writer->wal_mem_discard_count;
+	wal_relay->pos.offset = 0;
+}
+
+/**
+ * Recover wal memory from current position until the end.
+ */
+static int
+wal_relay_recover_mem(struct wal_relay *wal_relay)
+{
+	struct wal_writer *writer = wal_relay->writer;
+	struct vclock *vclock = &wal_relay->vclock;
+	if (!wal_relay_in_mem(writer, vclock))
+		return 0;
+	wal_relay_setup_chunk(wal_relay);
+
+	do {
+		int rc = wal_relay_send_chunk(wal_relay);
+		if (rc < 0)
+			return -1;
+		if (vclock_compare(&writer->vclock, vclock) == 1) {
+			/* There is more data, send the next chunk. */
+			continue;
+		}
+		double timeout = replication_timeout;
+		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
+					    ERRINJ_DOUBLE);
+		if (inj != NULL && inj->dparam != 0)
+			timeout = inj->dparam;
+		/* Wait for new rows or heartbeat timeout. */
+		while (fiber_cond_wait_timeout(&writer->memory_cond,
+					       timeout) == -1 &&
+		       !fiber_is_cancelled(fiber())) {
+			if (wal_relay_heartbeat(wal_relay->fd) < 0)
+				return -1;
+			if (inj != NULL && inj->dparam != 0)
+				timeout = inj->dparam;
+			continue;
+		}
+	} while (!fiber_is_cancelled(fiber()) &&
+		 wal_relay_in_mem(writer, vclock));
+
+	return fiber_is_cancelled(fiber()) ? -1 : 0;
+}
+
+/**
+ * Cord function to recover xlog files.
+ */
+static int
+wal_relay_recover_file_f(va_list ap)
+{
+	struct wal_relay *wal_relay = va_arg(ap, struct wal_relay *);
+	struct wal_writer *writer = wal_relay->writer;
+	struct replica *replica = wal_relay->replica;
+	struct vclock *vclock = &wal_relay->vclock;
+
+	struct recovery *recovery;
+	recovery = recovery_new(writer->wal_dir.dirname, false, vclock);
+	if (recovery == NULL)
+		return -1;
+
+	int res = relay_recover_wals(replica, recovery);
+	vclock_copy(vclock, &recovery->vclock);
+
+	recovery_delete(recovery);
+	return res;
+}
+
+/**
+ * Create cord to recover and relay xlog files.
+ */
+static int
+wal_relay_recover_file(struct wal_relay *msg)
+{
+	cord_costart(&msg->cord, "recovery wal files",
+		     wal_relay_recover_file_f, msg);
+	int res = cord_cojoin(&msg->cord);
+	msg->cord.id = 0;
+	return res;
+}
+
+/**
+ * Message to inform relay about peer vclock changes.
+ */
+struct wal_relay_status_msg {
+	struct cbus_call_msg base;
+	/** Replica. */
+	struct replica *replica;
+	/** Known replica vclock. */
+	struct vclock vclock;
+};
+
+static int
+tx_wal_relay_status(struct cbus_call_msg *base)
+{
+	struct wal_relay_status_msg *msg =
+		container_of(base, struct wal_relay_status_msg, base);
+	relay_status_update(msg->replica, &msg->vclock);
+	return 0;
+}
+
+/**
+ * Peer vclock reader fiber function.
+ */
+static int
+wal_relay_reader_f(va_list ap)
+{
+	struct wal_relay *wal_relay = va_arg(ap, struct wal_relay *);
+	struct replica *replica = wal_relay->replica;
+
+	struct ibuf ibuf;
+	ibuf_create(&ibuf, &cord()->slabc, 1024);
+	while (!fiber_is_cancelled()) {
+		struct xrow_header xrow;
+		if (coio_read_xrow_timeout(wal_relay->fd, &ibuf, &xrow,
+					   replication_disconnect_timeout()) != 0)
+			break;
+		struct wal_relay_status_msg msg;
+		/* The vclock is followed while decoding, so zero it first. */
+		vclock_create(&msg.vclock);
+		xrow_decode_vclock(&xrow, &msg.vclock);
+		msg.replica = replica;
+		cbus_call(&wal_thread.tx_prio_pipe, &wal_thread.wal_pipe,
+			  &msg.base, tx_wal_relay_status, NULL,
+			  TIMEOUT_INFINITY);
+	}
+	if (diag_is_empty(&wal_relay->diag))
+		diag_move(&fiber()->diag, &wal_relay->diag);
+	fiber_cancel(wal_relay->writer_fiber);
+	ibuf_destroy(&ibuf);
+	return 0;
+}
+
+/**
+ * Message to control wal relay.
+ */
+struct wal_relay_start_msg {
+	struct cmsg base;
+	/** Stop condition. */
+	struct fiber_cond stop_cond;
+	/** Done status to protect against spurious wakeup. */
+	bool done;
+	/** Replica. */
+	struct replica *replica;
+	/** Replica known vclock. */
+	struct vclock *vclock;
+	/** Replica socket. */
+	int fd;
+	/** Diagnostic area. */
+	struct diag diag;
+};
+
+/**
+ * Handler to inform when wal relay is exited.
+ */
+static void
+wal_relay_done(struct cmsg *base)
+{
+	struct wal_relay_start_msg *msg;
+	msg = container_of(base, struct wal_relay_start_msg, base);
+	msg->done = true;
+	fiber_cond_signal(&msg->stop_cond);
+}
+
+/**
+ * Helper to send wal relay done message.
+ */
+static void
+wal_relay_done_send(struct wal_relay_start_msg *msg)
+{
+	static struct cmsg_hop done_route[] = {
+		{wal_relay_done, NULL}
+	};
+	/*
+	 * Because of complicated cbus routing and fiber_start behavior,
+	 * the message could still be in cbus processing, so let cbus
+	 * finish with it.
+	 */
+	fiber_reschedule();
+	cmsg_init(&msg->base, done_route);
+	diag_move(&fiber()->diag, &msg->diag);
+	cpipe_push(&wal_thread.tx_prio_pipe, &msg->base);
+}
+
+/**
+ * Wal relay writer fiber.
+ */
+static int
+wal_relay_f(va_list ap)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+
+	struct wal_relay_start_msg *msg = va_arg(ap, struct wal_relay_start_msg *);
+	struct wal_relay wal_relay;
+	memset(&wal_relay, 0, sizeof(wal_relay));
+	rlist_add(&wal_thread.relay, &wal_relay.item);
+	wal_relay.writer = writer;
+	wal_relay.replica = msg->replica;
+	wal_relay.fd = msg->fd;
+	vclock_copy(&wal_relay.vclock, msg->vclock);
+	vclock_copy(&wal_relay.vclock_at_subscribe, &writer->vclock);
+	diag_create(&wal_relay.diag);
+	wal_relay.writer_fiber = fiber();
+	wal_relay.reader_fiber = fiber_new("relay_status", wal_relay_reader_f);
+	if (wal_relay.reader_fiber == NULL)
+		goto done;
+	fiber_set_joinable(wal_relay.reader_fiber, true);
+	fiber_start(wal_relay.reader_fiber, &wal_relay);
+
+	/* Open a new chunk to separate logs before subscribe. */
+	if (wal_mem_index_new(writer, wal_mem_index_last(writer)->buf_no,
+			      &writer->vclock) == NULL)
+		goto done;
+
+	if (wal_relay_heartbeat(msg->fd) < 0)
+		goto done;
+
+	while (!fiber_is_cancelled(fiber())) {
+		if (wal_relay_recover_mem(&wal_relay) < 0)
+			break;
+		if (wal_relay_recover_file(&wal_relay) < 0)
+			break;
+	}
+
+done:
+	if (diag_is_empty(&fiber()->diag))
+		diag_move(&wal_relay.diag, &fiber()->diag);
+	fiber_cancel(wal_relay.reader_fiber);
+	fiber_join(wal_relay.reader_fiber);
+
+	rlist_del(&wal_relay.item);
+	vclock_copy(msg->vclock, &wal_relay.vclock);
+
+	wal_relay_done_send(msg);
+	return 0;
+}
+
+/**
+ * Start a wal relay in a wal thread.
+ */
+static void
+wal_relay_start(struct cmsg *base)
+{
+	struct wal_relay_start_msg *msg;
+	msg = container_of(base, struct wal_relay_start_msg, base);
+
+	struct fiber *writer_fiber = fiber_new("wal_relay", wal_relay_f);
+	if (writer_fiber == NULL)
+		return wal_relay_done_send(msg);
+	fiber_start(writer_fiber, msg);
+}
+
+/**
+ * Start a wal relay.
+ */
+int
+wal_relay(struct replica *replica, struct vclock *vclock, int fd)
+{
+	/*
+	 * Send a wal relay start message to the wal thread and then
+	 * wait for the finish condition.
+	 * We are not able to do this job in a synchronous manner with
+	 * cbus_call and fiber_join because the wal thread has no fiber
+	 * pool, so a cbus handler is not allowed to yield.
+	 */
+	struct wal_relay_start_msg msg;
+	memset(&msg, 0, sizeof(msg));
+	msg.vclock = vclock;
+	msg.fd = fd;
+	msg.replica = replica;
+	msg.done = false;
+	fiber_cond_create(&msg.stop_cond);
+	diag_create(&msg.diag);
+	static struct cmsg_hop start_route[] = {
+		{wal_relay_start, NULL}};
+	cmsg_init(&msg.base, start_route);
+	cpipe_push(&wal_thread.wal_pipe, &msg.base);
+	while (!msg.done)
+		fiber_cond_wait(&msg.stop_cond);
+	if (!diag_is_empty(&msg.diag))
+		diag_move(&msg.diag, &fiber()->diag);
+	return diag_is_empty(&fiber()->diag) ? 0 : -1;
+}
 
 /**
  * After fork, the WAL writer thread disappears.
diff --git a/src/box/wal.h b/src/box/wal.h
index a9452f2bd..6e38da403 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -40,6 +40,7 @@
 struct fiber;
 struct wal_writer;
 struct tt_uuid;
+struct replica;
 
 enum wal_mode { WAL_NONE = 0, WAL_WRITE, WAL_FSYNC, WAL_MODE_MAX };
 
@@ -232,6 +233,12 @@ wal_write_vy_log(struct journal_entry *req);
 void
 wal_rotate_vy_log();
 
+/**
+ * Relay wal rows from vclock into fd.
+ */
+int
+wal_relay(struct replica *replica, struct vclock *vclock, int fd);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index e9ee6b0c8..f7678ff53 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -92,6 +92,19 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
 	xrow_header_decode_xc(row, (const char **) &in->rpos, in->rpos + len);
 }
 
+int
+coio_read_xrow_timeout(int fd, struct ibuf *in,
+		       struct xrow_header *row, ev_tstamp timeout)
+{
+	struct ev_io io;
+	coio_create(&io, fd);
+	try {
+		coio_read_xrow_timeout_xc(&io, in, row, timeout);
+	} catch(...) {
+		return -1;
+	}
+	return 0;
+}
 
 void
 coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 0eb7a8ace..04990eec3 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -45,6 +45,10 @@ void
 coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
 			  struct xrow_header *row, double timeout);
 
+int
+coio_read_xrow_timeout(int fd, struct ibuf *in,
+		       struct xrow_header *row, double timeout);
+
 void
 coio_write_xrow(struct ev_io *coio, const struct xrow_header *row);
 
diff --git a/src/errinj.h b/src/errinj.h
index 39de63d19..a2e8e95eb 100644
--- a/src/errinj.h
+++ b/src/errinj.h
@@ -123,6 +123,7 @@ struct errinj {
 	_(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT, {.iparam = -1}) \
 	_(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT, {.iparam = -1}) \
 	_(ERRINJ_VY_COMPACTION_DELAY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_WAL_RELAY_DISABLE_MEM, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 12303670e..5f025eea7 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -36,6 +36,8 @@ errinj.info()
     state: false
   ERRINJ_VYRUN_INDEX_GARBAGE:
     state: false
+  ERRINJ_WAL_RELAY_DISABLE_MEM:
+    state: false
   ERRINJ_VY_DELAY_PK_LOOKUP:
     state: false
   ERRINJ_VY_TASK_COMPLETE:
diff --git a/test/replication/force_recovery.result b/test/replication/force_recovery.result
index f50452858..fb9fac562 100644
--- a/test/replication/force_recovery.result
+++ b/test/replication/force_recovery.result
@@ -58,6 +58,13 @@ fio.unlink(xlog)
 ---
 - true
 ...
+errinj = box.error.injection
+---
+...
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
+---
+- ok
+...
 -- Check that even though box.cfg.force_recovery is set,
 -- replication will still fail due to LSN gap.
 box.cfg{force_recovery = true}
@@ -102,6 +109,10 @@ test_run:cmd("delete server test")
 test_run:cleanup_cluster()
 ---
 ...
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", false)
+---
+- ok
+...
 box.schema.user.revoke('guest', 'replication')
 ---
 ...
diff --git a/test/replication/force_recovery.test.lua b/test/replication/force_recovery.test.lua
index 54307814b..6c4a7f983 100644
--- a/test/replication/force_recovery.test.lua
+++ b/test/replication/force_recovery.test.lua
@@ -23,6 +23,8 @@ box.space.test:replace{1}
 box.snapshot()
 box.space.test:replace{2}
 fio.unlink(xlog)
+errinj = box.error.injection
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
 
 -- Check that even though box.cfg.force_recovery is set,
 -- replication will still fail due to LSN gap.
@@ -39,5 +41,6 @@ test_run:cmd("stop server test")
 test_run:cmd("cleanup server test")
 test_run:cmd("delete server test")
 test_run:cleanup_cluster()
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", false)
 box.schema.user.revoke('guest', 'replication')
 box.space.test:drop()
diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result
index 87d626e20..62684e47c 100644
--- a/test/replication/replica_rejoin.result
+++ b/test/replication/replica_rejoin.result
@@ -210,6 +210,13 @@ fio = require('fio')
 box.cfg{checkpoint_count = checkpoint_count}
 ---
 ...
+errinj = box.error.injection
+---
+...
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
+---
+- ok
+...
 test_run:cmd("start server replica")
 ---
 - true
diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua
index 9bf43eff8..40de0fc98 100644
--- a/test/replication/replica_rejoin.test.lua
+++ b/test/replication/replica_rejoin.test.lua
@@ -78,6 +78,8 @@ for i = 1, 3 do box.space.test:insert{i * 100} end
 fio = require('fio')
 #fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.xlog')) -- 1
 box.cfg{checkpoint_count = checkpoint_count}
+errinj = box.error.injection
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
 test_run:cmd("start server replica")
 test_run:cmd("switch replica")
 box.info.status -- orphan
diff --git a/test/replication/show_error_on_disconnect.result b/test/replication/show_error_on_disconnect.result
index af082203b..072739fb0 100644
--- a/test/replication/show_error_on_disconnect.result
+++ b/test/replication/show_error_on_disconnect.result
@@ -30,6 +30,13 @@ test_run:cmd("switch master_quorum2")
 ---
 - true
 ...
+errinj = box.error.injection
+---
+...
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
+---
+- ok
+...
 box.space.test:insert{1}
 ---
 - [1]
diff --git a/test/replication/show_error_on_disconnect.test.lua b/test/replication/show_error_on_disconnect.test.lua
index 40e9dbc5e..cdae48416 100644
--- a/test/replication/show_error_on_disconnect.test.lua
+++ b/test/replication/show_error_on_disconnect.test.lua
@@ -13,6 +13,8 @@ test_run:cmd("switch master_quorum1")
 repl = box.cfg.replication
 box.cfg{replication = ""}
 test_run:cmd("switch master_quorum2")
+errinj = box.error.injection
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
 box.space.test:insert{1}
 box.snapshot()
 box.space.test:insert{2}
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index a6b577dbc..c920f0c3d 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -188,52 +188,42 @@ result
         BODY:
           tuple: [11, {}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [7, {2: 5}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [7, {2: 4}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [4, {0: 2, 2: 10}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [5, {0: 2, 2: 10, 9: 14}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [8, {1: 3, 2: 10, 8: 11}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [10, {0: 2, 9: 14}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [4, {2: 12}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [5, {2: 12, 9: 14}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [8, {1: 1, 2: 12, 8: 13}]
       - HEADER:
-          timestamp: <timestamp>
           type: INSERT
         BODY:
           tuple: [10, {9: 14}]
diff --git a/test/xlog/panic_on_wal_error.result b/test/xlog/panic_on_wal_error.result
index 22f14f912..e033c3046 100644
--- a/test/xlog/panic_on_wal_error.result
+++ b/test/xlog/panic_on_wal_error.result
@@ -121,6 +121,14 @@ box.cfg.force_recovery
 -- try to start the replica, ha-ha
 -- (replication should fail, some rows are missing)
 --
+--
+errinj = box.error.injection
+---
+...
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
+---
+- ok
+...
 test_run:cmd("start server replica")
 ---
 - true
@@ -161,6 +169,10 @@ test_run:cmd("cleanup server replica")
 ---
 - true
 ...
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", false)
+---
+- ok
+...
 --
 -- cleanup
 box.space.test:drop()
diff --git a/test/xlog/panic_on_wal_error.test.lua b/test/xlog/panic_on_wal_error.test.lua
index 2e95431c6..b1884fe22 100644
--- a/test/xlog/panic_on_wal_error.test.lua
+++ b/test/xlog/panic_on_wal_error.test.lua
@@ -57,6 +57,9 @@ box.cfg.force_recovery
 -- try to start the replica, ha-ha
 -- (replication should fail, some rows are missing)
 --
+--
+errinj = box.error.injection
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", true)
 test_run:cmd("start server replica")
 test_run:cmd("switch replica")
 fiber = require('fiber')
@@ -69,6 +72,7 @@ box.space.test:select{}
 test_run:cmd("switch default")
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
+errinj.set("ERRINJ_WAL_RELAY_DISABLE_MEM", false)
 --
 -- cleanup
 box.space.test:drop()
-- 
2.20.1


* Re: [tarantool-patches] [PATCH] Relay logs from wal thread
  2019-01-22 14:18 [tarantool-patches] [PATCH] Relay logs from wal thread Georgy Kirichenko
@ 2019-01-29  8:53 ` Vladimir Davydov
  2019-01-29  9:49   ` Георгий Кириченко
  2019-02-08 17:09   ` [tarantool-patches] " Konstantin Osipov
  0 siblings, 2 replies; 7+ messages in thread
From: Vladimir Davydov @ 2019-01-29  8:53 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 05:18:36PM +0300, Georgy Kirichenko wrote:
> Use the wal thread to relay rows. The relay writer and reader fibers
> live in the wal thread. The writer fiber uses wal memory to send rows
> to the peer when possible, or spawns cords to recover rows from a file.
> 
> The wal writer stores rows into two memory buffers, swapped when a
> buffer threshold is reached. Memory buffers are split into chunks of
> rows with the same server id, bounded by a chunk threshold. In order
> to search for a position in wal memory, all chunks are indexed by the
> wal_mem_index array. Each wal_mem_index entry contains the
> corresponding replica_id and the vclock just before the first row in
> the chunk, as well as the chunk's buffer number, position and size in
> memory.
> 
> Closes: #3794

It's too early to comment on the code. Here are some general issues that
I think need to be addressed first:

 - Moving relay code to WAL looks ugly as it turns the code into a messy
   bundle. Let's try to hide relay behind an interface. Currently, we
   only need one method - write() - but in future we might need more,
   for instance prepare() or commit(). Maybe we could reuse the
   journal struct for this? (See the sketch after this list.)

 - Memory index looks confusing to me. Why would we need it at all?
   AFAIU it is only used to quickly position relay after switching to
   memory buffer, but it's not a big deal to linearly scan the buffer
   instead when that happens - it's a rare event and scanning memory
   should be fast enough. At least, we should start with it, because
   that would be clear and simple, and build any indexing later in
   separate patches provided we realize that we really need it, which I
   doubt.

 - We shouldn't stop/start relay thread when switching to/from memory
   source. Instead on SUBSCRIBE we should always start a relay thread,
   as we do now, and implement some machinery to suspend/resume it and
   switch to memory/disk when it falls in/out the memory buffer. All
   that machinery should live in relay.cc.

 - When in sync mode, WAL should write directly to the relay socket,
   without using any intermediate buffer. Buffer should only be used if
   the socket is blocked or the relay is running in asynchronous mode.
   There's no need in extra memory copying otherwise.

 - Maintaining a single memory buffer in WAL would complicate relay sync
   interface implementation. May be, we could use a separate buffer per
   each relay? That would be less efficient, obviously, because it would
   mean extra memory copying and usage, but that would simplify the code
   a great deal. Besides, we would need to use the buffers only for
   async relays - once a relay is switched to sync mode, we could write
   to its socket directly. So I think we should at least consider this
   option.

 - You removed the code that tried to advance GC consumers as rarely as
   possible, namely only when a WAL file is closed. I'd try to save it
   somehow, because without it every ACK would result in GC going to WAL
   if relay is running in async mode (because only WAL knows about WAL
   files; for others it's just a continuous stream). When a relay is
   running in sync mode, advancing GC on each ACK seems to be OK,
   because WAL won't be invoked then (as we never remove WAL files
   created after the last checkpoint).

 - I really don't like code duplication. I think we should reuse the
   code used for sending rows and receiving ACKs between sync and async
   modes. Hiding sync relay implementation behind an interface would
   allow us to do that.

 - I don't see how this patch depends on #1025 (sending rows in
   batches). Let's implement relaying from memory first and do #1025
   later, when we see the full picture.

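To illustrate the first point, a minimal sketch of what hiding the
relay behind a journal-like interface could look like. All names
below are hypothetical; nothing like this exists in the tree yet:

	/* Hypothetical sketch only: a vtab the WAL could call without
	 * knowing anything about relay internals (sockets, buffers). */
	struct relay;		/* opaque to WAL */
	struct xrow_header;

	struct relay_vtab {
		/* Forward a batch of rows WAL has just written. */
		int (*write)(struct relay *relay, struct xrow_header **begin,
			     struct xrow_header **end);
		/* Room for future synchronous-replication hooks. */
		int (*prepare)(struct relay *relay);
		int (*commit)(struct relay *relay);
	};

With something like this, WAL would only ever call vtab methods and
never touch relay state directly.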

* Re: [tarantool-patches] [PATCH] Relay logs from wal thread
  2019-01-29  8:53 ` Vladimir Davydov
@ 2019-01-29  9:49   ` Георгий Кириченко
  2019-01-29 10:44     ` Vladimir Davydov
  2019-02-08 17:09   ` [tarantool-patches] " Konstantin Osipov
  1 sibling, 1 reply; 7+ messages in thread
From: Георгий Кириченко @ 2019-01-29  9:49 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches


On Tuesday, January 29, 2019 11:53:18 AM MSK Vladimir Davydov wrote:
> On Tue, Jan 22, 2019 at 05:18:36PM +0300, Georgy Kirichenko wrote:
> > Use the wal thread to relay rows. The relay writer and reader fibers
> > live in the wal thread. The writer fiber uses wal memory to send rows
> > to the peer when possible, or spawns cords to recover rows from a file.
> > 
> > The wal writer stores rows into two memory buffers, swapped when a
> > buffer threshold is reached. Memory buffers are split into chunks of
> > rows with the same server id, bounded by a chunk threshold. In order
> > to search for a position in wal memory, all chunks are indexed by the
> > wal_mem_index array. Each wal_mem_index entry contains the
> > corresponding replica_id and the vclock just before the first row in
> > the chunk, as well as the chunk's buffer number, position and size in
> > memory.
> > 
> > Closes: #3794
> 
> It's too early to comment on the code. Here are some general issues that
> I think need to be addressed first:
> 
>  - Moving relay code to WAL looks ugly as it turns the code into a messy
>    bundle. Let's try to hide relay behind an interface. Currently, we
>    only need one method - write() - but in future we might need more,
>    for instance prepare() or commit(). May be, we could reuse journal
>    struct for this?
It was done because WAL should control relay ACKs in case of synchronous
replication. So, in the likely case, forwarding WALs and reading ACKs
should be done in the wal thread. And if I left this code in the relay,
then wal-relay communication would be too complicated (it would expose
internal structures like sockets, memory buffers, vclock, etc.). Also I
planned some future changes to move all wal-processing logic into wal.c
too.
> 
>  - Memory index looks confusing to me. Why would we need it at all?
>    AFAIU it is only used to quickly position relay after switching to
>    memory buffer, but it's not a big deal to linearly scan the buffer
>    instead when that happens - it's a rare event and scanning memory
>    should be fast enough. At least, we should start with it, because
>    that would be clear and simple, and build any indexing later in
>    separate patches provided we realize that we really need it, which I
>    doubt.
It allows us not to decode all the logs as many times as we have relays
connected. It also eliminates having to follow the corresponding vclock
for each row.
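
To make it concrete, an index entry is roughly this (a sketch for the
discussion; the field names here are illustrative):

#include <stddef.h>
#include <stdint.h>
#include "vclock.h"     /* struct vclock */

/* Sketch of one wal_mem_index entry; names are illustrative. */
struct wal_mem_index_entry {
        /* Replica id shared by all rows of the chunk. */
        uint32_t replica_id;
        /* Vclock just before the first row of the chunk. */
        struct vclock vclock;
        /* Which of the two swapped buffers holds the chunk. */
        int buf_no;
        /* Offset and size of the chunk within that buffer. */
        size_t pos;
        size_t size;
};
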
> 
>  - We shouldn't stop/start relay thread when switching to/from memory
>    source. Instead on SUBSCRIBE we should always start a relay thread,
>    as we do now, and implement some machinery to suspend/resume it and
>    switch to memory/disk when it falls in/out the memory buffer.
In the good case (we do not have any outdated replicas) the relay thread
never even gets a chance to run.
>    All that machinery should live in relay.cc.
This leads to complicated dependencies between wal and relay. At least
wal would have to expose its internal structures in a header file.

> 
>  - When in sync mode, WAL should write directly to the relay socket,
>    without using any intermediate buffer. Buffer should only be used if
>    the socket is blocked or the relay is running in asynchronous mode.
>    There's no need in extra memory copying otherwise.
Please explain why. I do not think this would improve anything. Also, I
think this memory buffer should be used for writing files too.
In any case we need a memory buffer to store WAL rows, because we cannot
know in advance whether a relay will block. But the biggest issue is
transferring state from a just-blocked sync relay to a reader detached
from the memory buffer.
> 
>  - Maintaining a single memory buffer in WAL would complicate relay sync
>    interface implementation. May be, we could use a separate buffer per
>    each relay? That would be less efficient, obviously, because it would
>    mean extra memory copying and usage, but that would simplify the code
>    a great deal. Besides, we would need to use the buffers only for
>    async relays - once a relay is switched to sync mode, we could write
>    to its socket directly. So I think we should at least consider this
>    option.
> 
>  - You removed the code that tried to advance GC consumers as rarely as
>    possible, namely only when a WAL file is closed. I'd try to save it
>    somehow, because without it every ACK would result in GC going to WAL
>    if relay is running in async mode (because only WAL knows about WAL
>    files; for others it's just a continuous stream). When a relay is
>    running in sync mode, advancing GC on each ACK seems to be OK,
>    because WAL won't be invoked then (as we never remove WAL files
>    created after the last checkpoint).
I think we should move WAL GC into the wal thread.
> 
>  - I really don't like code duplication. I think we should reuse the
>    code used for sending rows and receiving ACKs between sync and sync
>    modes. Hiding sync relay implementation behind an interface would
>    allow us to do that.
> 
>  - I don't see how this patch depends on #1025 (sending rows in
>    batches). Let's implement relaying from memory first and do #1025
>    later, when we see the full picture.




* Re: [tarantool-patches] [PATCH] Relay logs from wal thread
  2019-01-29  9:49   ` Георгий Кириченко
@ 2019-01-29 10:44     ` Vladimir Davydov
  2019-01-31  7:15       ` Георгий Кириченко
  0 siblings, 1 reply; 7+ messages in thread
From: Vladimir Davydov @ 2019-01-29 10:44 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Tue, Jan 29, 2019 at 12:49:53PM +0300, Георгий Кириченко wrote:
> On Tuesday, January 29, 2019 11:53:18 AM MSK Vladimir Davydov wrote:
> > On Tue, Jan 22, 2019 at 05:18:36PM +0300, Georgy Kirichenko wrote:
> > > Use wal thread to relay rows. Relay writer and reader fibers live
> > > in a wal thread. Writer fiber uses wal memory to send rows to the
> > > peer when it possible or spawns cords to recover rows from a file.
> > > 
> > > Wal writer stores rows into two memory buffers swapping by buffer
> > > threshold. Memory buffers are splitted into chunks with the
> > > same server id issued by chunk threshold. In order to search
> > > position in wal memory all chunks indexed by wal_mem_index
> > > array. Each wal_mem_index contains corresponding replica_id and
> > > vclock just before first row in the chunk as well as chunk buffer
> > > number, position and size of the chunk in memory.
> > > 
> > > Closes: #3794
> > 
> > It's too early to comment on the code. Here are some general issues that
> > I think need to be addressed first:
> > 
> >  - Moving relay code to WAL looks ugly as it turns the code into a messy
> >    bundle. Let's try to hide relay behind an interface. Currently, we
> >    only need one method - write() - but in future we might need more,
> >    for instance prepare() or commit(). May be, we could reuse journal
> >    struct for this?
> It was done because WAL should control relays' ACK in case of synchronous 
> replication. So, in likely case forwarding wals and reading ACK should be done
> in wal thread.

I don't deny that.

> And if I left this code in a relay then wal-relay communication 
> would be to complicated (expose internal structures like sockets, memory 
> buffers, vclock and etc).

Not necessarily. You could hide it all behind a vtab. For now, vtab
would be trivial - we only need write() virtual method (ACKs can be
received by a fiber started by this method when relay switches to sync).
Later on, we could extend it with methods that would allow WAL to wait
for remote commit.
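
To illustrate, a minimal sketch of such a vtab (all names are made up):

#include <sys/types.h>  /* ssize_t */

struct relay;
struct xrow_header;

struct relay_vtab {
        /*
         * Called by WAL after an entry is written. In sync mode
         * this writes straight to the socket; a fiber started
         * from here can receive ACKs.
         */
        ssize_t (*write)(struct relay *relay,
                         struct xrow_header **rows, int row_count);
        /* Possible future methods to wait for remote commit. */
        int (*prepare)(struct relay *relay);
        int (*commit)(struct relay *relay);
};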

> Also I planned some future changes to move all wal-
> processing logic into a wal.c to.

Please don't. We try to split the code into modules so that it is easier
to understand. Mixing relay/recovery/WAL in one module doesn't sound
like a good idea to me.

> > 
> >  - Memory index looks confusing to me. Why would we need it at all?
> >    AFAIU it is only used to quickly position relay after switching to
> >    memory buffer, but it's not a big deal to linearly scan the buffer
> >    instead when that happens - it's a rare event and scanning memory
> >    should be fast enough. At least, we should start with it, because
> >    that would be clear and simple, and build any indexing later in
> >    separate patches provided we realize that we really need it, which I
> >    doubt.
> It allows no to decode all logs as many times as we have relays connected to. 

I don't understand. When relay is in sync, we should be simply writing a
stream of rows to a socket. No decoding or positioning should be
necessary. Am I missing something?

> Also it eliminates following of corresponding vclocks for each row.

But you know the vclock of each row in WAL and you can pass it to relay.
No reason to decode anything AFAIU.

> > 
> >  - We shouldn't stop/start relay thread when switching to/from memory
> >    source. Instead on SUBSCRIBE we should always start a relay thread,
> >    as we do now, and implement some machinery to suspend/resume it and
> >    switch to memory/disk when it falls in/out the memory buffer.
> In a good case (we do not have any outdated replicas) we do not even give any 
> chance for relay thread to work.

Starting/stopping relay thread on every lag looks just ugly. We have to
start it anyway when a replica subscribes (because it's highly unlikely
to fall in the in-memory buffer). I don't see any reason to stop it once
we are in sync - we can switch to async anytime.

> >    All that machinery should live in relay.cc.
> This leads to to complicated dependencies between wal and relay. AT least wal 
> should expose it internal structures into a header file.

How's that? WAL should only know about write() method. Relay registers
the method in WAL. Everything else is done by relay, including switching
between sync and async mode.

> 
> > 
> >  - When in sync mode, WAL should write directly to the relay socket,
> >    without using any intermediate buffer. Buffer should only be used if
> >    the socket is blocked or the relay is running in asynchronous mode.
> >    There's no need in extra memory copying otherwise.
> Please explain why. I do not hing this can improve something. Also i think 

That would eliminate extra memory copying in the best case scenario.

> that this memory buffer should be used for writing files to.
> In any case we should have a memory buffer to store wals because you do not 
> know will relay block or not. But the biggest issue is to transfer state from 

As I suggested below, let's try to maintain a buffer per each relay, not
a single buffer in WAL. Then we wouldn't need to write anything to
memory buffers if all relays are in sync and no socket is blocked. This
is the best case scenario, but we should converge to it pretty quickly
provided the network doesn't lag much. This would also allow us to hide
this logic in relay.cc.
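
A sketch of the per-relay write path I mean (the relay fields here are
invented; ibuf is the buffer from the small library):

#include <errno.h>
#include <stdbool.h>
#include <string.h>
#include <unistd.h>
#include "small/ibuf.h"

/* Sketch only; a real relay would keep these fields elsewhere. */
struct relay_sketch {
        int fd;                 /* replica socket */
        bool is_sync;           /* relay is in sync mode */
        struct ibuf buf;        /* per-relay fallback buffer */
};

static void
relay_feed(struct relay_sketch *relay, const char *data, size_t size)
{
        if (relay->is_sync && ibuf_used(&relay->buf) == 0) {
                /* Best case: in sync, buffer empty - write directly. */
                ssize_t n = write(relay->fd, data, size);
                if (n == (ssize_t)size)
                        return;         /* No copying at all. */
                if (n > 0) {
                        data += n;
                        size -= n;
                }
                /* Short write or EAGAIN: fall through and buffer. */
        }
        /* Async relay or blocked socket (OOM handling omitted). */
        memcpy(ibuf_alloc(&relay->buf, size), data, size);
}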

> just blocked sync relay to detached from memory reader.
> > 
> >  - Maintaining a single memory buffer in WAL would complicate relay sync
> >    interface implementation. May be, we could use a separate buffer per
> >    each relay? That would be less efficient, obviously, because it would
> >    mean extra memory copying and usage, but that would simplify the code
> >    a great deal. Besides, we would need to use the buffers only for
> >    async relays - once a relay is switched to sync mode, we could write
> >    to its socket directly. So I think we should at least consider this
> >    option.
> > 
> >  - You removed the code that tried to advance GC consumers as rarely as
> >    possible, namely only when a WAL file is closed. I'd try to save it
> >    somehow, because without it every ACK would result in GC going to WAL
> >    if relay is running in async mode (because only WAL knows about WAL
> >    files; for others it's just a continuous stream). When a relay is
> >    running in sync mode, advancing GC on each ACK seems to be OK,
> >    because WAL won't be invoked then (as we never remove WAL files
> >    created after the last checkpoint).
> I think, we should move wal gc into a wal thread.

You can try, but until then I think we should try to leave that code
intact.
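
For reference, the throttling I mean is roughly this (a sketch; the
boundary bookkeeping and next_xlog_boundary() are invented, and the
gc_consumer_advance() signature is assumed from gc.h):

#include <stdint.h>
#include "vclock.h"

struct gc_consumer;
/* tx-side consumer update; signature assumed. */
void gc_consumer_advance(struct gc_consumer *consumer,
                         const struct vclock *vclock);
/* Hypothetical: vclock signature at the end of the next xlog. */
int64_t next_xlog_boundary(const struct vclock *vclock);

/* Sketch: advance GC only when an ACK crosses an xlog boundary. */
struct relay_gc_state {
        struct gc_consumer *consumer;
        int64_t xlog_boundary;
};

static void
relay_on_ack(struct relay_gc_state *gc, const struct vclock *ack)
{
        if (vclock_sum(ack) < gc->xlog_boundary)
                return;         /* Same WAL file - nothing to free. */
        gc_consumer_advance(gc->consumer, ack);
        gc->xlog_boundary = next_xlog_boundary(ack);
}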

> > 
> >  - I really don't like code duplication. I think we should reuse the
> >    code used for sending rows and receiving ACKs between sync and sync
> >    modes. Hiding sync relay implementation behind an interface would
> >    allow us to do that.
> > 
> >  - I don't see how this patch depends on #1025 (sending rows in
> >    batches). Let's implement relaying from memory first and do #1025
> >    later, when we see the full picture.
> 


* Re: [tarantool-patches] [PATCH] Relay logs from wal thread
  2019-01-29 10:44     ` Vladimir Davydov
@ 2019-01-31  7:15       ` Георгий Кириченко
  2019-01-31  8:01         ` Vladimir Davydov
  0 siblings, 1 reply; 7+ messages in thread
From: Георгий Кириченко @ 2019-01-31  7:15 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches


On Tuesday, January 29, 2019 1:44:16 PM MSK Vladimir Davydov wrote:
> On Tue, Jan 29, 2019 at 12:49:53PM +0300, Георгий Кириченко wrote:
> > On Tuesday, January 29, 2019 11:53:18 AM MSK Vladimir Davydov wrote:
> > > On Tue, Jan 22, 2019 at 05:18:36PM +0300, Georgy Kirichenko wrote:
> > > > Use wal thread to relay rows. Relay writer and reader fibers live
> > > > in a wal thread. Writer fiber uses wal memory to send rows to the
> > > > peer when it possible or spawns cords to recover rows from a file.
> > > > 
> > > > Wal writer stores rows into two memory buffers swapping by buffer
> > > > threshold. Memory buffers are splitted into chunks with the
> > > > same server id issued by chunk threshold. In order to search
> > > > position in wal memory all chunks indexed by wal_mem_index
> > > > array. Each wal_mem_index contains corresponding replica_id and
> > > > vclock just before first row in the chunk as well as chunk buffer
> > > > number, position and size of the chunk in memory.
> > > > 
> > > > Closes: #3794
> > > 
> > > It's too early to comment on the code. Here are some general issues that
> > > 
> > > I think need to be addressed first:
> > >  - Moving relay code to WAL looks ugly as it turns the code into a messy
> > >  
> > >    bundle. Let's try to hide relay behind an interface. Currently, we
> > >    only need one method - write() - but in future we might need more,
> > >    for instance prepare() or commit(). May be, we could reuse journal
> > >    struct for this?
> > 
> > It was done because WAL should control relays' ACK in case of synchronous
> > replication. So, in likely case forwarding wals and reading ACK should be
> > done in wal thread.
> 
> I don't deny that.
> 
> > And if I left this code in a relay then wal-relay communication
> > would be to complicated (expose internal structures like sockets, memory
> > buffers, vclock and etc).
> 
> Not necessarily. You could hide it all behind a vtab. For now, vtab
> would be trivial - we only need write() virtual method (ACKs can be
> received by a fiber started by this method when relay switches to sync).
> Later on, we could extend it with methods that would allow WAL to wait
> for remote commit.
I do not think wal should wait for remote commit; rather, relay should
inform wal about its state. For example, wal should track the state of a
relay even if the relay is reading from a file in a different cord (and
might exit at any time).
So we would also add some virtual methods to request memory chunks,
promote the in-wal relay vclock, or attach to sync mode, as you
suggested.
> 
> > Also I planned some future changes to move all wal-
> > processing logic into a wal.c to.
> 
> Please don't. We try to split the code into modules so that it's easier
> for understanding. Mixing relay/recovery/WAL in one module doesn't sound
> like a good idea to me.
> 
> > >  - Memory index looks confusing to me. Why would we need it at all?
> > >  
> > >    AFAIU it is only used to quickly position relay after switching to
> > >    memory buffer, but it's not a big deal to linearly scan the buffer
> > >    instead when that happens - it's a rare event and scanning memory
> > >    should be fast enough. At least, we should start with it, because
> > >    that would be clear and simple, and build any indexing later in
> > >    separate patches provided we realize that we really need it, which I
> > >    doubt.
> > 
> > It allows no to decode all logs as many times as we have relays connected
> > to.
> I don't understand. When relay is in sync, we should be simply writing a
> stream of rows to a socket. No decoding or positioning should be
> necessary. Am I missing something?
If a relay blocks for some time, you stop writing and later resume
reading from the position the relay had reached before. That position
can be identified only by vclock, because the memory buffer might be
reallocated at any time while the relay sleeps.
Also, let's imagine that switching a relay over to wal takes time dT,
during which wal writes N rows. If you do not keep rows in memory, you
can never switch this relay into sync mode - it will always be those N
rows behind. So we should keep some rows backed in memory and allow
relays to navigate them.

Another reason why I rejected direct writing to the relay: in that case
wal would be in charge of tracking the relays' state, which looks ugly
to me.
> 
> > Also it eliminates following of corresponding vclocks for each row.
> 
> But you know the vclock of each row in WAL and you can pass it to relay.
> No reason to decode anything AFAIU.
I know it only at the moment I write an entry to wal. If the relay
yielded for some time, the only possibility is to store the memory
buffer's starting vclock and then decode each row one by one, following
the vclock.
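
In other words, resuming without the index is a scan like this (a
sketch; decode_next_row() stands in for real xrow decoding):

#include "vclock.h"
#include "xrow.h"       /* struct xrow_header */

/* Hypothetical decoder: fills *row, returns pointer past it. */
const char *
decode_next_row(const char *pos, const char *end,
                struct xrow_header *row);

/* Replay vclocks from the buffer start until target is reached. */
static const char *
wal_mem_seek(const char *pos, const char *end, struct vclock *cur,
             const struct vclock *target)
{
        struct xrow_header row;
        while (pos < end && vclock_compare(cur, target) < 0) {
                pos = decode_next_row(pos, end, &row);
                vclock_follow(cur, row.replica_id, row.lsn);
        }
        return pos;
}
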
> 
> > >  - We shouldn't stop/start relay thread when switching to/from memory
> > >  
> > >    source. Instead on SUBSCRIBE we should always start a relay thread,
> > >    as we do now, and implement some machinery to suspend/resume it and
> > >    switch to memory/disk when it falls in/out the memory buffer.
> > 
> > In a good case (we do not have any outdated replicas) we do not even give
> > any chance for relay thread to work.
> 
> Starting/stopping relay thread on every lag looks just ugly. We have to
> start it anyway when a replica subscribes (because it's highly unlikely
> to fall in the in-memory buffer). I don't see any reason to stop it once
> we are in sync - we can switch to async anytime.
The relay cord is not started on every lag, but only if the relay falls
out of the wal memory buffer. The buffer should be large enough to
compensate for all hardware fluctuations, and this guarantees that any
replica able to keep up with the master's row rate is served from wal
memory without spawning threads.
> 
> > >    All that machinery should live in relay.cc.
> > 
> > This leads to to complicated dependencies between wal and relay. AT least
> > wal should expose it internal structures into a header file.
> 
> How's that? WAL should only know about write() method. Relay registers
> the method in WAL. Everything else is done by relay, including switching
> between sync and async mode.
A write method alone is not nearly enough.
> 
> > >  - When in sync mode, WAL should write directly to the relay socket,
> > >  
> > >    without using any intermediate buffer. Buffer should only be used if
> > >    the socket is blocked or the relay is running in asynchronous mode.
> > >    There's no need in extra memory copying otherwise.
> > 
> > Please explain why. I do not hing this can improve something. Also i think
> 
> That would eliminate extra memory copying in the best case scenario.
Let me explain: we form the in-memory data once and use it both for
writing to files and as a relay source.
> 
> > that this memory buffer should be used for writing files to.
> > In any case we should have a memory buffer to store wals because you do
> > not
> > know will relay block or not. But the biggest issue is to transfer state
> > from
> As I suggested below, let's try to maintain a buffer per each relay, not
> a single buffer in WAL. Then we wouldn't need to write anything to
> memory buffers if all relays are in sync and no socket is blocked. This
> is the best case scenario, but we should converge to it pretty quickly
> provided the network doesn't lag much. This would also allow us to hide
> this logic in relay.cc.
Above I described why it is impossible to do without a wal memory
buffer: under constant load a relay would never be able to attach to
sync mode as you suggested. So we have to maintain the wal memory
buffer.
There are situations in which your model would not be functional. For
example, if wal writes a lot (more than the TCP write buffer can take
in a given time slot), every call will block the socket and push the
relay out of sync.
And if we maintain per-relay buffers, we have to copy the data as many
times as we have relays - even for file-reading relays, to give them a
chance to catch up with wal.
> 
> > just blocked sync relay to detached from memory reader.
> > 
> > >  - Maintaining a single memory buffer in WAL would complicate relay sync
> > >  
> > >    interface implementation. May be, we could use a separate buffer per
> > >    each relay? That would be less efficient, obviously, because it would
> > >    mean extra memory copying and usage, but that would simplify the code
> > >    a great deal. Besides, we would need to use the buffers only for
> > >    async relays - once a relay is switched to sync mode, we could write
> > >    to its socket directly. So I think we should at least consider this
> > >    option.
> > >  
> > >  - You removed the code that tried to advance GC consumers as rarely as
> > >  
> > >    possible, namely only when a WAL file is closed. I'd try to save it
> > >    somehow, because without it every ACK would result in GC going to WAL
> > >    if relay is running in async mode (because only WAL knows about WAL
> > >    files; for others it's just a continuous stream). When a relay is
> > >    running in sync mode, advancing GC on each ACK seems to be OK,
> > >    because WAL won't be invoked then (as we never remove WAL files
> > >    created after the last checkpoint).
> > 
> > I think, we should move wal gc into a wal thread.
> 
> You can try, but until then I think we should try to leave that code
> intact.
> 
> > >  - I really don't like code duplication. I think we should reuse the
> > >  
> > >    code used for sending rows and receiving ACKs between sync and sync
> > >    modes. Hiding sync relay implementation behind an interface would
> > >    allow us to do that.
> > >  
> > >  - I don't see how this patch depends on #1025 (sending rows in
> > >  
> > >    batches). Let's implement relaying from memory first and do #1025
> > >    later, when we see the full picture.




* Re: [tarantool-patches] [PATCH] Relay logs from wal thread
  2019-01-31  7:15       ` Георгий Кириченко
@ 2019-01-31  8:01         ` Vladimir Davydov
  0 siblings, 0 replies; 7+ messages in thread
From: Vladimir Davydov @ 2019-01-31  8:01 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Thu, Jan 31, 2019 at 10:15:49AM +0300, Георгий Кириченко wrote:
> On Tuesday, January 29, 2019 1:44:16 PM MSK Vladimir Davydov wrote:
> > On Tue, Jan 29, 2019 at 12:49:53PM +0300, Георгий Кириченко wrote:
> > > On Tuesday, January 29, 2019 11:53:18 AM MSK Vladimir Davydov wrote:
> > > > On Tue, Jan 22, 2019 at 05:18:36PM +0300, Georgy Kirichenko wrote:
> > > > > Use wal thread to relay rows. Relay writer and reader fibers live
> > > > > in a wal thread. Writer fiber uses wal memory to send rows to the
> > > > > peer when it possible or spawns cords to recover rows from a file.
> > > > > 
> > > > > Wal writer stores rows into two memory buffers swapping by buffer
> > > > > threshold. Memory buffers are splitted into chunks with the
> > > > > same server id issued by chunk threshold. In order to search
> > > > > position in wal memory all chunks indexed by wal_mem_index
> > > > > array. Each wal_mem_index contains corresponding replica_id and
> > > > > vclock just before first row in the chunk as well as chunk buffer
> > > > > number, position and size of the chunk in memory.
> > > > > 
> > > > > Closes: #3794
> > > > 
> > > > It's too early to comment on the code. Here are some general issues that
> > > > 
> > > > I think need to be addressed first:
> > > >  - Moving relay code to WAL looks ugly as it turns the code into a messy
> > > >  
> > > >    bundle. Let's try to hide relay behind an interface. Currently, we
> > > >    only need one method - write() - but in future we might need more,
> > > >    for instance prepare() or commit(). May be, we could reuse journal
> > > >    struct for this?
> > > 
> > > It was done because WAL should control relays' ACK in case of synchronous
> > > replication. So, in likely case forwarding wals and reading ACK should be
> > > done in wal thread.
> > 
> > I don't deny that.
> > 
> > > And if I left this code in a relay then wal-relay communication
> > > would be to complicated (expose internal structures like sockets, memory
> > > buffers, vclock and etc).
> > 
> > Not necessarily. You could hide it all behind a vtab. For now, vtab
> > would be trivial - we only need write() virtual method (ACKs can be
> > received by a fiber started by this method when relay switches to sync).
> > Later on, we could extend it with methods that would allow WAL to wait
> > for remote commit.
> I do not think wal should wait for remote commit but relay should inform wal 
> about state. For example wal should track state of an relay even if the relay 
> is reading from file in a different cord (and might exit at any time).
> So we also add some v-functions to request memory chunks, promote in-wal relay 
> vclock or attach to sync mode as you suggested.

If we maintain a single memory buffer in WAL, then yes, I guess, we do.
I'd try to keep things simple though and use one buffer per relay, but
if you really think it's a no-go, then please try to design such a vtab
that will neatly hide all the necessary methods, because mixing relay
with WAL logic makes the code less modular and hence harder to
understand.

> > 
> > > Also I planned some future changes to move all wal-
> > > processing logic into a wal.c to.
> > 
> > Please don't. We try to split the code into modules so that it's easier
> > for understanding. Mixing relay/recovery/WAL in one module doesn't sound
> > like a good idea to me.
> > 
> > > >  - Memory index looks confusing to me. Why would we need it at all?
> > > >  
> > > >    AFAIU it is only used to quickly position relay after switching to
> > > >    memory buffer, but it's not a big deal to linearly scan the buffer
> > > >    instead when that happens - it's a rare event and scanning memory
> > > >    should be fast enough. At least, we should start with it, because
> > > >    that would be clear and simple, and build any indexing later in
> > > >    separate patches provided we realize that we really need it, which I
> > > >    doubt.
> > > 
> > > It allows no to decode all logs as many times as we have relays connected
> > > to.
> > I don't understand. When relay is in sync, we should be simply writing a
> > stream of rows to a socket. No decoding or positioning should be
> > necessary. Am I missing something?
> If relay blocks for some time then you stop writing and then resume reading 
> from some position relay has reached before and this position could be 
> identified only by vclock because memory buffer might be reallocated at any time 
> when relay sleeps.
> Also, lets imagine that switching from relay to wal needs dT time in which wal 
> writes N rows. So, if you do not store logs in memory then you would not be 
> able to switch this relay in a sync mode. So we should have some rows backed 
> in memory and allow relays to navigate in it. 

I'm not arguing against maintaining a memory buffer (or buffers) -
obviously it's absolutely necessary for switching from async to sync.
I just think that optimizing for rare case scenarios, like switching
to/from memory, is pointless. Ideally, we should switch to sync mode
once and never ever return to async. If network lags cause the socket
to block frequently, we'd better never switch to sync (maybe add a
configuration option for that). That said, IMO linearly scanning the
memory buffer when switching to sync is perfectly fine. If we find it
to be a problem, we can implement some kind of indexing later, in a
separate patch.

> 
> Other cause why I rejected direct writing to relay - in that case wal would be 
> in charge to track relays' state, for me it looks ugly.
> > 
> > > Also it eliminates following of corresponding vclocks for each row.
> > 
> > But you know the vclock of each row in WAL and you can pass it to relay.
> > No reason to decode anything AFAIU.
> I know it only when i write entry to wal, if relay yielded for some time - the 
> only possibility is to store memory buffers' starting vclock and then decode 
> each row one-by-one to follow vclock.
> > 
> > > >  - We shouldn't stop/start relay thread when switching to/from memory
> > > >  
> > > >    source. Instead on SUBSCRIBE we should always start a relay thread,
> > > >    as we do now, and implement some machinery to suspend/resume it and
> > > >    switch to memory/disk when it falls in/out the memory buffer.
> > > 
> > > In a good case (we do not have any outdated replicas) we do not even give
> > > any chance for relay thread to work.
> > 
> > Starting/stopping relay thread on every lag looks just ugly. We have to
> > start it anyway when a replica subscribes (because it's highly unlikely
> > to fall in the in-memory buffer). I don't see any reason to stop it once
> > we are in sync - we can switch to async anytime.
> Relay cord do not started on every lag but only if relay gone out of from the 
> wal memory buffer. The buffer should be large enough to compensate all hardware 
> fluctuations, and this grants that any replica able to process all rows from 
> master would live in a wal memory without spawning threads.

But it just looks ugly to start/stop it. Better use cbus to
suspend/resume IMO. We will need this channel anyway to switch
to sync so why not reuse it for switching to async. Shouldn't
be difficult to implement.
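
Sketch (the hop function and message layout are made up):

#include <stdbool.h>
#include <stdlib.h>
#include "cbus.h"

struct relay;

/* Placeholder: runs in the relay thread and flips the mode. */
static void relay_switch_mode_f(struct cmsg *msg);

struct relay_switch_msg {
        struct cmsg base;
        struct relay *relay;
        bool sync;              /* true - go sync, false - go async */
};

static const struct cmsg_hop switch_route[] = {
        { relay_switch_mode_f, NULL },
};

/* Suspend/resume the relay instead of stopping/starting its thread. */
static void
relay_request_switch(struct relay *relay, struct cpipe *relay_pipe,
                     bool sync)
{
        struct relay_switch_msg *m = malloc(sizeof(*m));
        if (m == NULL)
                return;         /* OOM handling omitted in sketch. */
        cmsg_init(&m->base, switch_route);
        m->relay = relay;
        m->sync = sync;
        cpipe_push(relay_pipe, &m->base);       /* freed by the hop */
}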

> > 
> > > >    All that machinery should live in relay.cc.
> > > 
> > > This leads to to complicated dependencies between wal and relay. AT least
> > > wal should expose it internal structures into a header file.
> > 
> > How's that? WAL should only know about write() method. Relay registers
> > the method in WAL. Everything else is done by relay, including switching
> > between sync and async mode.
> Write method is not enough at all.
> > 
> > > >  - When in sync mode, WAL should write directly to the relay socket,
> > > >  
> > > >    without using any intermediate buffer. Buffer should only be used if
> > > >    the socket is blocked or the relay is running in asynchronous mode.
> > > >    There's no need in extra memory copying otherwise.
> > > 
> > > Please explain why. I do not hing this can improve something. Also i think
> > 
> > That would eliminate extra memory copying in the best case scenario.
> Let me explain - we will form in-memory data that will be used for both to-file 
> writes and as a relay source.

On your branch there's xlog_write_entry, which writes rows to an xlog
file, and wal_encode_entry, which stores rows in a buffer. The latter
was introduced by your patch. The two functions are independent. That
is, you did introduce extra memory copying while you could write to
sockets directly. Not a big deal from performance pov, I guess, but this
still raises questions why it was done like that, why we can't write to
sockets directly, why we should first write to an in-memory buffer and
then suck in data in relay fibers. Looks kinda roundabout IMO.

> > 
> > > that this memory buffer should be used for writing files to.
> > > In any case we should have a memory buffer to store wals because you do
> > > not
> > > know will relay block or not. But the biggest issue is to transfer state
> > > from
> > As I suggested below, let's try to maintain a buffer per each relay, not
> > a single buffer in WAL. Then we wouldn't need to write anything to
> > memory buffers if all relays are in sync and no socket is blocked. This
> > is the best case scenario, but we should converge to it pretty quickly
> > provided the network doesn't lag much. This would also allow us to hide
> > this logic in relay.cc.
> Above I described why it is not possible to not to have wal memory buffer - 
> with constant load relay would not be able to attach to sync mode as you 
> suggested. So we have to maintain wal memory buffer.
> There are some situations in that you model would not be functional.
> For example if wal writes a lot (more than tcp write buffer in a some time 
> slot) so each call will block socket and push relay sync down.

I never argued against a memory buffer...

> And if we maintain per relay buffers so you we should copy data as much as 
> relays we have, even for file-reading relays do let then chance to catch the 
> wal.

Only in the worst case scenario when we're switching from async to sync.
As I said above, I don't think this should happen often. If it does, the
network is too slow for sync and the user should either tune tcp buffer
size or disable sync replication completely.


* [tarantool-patches] Re: [PATCH] Relay logs from wal thread
  2019-01-29  8:53 ` Vladimir Davydov
  2019-01-29  9:49   ` Георгий Кириченко
@ 2019-02-08 17:09   ` Konstantin Osipov
  1 sibling, 0 replies; 7+ messages in thread
From: Konstantin Osipov @ 2019-02-08 17:09 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Vladimir Davydov <vdavydov.dev@gmail.com> [19/01/29 11:57]:
> It's too early to comment on the code. Here are some general issues that
> I think need to be addressed first:
> 
>  - Moving relay code to WAL looks ugly as it turns the code into a messy
>    bundle. Let's try to hide relay behind an interface. Currently, we
>    only need one method - write() - but in future we might need more,
>    for instance prepare() or commit(). May be, we could reuse journal
>    struct for this?

Agree.

> 
>  - Memory index looks confusing to me. Why would we need it at all?
>    AFAIU it is only used to quickly position relay after switching to
>    memory buffer, but it's not a big deal to linearly scan the buffer
>    instead when that happens - it's a rare event and scanning memory
>    should be fast enough. At least, we should start with it, because
>    that would be clear and simple, and build any indexing later in
>    separate patches provided we realize that we really need it, which I
>    doubt.

I agree it could go in a separate patch. I disagree that we don't need
an index: it is cheaper to have an index than to ensure it is never
needed.

>  - We shouldn't stop/start relay thread when switching to/from memory
>    source. Instead on SUBSCRIBE we should always start a relay thread,
>    as we do now, and implement some machinery to suspend/resume it and
>    switch to memory/disk when it falls in/out the memory buffer. All
>    that machinery should live in relay.cc.

I agree. The relay thread should simply suspend itself when
switching to bsync.

>  - When in sync mode, WAL should write directly to the relay socket,
>    without using any intermediate buffer. Buffer should only be used if
>    the socket is blocked or the relay is running in asynchronous mode.
>    There's no need in extra memory copying otherwise.

This is not always possible. The socket doesn't have to be ready
all the time. I agree we could try to write to the socket first.
But we still need a buffer, and I think we need a single one: I
see no technical issue with having a single ring buffer with multiple
cursors. If a cursor falls behind the valid read position in the
buffer, it's invalidated and the relay is switched to async mode.
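
Roughly (made-up names):

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Sketch of a single ring buffer with per-relay cursors. */
struct wal_ring {
        char *data;
        size_t size;
        uint64_t wpos;  /* total bytes written; data wraps mod size */
};

struct wal_cursor {
        uint64_t rpos;  /* this relay's absolute read position */
};

/* Valid while the cursor's data has not been overwritten yet. */
static bool
wal_cursor_is_valid(const struct wal_ring *ring,
                    const struct wal_cursor *c)
{
        return ring->wpos - c->rpos <= ring->size;
}
/* Once invalid, the relay switches to async mode and reads xlogs. */
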
> 
>  - Maintaining a single memory buffer in WAL would complicate relay sync
>    interface implementation. May be, we could use a separate buffer per
>    each relay? That would be less efficient, obviously, because it would
>    mean extra memory copying and usage, but that would simplify the code
>    a great deal. Besides, we would need to use the buffers only for
>    async relays - once a relay is switched to sync mode, we could write
>    to its socket directly. So I think we should at least consider this
>    option.

Disagree, we have to solve the difficult problem of such buffer. If we have
multiple buffers, we have to solve it multiple times. Having a
single buffer for all replicas is equivalent to maintaining all
these replicas in a quorum - as soon as a replica position falls
behind valid read position in the buffer, it means it's out of the
sync replication quorum and our replication group has changed,
this should be reflected in the write ahead log.

>  - You removed the code that tried to advance GC consumers as rarely as
>    possible, namely only when a WAL file is closed. I'd try to save it
>    somehow, because without it every ACK would result in GC going to WAL
>    if relay is running in async mode (because only WAL knows about WAL
>    files; for others it's just a continuous stream). When a relay is
>    running in sync mode, advancing GC on each ACK seems to be OK,
>    because WAL won't be invoked then (as we never remove WAL files
>    created after the last checkpoint).

I agree.

>  - I really don't like code duplication. I think we should reuse the
>    code used for sending rows and receiving ACKs between sync and sync
>    modes. Hiding sync relay implementation behind an interface would
>    allow us to do that.

I agree.
> 
>  - I don't see how this patch depends on #1025 (sending rows in
>    batches). Let's implement relaying from memory first and do #1025
>    later, when we see the full picture.

I agree.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov


end of thread

Thread overview: 7+ messages
2019-01-22 14:18 [tarantool-patches] [PATCH] Relay logs from wal thread Georgy Kirichenko
2019-01-29  8:53 ` Vladimir Davydov
2019-01-29  9:49   ` Георгий Кириченко
2019-01-29 10:44     ` Vladimir Davydov
2019-01-31  7:15       ` Георгий Кириченко
2019-01-31  8:01         ` Vladimir Davydov
2019-02-08 17:09   ` [tarantool-patches] " Konstantin Osipov
