Tarantool development patches archive
* [tarantool-patches] [PATCH 0/7] Replication: In-memory replication
@ 2019-08-13  6:27 Georgy Kirichenko
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 1/7] Refactoring: wal writer fiber and queue Georgy Kirichenko
                   ` (7 more replies)
  0 siblings, 8 replies; 18+ messages in thread
From: Georgy Kirichenko @ 2019-08-13  6:27 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

This patchset introduces replication from wal memory and consists of four
preparation patches, one patch that introduces memory replication, and two
follow-ups that remove facilities that are no longer used in the tarantool
code.

The first patch introduces a wal writer fiber and a write queue, which are
needed to be able to yield during wal writes. Without them it is not
possible to make a cbus call from wal writer functions.
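The queue half of this split can be sketched as a plain intrusive tail
queue (a simplified stand-in for Tarantool's stailq; the real writer fiber
additionally sleeps on a fiber_cond while the queue is empty — all names
here are illustrative, not the actual wal API):

```c
#include <assert.h>
#include <stddef.h>

/* One queued wal message; `next` links it into the write queue. */
struct wal_msg {
	int seq;              /* payload for the demo */
	struct wal_msg *next;
};

/* The write queue: head pointer plus a pointer to the tail link. */
struct write_queue {
	struct wal_msg *first;
	struct wal_msg **last;
};

static void
queue_create(struct write_queue *q)
{
	q->first = NULL;
	q->last = &q->first;
}

/* Producer side: the cbus handler just appends (and would signal). */
static void
queue_push(struct write_queue *q, struct wal_msg *m)
{
	m->next = NULL;
	*q->last = m;
	q->last = &m->next;
}

/* Consumer side: the writer fiber shifts one message per iteration. */
static struct wal_msg *
queue_shift(struct write_queue *q)
{
	struct wal_msg *m = q->first;
	if (m == NULL)
		return NULL;
	q->first = m->next;
	if (q->first == NULL)
		q->last = &q->first;
	return m;
}
```

Because the cbus handler only appends and signals, the writer fiber is
free to yield in the middle of a write without blocking message delivery.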

The second patch moves wal directory tracking into the gc state. This is
required to simplify relay state tracking, since an in-memory relay has no
clue about wal file boundaries.

The third patch removes wal boundary tracking from relay.

The fourth patch prepares wal to write all data into a memory buffer first
and only then write the data to a file. This memory buffer contains the raw
encoded data plus an xrow_header index to support navigation through the
data.
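A toy model of such a buffer — raw encoded rows appended to one flat byte
array plus an index of offsets so a reader can navigate row by row. The
names and fixed sizes are illustrative only, not the actual wal_mem API:

```c
#include <assert.h>
#include <string.h>

enum { MEM_BUF_SIZE = 4096, MEM_IDX_SIZE = 128 };

struct mem_buf {
	char data[MEM_BUF_SIZE];      /* raw encoded row data */
	size_t used;                  /* bytes written so far */
	size_t row_off[MEM_IDX_SIZE]; /* start offset of each row */
	size_t row_count;
};

/* Append one encoded row; return its index, or -1 when full. */
static int
mem_buf_append(struct mem_buf *b, const char *row, size_t len)
{
	if (b->used + len > MEM_BUF_SIZE || b->row_count >= MEM_IDX_SIZE)
		return -1;
	b->row_off[b->row_count] = b->used;
	memcpy(b->data + b->used, row, len);
	b->used += len;
	return (int)b->row_count++;
}

/* Navigate to row i: pointer and length recovered from the index. */
static const char *
mem_buf_row(const struct mem_buf *b, size_t i, size_t *len)
{
	if (i >= b->row_count)
		return NULL;
	size_t end = (i + 1 < b->row_count) ? b->row_off[i + 1] : b->used;
	*len = end - b->row_off[i];
	return b->data + b->row_off[i];
}
```

The point of the index is that a reader never has to decode the raw bytes
just to find row boundaries.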

The next patch allows a relay to attach to this wal memory buffer and
invoke a send callback for each row stored in it.
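The attach-and-send idea reduces to a relay remembering how far into the
buffer it has read and pushing every newer row through a callback. Rows are
plain ints in this sketch; the real code sends encoded xrows, and all names
here are hypothetical:

```c
#include <assert.h>
#include <stddef.h>

typedef int (*relay_send_f)(int row, void *ctx);

struct mem_relay {
	size_t pos; /* index of the first row not yet sent */
};

/* Push every row the relay has not seen yet through the callback. */
static int
mem_relay_flush(struct mem_relay *r, const int *rows, size_t count,
		relay_send_f send, void *ctx)
{
	while (r->pos < count) {
		if (send(rows[r->pos], ctx) != 0)
			return -1; /* peer failure: stop, keep position */
		r->pos++;
	}
	return 0;
}

/* Demo callback used below: accumulates rows into *ctx. */
static int
sum_cb(int row, void *ctx)
{
	*(int *)ctx += row;
	return 0;
}
```

Because the relay keeps only a position, rows appended to the buffer after
a flush are picked up on the next wakeup.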

The two subsequent patches remove wal_watcher and the on_close_log
triggers, which are now unused.

Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-3794-memory-replication
Issue: https://github.com/tarantool/tarantool/issues/3794

Georgy Kirichenko (7):
  Refactoring: wal writer fiber and queue
  Refactoring: Track wal files using gc state.
  Replication: Relay does not rely on xlog boundaries
  Replication: wal memory buffer
  Replication: in memory replication
  Refactoring: remove wal_watcher routines
  Refactoring: get rid of on_close_log

 src/box/CMakeLists.txt                |   1 +
 src/box/box.cc                        |  25 +-
 src/box/gc.c                          | 131 ++++-
 src/box/gc.h                          |  34 +-
 src/box/lua/info.c                    |  25 +-
 src/box/memtx_engine.c                |   2 +-
 src/box/recovery.cc                   |  10 +-
 src/box/recovery.h                    |   5 +-
 src/box/relay.cc                      | 263 +++------
 src/box/vy_log.c                      |   4 +-
 src/box/wal.c                         | 742 +++++++++++++++-----------
 src/box/wal.h                         |  97 +---
 src/box/wal_mem.c                     | 273 ++++++++++
 src/box/wal_mem.h                     | 166 ++++++
 src/box/xlog.c                        |  87 ++-
 src/box/xlog.h                        |  11 +-
 test/replication/gc_no_space.result   |   4 +-
 test/replication/gc_no_space.test.lua |   4 +-
 18 files changed, 1208 insertions(+), 676 deletions(-)
 create mode 100644 src/box/wal_mem.c
 create mode 100644 src/box/wal_mem.h

-- 
2.22.0

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] [PATCH 1/7] Refactoring: wal writer fiber and queue
  2019-08-13  6:27 [tarantool-patches] [PATCH 0/7] Replication: In-memory replication Georgy Kirichenko
@ 2019-08-13  6:27 ` Georgy Kirichenko
  2019-08-16 13:53   ` [tarantool-patches] " Konstantin Osipov
  2019-08-21 10:18   ` [tarantool-patches] " Vladimir Davydov
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 2/7] Refactoring: Track wal files using gc state Georgy Kirichenko
                   ` (6 subsequent siblings)
  7 siblings, 2 replies; 18+ messages in thread
From: Georgy Kirichenko @ 2019-08-13  6:27 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

wal processes all writes in a cbus loop fiber, so it is not possible to
yield during a write. This patch introduces a wal write queue and a wal
write fiber which fetches batches from the queue and writes them out.
Checkpoint requests now also go through the queue in order to keep the tx
checkpoint status in sync with wal.

This patch makes it possible to gather all garbage collection state into
one gc object living in the tx cord and to ask gc to free wal space in
case of a no-space error.
---
 src/box/wal.c | 187 +++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 146 insertions(+), 41 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index 58a58e5b5..5d8dcc4f7 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -92,6 +92,10 @@ struct wal_writer
 	/** A memory pool for messages. */
 	struct mempool msg_pool;
 	/* ----------------- wal ------------------- */
+	/** A write queue. */
+	struct stailq write_queue;
+	/** A write queue condition. */
+	struct fiber_cond write_cond;
 	/** A setting from instance configuration - rows_per_wal */
 	int64_t wal_max_rows;
 	/** A setting from instance configuration - wal_max_size */
@@ -158,19 +162,40 @@ struct wal_writer
 	struct rlist watchers;
 };
 
+enum wal_msg_type {
+	WAL_MSG_WRITE = 0,
+	WAL_MSG_CHECKPOINT = 1
+};
+
 struct wal_msg {
 	struct cmsg base;
-	/** Approximate size of this request when encoded. */
-	size_t approx_len;
-	/** Input queue, on output contains all committed requests. */
-	struct stailq commit;
-	/**
-	 * In case of rollback, contains the requests which must
-	 * be rolled back.
-	 */
-	struct stailq rollback;
-	/** vclock after the batch processed. */
-	struct vclock vclock;
+	/** A link to a wal writer queue. */
+	struct stailq_entry in_queue;
+	/** Wal message type. */
+	enum wal_msg_type type;
+	union {
+		struct {
+			/** Approximate size of this request when encoded. */
+			size_t approx_len;
+			/** Input queue, on output contains all committed requests. */
+			struct stailq commit;
+			/**
+			 * In case of rollback, contains the requests which must
+			 * be rolled back.
+			 */
+			struct stailq rollback;
+			/** vclock after the batch processed. */
+			struct vclock vclock;
+		};
+		struct {
+			/** A checkpoint structure. */
+			struct wal_checkpoint *checkpoint;
+			/** Fiber issued the batch. */
+			struct fiber *fiber;
+			/** return code. */
+			int *rc;
+		};
+	};
 };
 
 /**
@@ -197,7 +222,10 @@ static void
 tx_schedule_commit(struct cmsg *msg);
 
 static struct cmsg_hop wal_request_route[] = {
-	{wal_write_to_disk, &wal_writer_singleton.tx_prio_pipe},
+	{wal_write_to_disk, NULL}
+};
+
+static struct cmsg_hop wal_response_route[] = {
 	{tx_schedule_commit, NULL},
 };
 
@@ -214,7 +242,9 @@ wal_msg_create(struct wal_msg *batch)
 static struct wal_msg *
 wal_msg(struct cmsg *msg)
 {
-	return msg->route == wal_request_route ? (struct wal_msg *) msg : NULL;
+	return msg->route == wal_request_route &&
+	       ((struct wal_msg *)msg)->type == WAL_MSG_WRITE ?
+	       (struct wal_msg *) msg : NULL;
 }
 
 /** Write a request to a log in a single transaction. */
@@ -271,18 +301,22 @@ tx_schedule_commit(struct cmsg *msg)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
 	struct wal_msg *batch = (struct wal_msg *) msg;
-	/*
-	 * Move the rollback list to the writer first, since
-	 * wal_msg memory disappears after the first
-	 * iteration of tx_schedule_queue loop.
-	 */
-	if (! stailq_empty(&batch->rollback)) {
-		/* Closes the input valve. */
-		stailq_concat(&writer->rollback, &batch->rollback);
+	if (batch->type == WAL_MSG_WRITE) {
+		/*
+		 * Move the rollback list to the writer first, since
+		 * wal_msg memory disappears after the first
+		 * iteration of tx_schedule_queue loop.
+		 */
+		if (! stailq_empty(&batch->rollback)) {
+			/* Closes the input valve. */
+			stailq_concat(&writer->rollback, &batch->rollback);
+		}
+		/* Update the tx vclock to the latest written by wal. */
+		vclock_copy(&replicaset.vclock, &batch->vclock);
+		tx_schedule_queue(&batch->commit);
+	} else {
+		fiber_wakeup(batch->fiber);
 	}
-	/* Update the tx vclock to the latest written by wal. */
-	vclock_copy(&replicaset.vclock, &batch->vclock);
-	tx_schedule_queue(&batch->commit);
 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
 }
 
@@ -378,6 +412,9 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 
 	mempool_create(&writer->msg_pool, &cord()->slabc,
 		       sizeof(struct wal_msg));
+
+	stailq_create(&writer->write_queue);
+	fiber_cond_create(&writer->write_cond);
 }
 
 /** Destroy a WAL writer structure. */
@@ -389,7 +426,7 @@ wal_writer_destroy(struct wal_writer *writer)
 
 /** WAL writer thread routine. */
 static int
-wal_writer_f(va_list ap);
+wal_cord_f(va_list ap);
 
 static int
 wal_open_f(struct cbus_call_msg *msg)
@@ -475,7 +512,7 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 			  on_checkpoint_threshold);
 
 	/* Start WAL thread. */
-	if (cord_costart(&writer->cord, "wal", wal_writer_f, NULL) != 0)
+	if (cord_costart(&writer->cord, "wal", wal_cord_f, NULL) != 0)
 		return -1;
 
 	/* Create a pipe to WAL thread. */
@@ -533,19 +570,19 @@ wal_sync(void)
 	cbus_flush(&writer->wal_pipe, &writer->tx_prio_pipe, NULL);
 }
 
-static int
-wal_begin_checkpoint_f(struct cbus_call_msg *data)
+static void
+wal_begin_checkpoint_f(struct wal_msg *wal_msg)
 {
-	struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
 	struct wal_writer *writer = &wal_writer_singleton;
+
 	if (writer->in_rollback.route != NULL) {
 		/*
 		 * We're rolling back a failed write and so
 		 * can't make a checkpoint - see the comment
 		 * in wal_begin_checkpoint() for the explanation.
 		 */
-		diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
-		return -1;
+		*wal_msg->rc = -1;
+		return;
 	}
 	/*
 	 * Avoid closing the current WAL if it has no rows (empty).
@@ -559,9 +596,9 @@ wal_begin_checkpoint_f(struct cbus_call_msg *data)
 		 * The next WAL will be created on the first write.
 		 */
 	}
-	vclock_copy(&msg->vclock, &writer->vclock);
-	msg->wal_size = writer->checkpoint_wal_size;
-	return 0;
+	vclock_copy(&wal_msg->checkpoint->vclock, &writer->vclock);
+	wal_msg->checkpoint->wal_size = writer->checkpoint_wal_size;
+	*wal_msg->rc = 0;
 }
 
 int
@@ -584,13 +621,29 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint)
 		diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
 		return -1;
 	}
+	struct wal_msg *batch = (struct wal_msg *)mempool_alloc(&writer->msg_pool);
+	if (batch == NULL) {
+		diag_set(OutOfMemory, sizeof(struct wal_msg),
+			 "region", "struct wal_msg");
+		return -1;
+	}
+	int rc = -1;
+	wal_msg_create(batch);
+	/* Issue a fake journal entry and catch batch vclock. */
+	batch->type = WAL_MSG_CHECKPOINT;
+	batch->checkpoint = checkpoint;
+	batch->fiber = fiber();
+	batch->rc = &rc;
+
+	cpipe_push(&writer->wal_pipe, &batch->base);
+
 	bool cancellable = fiber_set_cancellable(false);
-	int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
-			   &checkpoint->base, wal_begin_checkpoint_f, NULL,
-			   TIMEOUT_INFINITY);
+	fiber_yield();
 	fiber_set_cancellable(cancellable);
-	if (rc != 0)
+	if (rc != 0) {
+		diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
 		return -1;
+	}
 	return 0;
 }
 
@@ -922,10 +975,18 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 }
 
 static void
-wal_write_to_disk(struct cmsg *msg)
+wal_write_to_disk(struct cmsg *base)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
-	struct wal_msg *wal_msg = (struct wal_msg *) msg;
+	struct wal_msg *wal_msg = container_of(base, struct wal_msg, base);
+	if (stailq_empty(&writer->write_queue))
+		fiber_cond_signal(&writer->write_cond);
+	stailq_add_tail(&writer->write_queue, &wal_msg->in_queue);
+}
+
+static void
+wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
+{
 	struct error *error;
 
 	/*
@@ -1068,9 +1129,43 @@ done:
 	wal_notify_watchers(writer, WAL_EVENT_WRITE);
 }
 
-/** WAL writer main loop.  */
+/*
+ * Wal write fiber function.
+ */
 static int
 wal_writer_f(va_list ap)
+{
+	struct wal_writer *writer = va_arg(ap, struct wal_writer *);
+	while (!fiber_is_cancelled()) {
+		if (stailq_empty(&writer->write_queue)) {
+			fiber_cond_wait(&writer->write_cond);
+			continue;
+		}
+		/* Fetch and dispatch a request. */
+		struct wal_msg *batch =
+			stailq_shift_entry(&writer->write_queue,
+					   struct wal_msg, in_queue);
+
+		switch (batch->type) {
+		case WAL_MSG_WRITE: wal_write_batch(writer, batch);
+			break;
+		case WAL_MSG_CHECKPOINT: wal_begin_checkpoint_f(batch);
+			break;
+		default:
+			assert(false);
+		}
+
+		/* Push a response back to tx cord. */
+		cmsg_init(&batch->base, wal_response_route);
+		cpipe_push(&writer->tx_prio_pipe, &batch->base);
+	}
+
+	return 0;
+}
+
+/** WAL writer main fiber. */
+static int
+wal_cord_f(va_list ap)
 {
 	(void) ap;
 	struct wal_writer *writer = &wal_writer_singleton;
@@ -1078,6 +1173,12 @@ wal_writer_f(va_list ap)
 	/** Initialize eio in this thread */
 	coio_enable();
 
+	struct fiber *wal_fiber = fiber_new("wal_writer", wal_writer_f);
+	if (wal_fiber == NULL)
+		panic("Could not create a wal fiber");
+	fiber_set_joinable(wal_fiber, true);
+	fiber_start(wal_fiber, writer);
+
 	struct cbus_endpoint endpoint;
 	cbus_endpoint_create(&endpoint, "wal", fiber_schedule_cb, fiber());
 	/*
@@ -1089,6 +1190,9 @@ wal_writer_f(va_list ap)
 
 	cbus_loop(&endpoint);
 
+	fiber_cancel(wal_fiber);
+	fiber_join(wal_fiber);
+
 	/*
 	 * Create a new empty WAL on shutdown so that we don't
 	 * have to rescan the last WAL to find the instance vclock.
@@ -1157,6 +1261,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 			goto fail;
 		}
 		wal_msg_create(batch);
+		batch->type = WAL_MSG_WRITE;
 		/*
 		 * Sic: first add a request, then push the batch,
 		 * since cpipe_push() may pass the batch to WAL
-- 
2.22.0

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] [PATCH 2/7] Refactoring: Track wal files using gc state.
  2019-08-13  6:27 [tarantool-patches] [PATCH 0/7] Replication: In-memory replication Georgy Kirichenko
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 1/7] Refactoring: wal writer fiber and queue Georgy Kirichenko
@ 2019-08-13  6:27 ` Georgy Kirichenko
  2019-08-21 10:44   ` Vladimir Davydov
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 3/7] Replication: Relay does not rely on xlog boundaries Georgy Kirichenko
                   ` (5 subsequent siblings)
  7 siblings, 1 reply; 18+ messages in thread
From: Georgy Kirichenko @ 2019-08-13  6:27 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Move wal file tracking from wal into the gc struct, because the current
garbage collection flow is complicated and illogical:
1. wal tracks xlogs ->
   relay catches on_close_log and sends events to gc ->
   gc tracks the least relay signature and sends a vclock to wal ->
   wal collects logs;
   gc collects checkpoints independently
2. in case of a no-space error wal collects logs itself (making some
   checkpoints pointless) ->
   wal notifies gc ->
   gc deactivates outdated relays (this is a BUG, because a relay may
   catch up to the current state after being deactivated)
This flow does not allow in-memory replication, because a relay would not
know xlog file boundaries.
The flow after the patch is more straightforward:
   wal informs gc about log open/close events;
   gc tracks both checkpoints and logs;
   relay catches on_close_log and sends events to gc (this will change
   with the in-memory replication patch);
   wal asks gc to free space in case of a no-space error;
   gc consistently frees checkpoints and logs.

Since relay notifies tx about ACK state changes, gc is able to track the
oldest wal still in use and to perform garbage collection as well as relay
deactivation.
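The deactivation step can be sketched on plain signatures: once gc has
been forced to drop wals up to some signature, any consumer (relay) whose
acknowledged position is older than the oldest surviving wal can no longer
be served. This is a simplified stand-in for the gc_tree walk in the
patch, with illustrative names:

```c
#include <assert.h>
#include <stddef.h>

/*
 * Mark every consumer whose signature is behind the oldest kept wal
 * as inactive; return how many were deactivated.  `consumers` is
 * assumed sorted ascending, like the gc consumer tree.
 */
static size_t
gc_deactivate_stale(const long *consumers, size_t count, long oldest_kept,
		    int *inactive /* out: 1 per deactivated consumer */)
{
	size_t n = 0;
	for (size_t i = 0; i < count; i++) {
		inactive[i] = consumers[i] < oldest_kept;
		if (inactive[i])
			n++;
	}
	return n;
}
```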
---
 src/box/box.cc                        |  25 ++-
 src/box/gc.c                          |  74 ++++++--
 src/box/gc.h                          |  32 +++-
 src/box/lua/info.c                    |  25 ++-
 src/box/memtx_engine.c                |   2 +-
 src/box/recovery.cc                   |   7 +-
 src/box/recovery.h                    |   3 +-
 src/box/vy_log.c                      |   4 +-
 src/box/wal.c                         | 262 ++++++++++++--------------
 src/box/wal.h                         |  23 +--
 src/box/xlog.c                        |  10 +-
 src/box/xlog.h                        |   2 +-
 test/replication/gc_no_space.result   |   4 +-
 test/replication/gc_no_space.test.lua |   4 +-
 14 files changed, 250 insertions(+), 227 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 80249919e..7cc2cf83f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1649,7 +1649,8 @@ box_process_vote(struct ballot *ballot)
 {
 	ballot->is_ro = cfg_geti("read_only") != 0;
 	vclock_copy(&ballot->vclock, &replicaset.vclock);
-	vclock_copy(&ballot->gc_vclock, &gc.vclock);
+	if (xdir_first_vclock(&gc.wal_dir, &ballot->gc_vclock) == -1)
+		vclock_copy(&ballot->gc_vclock, &replicaset.vclock);
 }
 
 /** Insert a new cluster into _schema */
@@ -1938,7 +1939,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 * so we must reflect this in replicaset vclock to
 	 * not attempt to apply these rows twice.
 	 */
-	recovery_scan(recovery, &replicaset.vclock, &gc.vclock);
+	recovery_scan(recovery, &replicaset.vclock);
 	say_info("instance vclock %s", vclock_to_string(&replicaset.vclock));
 
 	if (wal_dir_lock >= 0) {
@@ -2039,12 +2040,6 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
 	cbus_process(endpoint);
 }
 
-static void
-on_wal_garbage_collection(const struct vclock *vclock)
-{
-	gc_advance(vclock);
-}
-
 static void
 on_wal_checkpoint_threshold(void)
 {
@@ -2080,6 +2075,15 @@ box_is_configured(void)
 	return is_box_configured;
 }
 
+static void
+on_wal_log_action(enum wal_log_action action, const struct vclock *vclock)
+{
+	if ((action & WAL_LOG_CLOSE) != 0)
+		gc_close_log(vclock);
+	if ((action & WAL_LOG_OPEN) != 0)
+		gc_open_log(vclock);
+}
+
 static inline void
 box_cfg_xc(void)
 {
@@ -2093,7 +2097,7 @@ box_cfg_xc(void)
 	rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
 	rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST);
 
-	gc_init();
+	gc_init(cfg_gets("wal_dir"));
 	engine_init();
 	schema_init();
 	replication_init();
@@ -2105,7 +2109,8 @@ box_cfg_xc(void)
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
 	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
-		     wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
+		     wal_max_size, &INSTANCE_UUID,
+		     on_wal_log_action,
 		     on_wal_checkpoint_threshold) != 0) {
 		diag_raise();
 	}
diff --git a/src/box/gc.c b/src/box/gc.c
index a2c963e0a..944a6f3b2 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -57,6 +57,7 @@
 #include "engine.h"		/* engine_collect_garbage() */
 #include "wal.h"		/* wal_collect_garbage() */
 #include "checkpoint_schedule.h"
+#include "replication.h"
 
 struct gc_state gc;
 
@@ -103,13 +104,16 @@ gc_checkpoint_delete(struct gc_checkpoint *checkpoint)
 }
 
 void
-gc_init(void)
+gc_init(const char *wal_dir_name)
 {
 	/* Don't delete any files until recovery is complete. */
 	gc.min_checkpoint_count = INT_MAX;
 
-	vclock_create(&gc.vclock);
 	rlist_create(&gc.checkpoints);
+	xdir_create(&gc.wal_dir, wal_dir_name, XLOG, &INSTANCE_UUID,
+		    &xlog_opts_default);
+	xdir_scan(&gc.wal_dir);
+	gc.log_opened = false;
 	gc_tree_new(&gc.consumers);
 	fiber_cond_create(&gc.cleanup_cond);
 	checkpoint_schedule_cfg(&gc.checkpoint_schedule, 0, 0);
@@ -135,6 +139,7 @@ gc_free(void)
 	 * running when this function is called.
 	 */
 
+	xdir_destroy(&gc.wal_dir);
 	/* Free checkpoints. */
 	struct gc_checkpoint *checkpoint, *next_checkpoint;
 	rlist_foreach_entry_safe(checkpoint, &gc.checkpoints, in_checkpoints,
@@ -196,11 +201,10 @@ gc_run_cleanup(void)
 	if (vclock == NULL ||
 	    vclock_sum(vclock) > vclock_sum(&checkpoint->vclock))
 		vclock = &checkpoint->vclock;
-
-	if (vclock_sum(vclock) > vclock_sum(&gc.vclock)) {
-		vclock_copy(&gc.vclock, vclock);
-		run_wal_gc = true;
-	}
+	int cmp = vclock_compare(vclock, &replicaset.vclock);
+	if (gc.log_opened || !(cmp == 0 || cmp == 1))
+		vclock = vclockset_psearch(&gc.wal_dir.index, vclock);
+	run_wal_gc = vclock != NULL;
 
 	if (!run_engine_gc && !run_wal_gc)
 		return; /* nothing to do */
@@ -221,7 +225,8 @@ gc_run_cleanup(void)
 	if (run_engine_gc)
 		engine_collect_garbage(&checkpoint->vclock);
 	if (run_wal_gc)
-		wal_collect_garbage(vclock);
+		xdir_collect_garbage(&gc.wal_dir, vclock_sum(vclock),
+				     XDIR_GC_ASYNC);
 }
 
 static int
@@ -273,20 +278,32 @@ gc_wait_cleanup(void)
 		fiber_cond_wait(&gc.cleanup_cond);
 }
 
-void
-gc_advance(const struct vclock *vclock)
+int
+gc_force_wal_cleanup()
 {
-	/*
-	 * In case of emergency ENOSPC, the WAL thread may delete
-	 * WAL files needed to restore from backup checkpoints,
-	 * which would be kept by the garbage collector otherwise.
-	 * Bring the garbage collector vclock up to date.
-	 */
-	vclock_copy(&gc.vclock, vclock);
+	/* Get the last checkpoint vclock. */
+	struct gc_checkpoint *checkpoint;
+	checkpoint = rlist_last_entry(&gc.checkpoints,
+				      struct gc_checkpoint, in_checkpoints);
+	/* Check whether there are files before the last checkpoint. */
+	if (!xdir_has_garbage(&gc.wal_dir, vclock_sum(&checkpoint->vclock)))
+		return -1;
+
+	/* And delete the oldest. */
+	xdir_collect_garbage(&gc.wal_dir, vclock_sum(&checkpoint->vclock),
+			     XDIR_GC_ASYNC | XDIR_GC_REMOVE_ONE);
+
+	/* Get the oldest vclock. */
+	struct vclock vclock;
+	if (xdir_first_vclock(&gc.wal_dir, &vclock) < 0) {
+		/* Last wal was deleted, so use last checkpoint's vclock. */
+		vclock_copy(&vclock, &checkpoint->vclock);
+	}
 
+	/* Deactivate all consumers before the oldest vclock. */
 	struct gc_consumer *consumer = gc_tree_first(&gc.consumers);
 	while (consumer != NULL &&
-	       vclock_sum(&consumer->vclock) < vclock_sum(vclock)) {
+		vclock_sum(&consumer->vclock) < vclock_sum(&vclock)) {
 		struct gc_consumer *next = gc_tree_next(&gc.consumers,
 							consumer);
 		assert(!consumer->is_inactive);
@@ -299,6 +316,7 @@ gc_advance(const struct vclock *vclock)
 		consumer = next;
 	}
 	gc_schedule_cleanup();
+	return 0;
 }
 
 void
@@ -358,6 +376,26 @@ gc_add_checkpoint(const struct vclock *vclock)
 	gc_schedule_cleanup();
 }
 
+void
+gc_open_log(const struct vclock *vclock)
+{
+	struct vclock last_vclock;
+	if (xdir_last_vclock(&gc.wal_dir, &last_vclock) == -1 ||
+	    vclock_compare(vclock, &last_vclock) != 0)
+		xdir_add_vclock(&gc.wal_dir, vclock);
+	assert(!gc.log_opened);
+	gc.log_opened = true;
+}
+
+void
+gc_close_log(const struct vclock *vclock)
+{
+	(void) vclock;
+	assert(gc.log_opened);
+	gc.log_opened = false;
+	gc_schedule_cleanup();
+}
+
 static int
 gc_do_checkpoint(void)
 {
diff --git a/src/box/gc.h b/src/box/gc.h
index 5790ebcc6..9b38a0c06 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -39,6 +39,7 @@
 #include "vclock.h"
 #include "trivia/util.h"
 #include "checkpoint_schedule.h"
+#include "xlog.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -103,8 +104,6 @@ typedef rb_tree(struct gc_consumer) gc_tree_t;
 
 /** Garbage collection state. */
 struct gc_state {
-	/** VClock of the oldest WAL row available on the instance. */
-	struct vclock vclock;
 	/**
 	 * Minimal number of checkpoints to preserve.
 	 * Configured by box.cfg.checkpoint_count.
@@ -121,6 +120,10 @@ struct gc_state {
 	 * to the tail. Linked by gc_checkpoint::in_checkpoints.
 	 */
 	struct rlist checkpoints;
+	/** Directory to track log files. */
+	struct xdir wal_dir;
+	/** True if log is opened. */
+	bool log_opened;
 	/** Registered consumers, linked by gc_consumer::node. */
 	gc_tree_t consumers;
 	/** Fiber responsible for periodic checkpointing. */
@@ -198,7 +201,7 @@ gc_last_checkpoint(void)
  * Initialize the garbage collection state.
  */
 void
-gc_init(void);
+gc_init(const char *wal_dir_name);
 
 /**
  * Destroy the garbage collection state.
@@ -206,13 +209,6 @@ gc_init(void);
 void
 gc_free(void);
 
-/**
- * Advance the garbage collector vclock to the given position.
- * Deactivate WAL consumers that need older data.
- */
-void
-gc_advance(const struct vclock *vclock);
-
 /**
  * Update the minimal number of checkpoints to preserve.
  * Called when box.cfg.checkpoint_count is updated.
@@ -239,6 +235,18 @@ gc_set_checkpoint_interval(double interval);
 void
 gc_add_checkpoint(const struct vclock *vclock);
 
+/**
+ * Tell gc a log was opened.
+ */
+void
+gc_open_log(const struct vclock *vclock);
+
+/**
+ * Tell gc a log was closed.
+ */
+void
+gc_close_log(const struct vclock *vclock);
+
 /**
  * Make a checkpoint.
  *
@@ -333,6 +341,10 @@ gc_consumer_iterator_init(struct gc_consumer_iterator *it)
 struct gc_consumer *
 gc_consumer_iterator_next(struct gc_consumer_iterator *it);
 
+/** Delete the oldest wal file before the last checkpoint. */
+int
+gc_force_wal_cleanup();
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index d0e553b1d..62755a825 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -371,14 +371,6 @@ lbox_info_gc_call(struct lua_State *L)
 
 	lua_newtable(L);
 
-	lua_pushstring(L, "vclock");
-	lbox_pushvclock(L, &gc.vclock);
-	lua_settable(L, -3);
-
-	lua_pushstring(L, "signature");
-	luaL_pushint64(L, vclock_sum(&gc.vclock));
-	lua_settable(L, -3);
-
 	lua_pushstring(L, "checkpoint_is_in_progress");
 	lua_pushboolean(L, gc.checkpoint_is_in_progress);
 	lua_settable(L, -3);
@@ -386,9 +378,13 @@ lbox_info_gc_call(struct lua_State *L)
 	lua_pushstring(L, "checkpoints");
 	lua_newtable(L);
 
+	struct vclock *checkpoint_vclock = NULL;
+
 	count = 0;
 	struct gc_checkpoint *checkpoint;
 	gc_foreach_checkpoint(checkpoint) {
+		if (checkpoint_vclock == NULL)
+			checkpoint_vclock = &checkpoint->vclock;
 		lua_createtable(L, 0, 2);
 
 		lua_pushstring(L, "vclock");
@@ -416,12 +412,15 @@ lbox_info_gc_call(struct lua_State *L)
 	lua_pushstring(L, "consumers");
 	lua_newtable(L);
 
+	struct vclock *gc_vclock = NULL;
 	struct gc_consumer_iterator consumers;
 	gc_consumer_iterator_init(&consumers);
 
 	count = 0;
 	struct gc_consumer *consumer;
 	while ((consumer = gc_consumer_iterator_next(&consumers)) != NULL) {
+		if (gc_vclock == NULL)
+			gc_vclock = &consumer->vclock;
 		lua_createtable(L, 0, 3);
 
 		lua_pushstring(L, "name");
@@ -439,7 +438,17 @@ lbox_info_gc_call(struct lua_State *L)
 		lua_rawseti(L, -2, ++count);
 	}
 	lua_settable(L, -3);
+	if (gc_vclock == NULL ||
+	    vclock_sum(gc_vclock) > vclock_sum(checkpoint_vclock))
+		gc_vclock = checkpoint_vclock;
+
+	lua_pushstring(L, "vclock");
+	lbox_pushvclock(L, gc_vclock);
+	lua_settable(L, -3);
 
+	lua_pushstring(L, "signature");
+	luaL_pushint64(L, vclock_sum(gc_vclock));
+	lua_settable(L, -3);
 	return 1;
 }
 
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 59ad16823..9b3f2233e 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -568,7 +568,7 @@ checkpoint_f(va_list ap)
 	}
 
 	struct xlog snap;
-	if (xdir_create_xlog(&ckpt->dir, &snap, &ckpt->vclock) != 0)
+	if (xdir_create_xlog(&ckpt->dir, &snap, &ckpt->vclock, NULL) != 0)
 		return -1;
 
 	say_info("saving snapshot `%s'", snap.filename);
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index d122d618a..45c4f6820 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -118,22 +118,17 @@ recovery_new(const char *wal_dirname, bool force_recovery,
 }
 
 void
-recovery_scan(struct recovery *r, struct vclock *end_vclock,
-	      struct vclock *gc_vclock)
+recovery_scan(struct recovery *r, struct vclock *end_vclock)
 {
 	xdir_scan_xc(&r->wal_dir);
 
 	if (xdir_last_vclock(&r->wal_dir, end_vclock) < 0 ||
 	    vclock_compare(end_vclock, &r->vclock) < 0) {
 		/* No xlogs after last checkpoint. */
-		vclock_copy(gc_vclock, &r->vclock);
 		vclock_copy(end_vclock, &r->vclock);
 		return;
 	}
 
-	if (xdir_first_vclock(&r->wal_dir, gc_vclock) < 0)
-		unreachable();
-
 	/* Scan the last xlog to find end vclock. */
 	struct xlog_cursor cursor;
 	if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 6e68abc0b..2a03cfc2f 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -75,8 +75,7 @@ recovery_delete(struct recovery *r);
  * WAL directory.
  */
 void
-recovery_scan(struct recovery *r,  struct vclock *end_vclock,
-	      struct vclock *gc_vclock);
+recovery_scan(struct recovery *r, struct vclock *end_vclock);
 
 void
 recovery_follow_local(struct recovery *r, struct xstream *stream,
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index cb291f3c8..d97eff9b8 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -949,7 +949,7 @@ vy_log_open(struct xlog *xlog)
 	}
 
 	if (xdir_create_xlog(&vy_log.dir, xlog,
-			     &vy_log.last_checkpoint) < 0)
+			     &vy_log.last_checkpoint, NULL) < 0)
 		goto fail;
 
 	struct xrow_header row;
@@ -2585,7 +2585,7 @@ vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery)
 	rlist_foreach_entry(lsm, &recovery->lsms, in_recovery) {
 		/* Create the log file on the first write. */
 		if (!xlog_is_open(&xlog) &&
-		    xdir_create_xlog(&vy_log.dir, &xlog, vclock) != 0)
+		    xdir_create_xlog(&vy_log.dir, &xlog, vclock, NULL) != 0)
 			goto err_create_xlog;
 
 		if (vy_log_append_lsm(&xlog, lsm) != 0)
diff --git a/src/box/wal.c b/src/box/wal.c
index 5d8dcc4f7..a09ab7187 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -43,6 +43,7 @@
 #include "cbus.h"
 #include "coio_task.h"
 #include "replication.h"
+#include "gc.h"
 
 enum {
 	/**
@@ -78,7 +79,7 @@ struct wal_writer
 {
 	struct journal base;
 	/* ----------------- tx ------------------- */
-	wal_on_garbage_collection_f on_garbage_collection;
+	wal_on_log_action_f on_log_action;
 	wal_on_checkpoint_threshold_f on_checkpoint_threshold;
 	/**
 	 * The rollback queue. An accumulator for all requests
@@ -123,12 +124,7 @@ struct wal_writer
 	 * with this LSN and LSN becomes "real".
 	 */
 	struct vclock vclock;
-	/**
-	 * VClock of the most recent successfully created checkpoint.
-	 * The WAL writer must not delete WAL files that are needed to
-	 * recover from it even if it is running out of disk space.
-	 */
-	struct vclock checkpoint_vclock;
+	struct vclock prev_vclock;
 	/** Total size of WAL files written since the last checkpoint. */
 	int64_t checkpoint_wal_size;
 	/**
@@ -341,28 +337,6 @@ tx_schedule_rollback(struct cmsg *msg)
 			     container_of(msg, struct wal_msg, base));
 }
 
-
-/**
- * This message is sent from WAL to TX when the WAL thread hits
- * ENOSPC and has to delete some backup WAL files to continue.
- * The TX thread uses this message to shoot off WAL consumers
- * that needed deleted WAL files.
- */
-struct tx_notify_gc_msg {
-	struct cmsg base;
-	/** VClock of the oldest WAL row preserved by WAL. */
-	struct vclock vclock;
-};
-
-static void
-tx_notify_gc(struct cmsg *msg)
-{
-	struct wal_writer *writer = &wal_writer_singleton;
-	struct vclock *vclock = &((struct tx_notify_gc_msg *)msg)->vclock;
-	writer->on_garbage_collection(vclock);
-	free(msg);
-}
-
 static void
 tx_notify_checkpoint(struct cmsg *msg)
 {
@@ -380,7 +354,7 @@ static void
 wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 		  const char *wal_dirname, int64_t wal_max_rows,
 		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-		  wal_on_garbage_collection_f on_garbage_collection,
+		  wal_on_log_action_f on_log_action,
 		  wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	writer->wal_mode = wal_mode;
@@ -404,10 +378,10 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	writer->checkpoint_triggered = false;
 
 	vclock_create(&writer->vclock);
-	vclock_create(&writer->checkpoint_vclock);
+	vclock_clear(&writer->prev_vclock);
 	rlist_create(&writer->watchers);
 
-	writer->on_garbage_collection = on_garbage_collection;
+	writer->on_log_action = on_log_action;
 	writer->on_checkpoint_threshold = on_checkpoint_threshold;
 
 	mempool_create(&writer->msg_pool, &cord()->slabc,
@@ -428,6 +402,40 @@ wal_writer_destroy(struct wal_writer *writer)
 static int
 wal_cord_f(va_list ap);
 
+struct tx_log_action_msg {
+	struct cmsg base;
+	enum wal_log_action action;
+	struct vclock vclock;
+};
+
+static void
+tx_on_log_action(struct cmsg *base)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct tx_log_action_msg *msg =
+		container_of(base, struct tx_log_action_msg, base);
+	writer->on_log_action(msg->action, &msg->vclock);
+	free(msg);
+}
+
+/* Notify the TX thread that WAL opened or closed a log file. */
+static inline void
+wal_notify_log_action(struct wal_writer *writer, enum wal_log_action action)
+{
+	struct tx_log_action_msg *msg = malloc(sizeof(*msg));
+	if (msg == NULL) {
+		say_error("Failed to allocate log action message");
+		return;
+	}
+	static struct cmsg_hop route[] = {
+		{ tx_on_log_action, NULL },
+	};
+	cmsg_init(&msg->base, route);
+	msg->action = action;
+	vclock_copy(&msg->vclock, &writer->vclock);
+	cpipe_push(&writer->tx_prio_pipe, &msg->base);
+}
+
 static int
 wal_open_f(struct cbus_call_msg *msg)
 {
@@ -436,7 +444,10 @@ wal_open_f(struct cbus_call_msg *msg)
 	const char *path = xdir_format_filename(&writer->wal_dir,
 				vclock_sum(&writer->vclock), NONE);
 	assert(!xlog_is_open(&writer->current_wal));
-	return xlog_open(&writer->current_wal, path, &writer->wal_dir.opts);
+	int rc = xlog_open(&writer->current_wal, path, &writer->wal_dir.opts);
+	if (rc == 0)
+		wal_notify_log_action(writer, WAL_LOG_OPEN);
+	return rc;
 }
 
 /**
@@ -500,7 +511,7 @@ wal_open(struct wal_writer *writer)
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 wal_on_garbage_collection_f on_garbage_collection,
+	 wal_on_log_action_f on_log_action,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	assert(wal_max_rows > 1);
@@ -508,7 +519,7 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	/* Initialize the state. */
 	struct wal_writer *writer = &wal_writer_singleton;
 	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
-			  wal_max_size, instance_uuid, on_garbage_collection,
+			  wal_max_size, instance_uuid, on_log_action,
 			  on_checkpoint_threshold);
 
 	/* Start WAL thread. */
@@ -528,12 +539,9 @@ wal_enable(void)
 
 	/* Initialize the writer vclock from the recovery state. */
 	vclock_copy(&writer->vclock, &replicaset.vclock);
+	if (vclock_sum(&replicaset.vclock) > 0)
+		vclock_copy(&writer->prev_vclock, &replicaset.vclock);
 
-	/*
-	 * Scan the WAL directory to build an index of all
-	 * existing WAL files. Required for garbage collection,
-	 * see wal_collect_garbage().
-	 */
 	if (xdir_scan(&writer->wal_dir))
 		return -1;
 
@@ -590,8 +598,8 @@ wal_begin_checkpoint_f(struct wal_msg *wal_msg)
 	if (xlog_is_open(&writer->current_wal) &&
 	    vclock_sum(&writer->current_wal.meta.vclock) !=
 	    vclock_sum(&writer->vclock)) {
-
 		xlog_close(&writer->current_wal, false);
+		wal_notify_log_action(writer, WAL_LOG_CLOSE);
 		/*
 		 * The next WAL will be created on the first write.
 		 */
@@ -662,7 +670,6 @@ wal_commit_checkpoint_f(struct cbus_call_msg *data)
 	 * when checkpointing started from the current value
 	 * rather than just setting it to 0.
 	 */
-	vclock_copy(&writer->checkpoint_vclock, &msg->vclock);
 	assert(writer->checkpoint_wal_size >= msg->wal_size);
 	writer->checkpoint_wal_size -= msg->wal_size;
 	writer->checkpoint_triggered = false;
@@ -673,10 +680,8 @@ void
 wal_commit_checkpoint(struct wal_checkpoint *checkpoint)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
-	if (writer->wal_mode == WAL_NONE) {
-		vclock_copy(&writer->checkpoint_vclock, &checkpoint->vclock);
+	if (writer->wal_mode == WAL_NONE)
 		return;
-	}
 	bool cancellable = fiber_set_cancellable(false);
 	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
 		  &checkpoint->base, wal_commit_checkpoint_f, NULL,
@@ -714,54 +719,6 @@ wal_set_checkpoint_threshold(int64_t threshold)
 	fiber_set_cancellable(cancellable);
 }
 
-struct wal_gc_msg
-{
-	struct cbus_call_msg base;
-	const struct vclock *vclock;
-};
-
-static int
-wal_collect_garbage_f(struct cbus_call_msg *data)
-{
-	struct wal_writer *writer = &wal_writer_singleton;
-	const struct vclock *vclock = ((struct wal_gc_msg *)data)->vclock;
-
-	if (!xlog_is_open(&writer->current_wal) &&
-	    vclock_sum(vclock) >= vclock_sum(&writer->vclock)) {
-		/*
-		 * The last available WAL file has been sealed and
-		 * all registered consumers have done reading it.
-		 * We can delete it now.
-		 */
-	} else {
-		/*
-		 * Find the most recent WAL file that contains rows
-		 * required by registered consumers and delete all
-		 * older WAL files.
-		 */
-		vclock = vclockset_psearch(&writer->wal_dir.index, vclock);
-	}
-	if (vclock != NULL)
-		xdir_collect_garbage(&writer->wal_dir, vclock_sum(vclock),
-				     XDIR_GC_ASYNC);
-
-	return 0;
-}
-
-void
-wal_collect_garbage(const struct vclock *vclock)
-{
-	struct wal_writer *writer = &wal_writer_singleton;
-	if (writer->wal_mode == WAL_NONE)
-		return;
-	struct wal_gc_msg msg;
-	msg.vclock = vclock;
-	bool cancellable = fiber_set_cancellable(false);
-	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg.base,
-		  wal_collect_garbage_f, NULL, TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
-}
-
 static void
 wal_notify_watchers(struct wal_writer *writer, unsigned events);
 
@@ -789,6 +746,7 @@ wal_opt_rotate(struct wal_writer *writer)
 	if (xlog_is_open(&writer->current_wal) &&
 	    (writer->current_wal.rows >= writer->wal_max_rows ||
 	     writer->current_wal.offset >= writer->wal_max_size)) {
+		wal_notify_log_action(writer, WAL_LOG_CLOSE);
 		/*
 		 * We can not handle xlog_close()
 		 * failure in any reasonable way.
@@ -801,20 +759,69 @@ wal_opt_rotate(struct wal_writer *writer)
 		return 0;
 
 	if (xdir_create_xlog(&writer->wal_dir, &writer->current_wal,
-			     &writer->vclock) != 0) {
+			     &writer->vclock, &writer->prev_vclock) != 0) {
 		diag_log();
 		return -1;
 	}
-	/*
-	 * Keep track of the new WAL vclock. Required for garbage
-	 * collection, see wal_collect_garbage().
-	 */
-	xdir_add_vclock(&writer->wal_dir, &writer->vclock);
+	vclock_copy(&writer->prev_vclock, &writer->vclock);
 
+	wal_notify_log_action(writer, WAL_LOG_OPEN);
 	wal_notify_watchers(writer, WAL_EVENT_ROTATE);
 	return 0;
 }
 
+struct gc_force_wal_msg {
+	struct cmsg base;
+	struct fiber_cond done_cond;
+	bool done;
+	int rc;
+};
+
+static void
+wal_gc_wal_force_done(struct cmsg *base)
+{
+	struct gc_force_wal_msg *msg = container_of(base,
+						   struct gc_force_wal_msg,
+						   base);
+	msg->done = true;
+	fiber_cond_signal(&msg->done_cond);
+}
+
+static int
+tx_gc_force_wal_f(va_list ap)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct gc_force_wal_msg *msg = va_arg(ap, struct gc_force_wal_msg *);
+	static struct cmsg_hop respond_route[] = {
+		{wal_gc_wal_force_done, NULL}
+	};
+	msg->rc = gc_force_wal_cleanup();
+	cmsg_init(&msg->base, respond_route);
+	cpipe_push(&writer->wal_pipe, &msg->base);
+	return 0;
+}
+
+void
+tx_gc_wal_force(struct cmsg *base)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct gc_force_wal_msg *msg = container_of(base,
+						   struct gc_force_wal_msg,
+						   base);
+	struct fiber *gc_fiber = fiber_new("wal_gc_fiber", tx_gc_force_wal_f);
+	if (gc_fiber == NULL) {
+		struct cmsg_hop respond_route[] = {
+			{wal_gc_wal_force_done, NULL}
+		};
+		msg->rc = -1;
+		cmsg_init(&msg->base, respond_route);
+		cpipe_push(&writer->wal_pipe, &msg->base);
+		return;
+	}
+	fiber_start(gc_fiber, msg);
+	return;
+}
+
 /**
  * Make sure there's enough disk space to append @len bytes
  * of data to the current WAL.
@@ -825,17 +832,10 @@ wal_opt_rotate(struct wal_writer *writer)
 static int
 wal_fallocate(struct wal_writer *writer, size_t len)
 {
-	bool warn_no_space = true, notify_gc = false;
 	struct xlog *l = &writer->current_wal;
 	struct errinj *errinj = errinj(ERRINJ_WAL_FALLOCATE, ERRINJ_INT);
 	int rc = 0;
 
-	/*
-	 * Max LSN that can be collected in case of ENOSPC -
-	 * we must not delete WALs necessary for recovery.
-	 */
-	int64_t gc_lsn = vclock_sum(&writer->checkpoint_vclock);
-
 	/*
 	 * The actual write size can be greater than the sum size
 	 * of encoded rows (compression, fixheaders). Double the
@@ -856,45 +856,23 @@ retry:
 	}
 	if (errno != ENOSPC)
 		goto error;
-	if (!xdir_has_garbage(&writer->wal_dir, gc_lsn))
-		goto error;
-
-	if (warn_no_space) {
-		say_crit("ran out of disk space, try to delete old WAL files");
-		warn_no_space = false;
-	}
-
-	xdir_collect_garbage(&writer->wal_dir, gc_lsn, XDIR_GC_REMOVE_ONE);
-	notify_gc = true;
-	goto retry;
+	static struct cmsg_hop gc_wal_force_route[] = {
+		{tx_gc_wal_force, NULL}
+	};
+	struct gc_force_wal_msg msg;
+	msg.done = false;
+	fiber_cond_create(&msg.done_cond);
+	cmsg_init(&msg.base, gc_wal_force_route);
+	cpipe_push(&writer->tx_prio_pipe, &msg.base);
+
+	while (!msg.done)
+		fiber_cond_wait(&msg.done_cond);
+	if (msg.rc == 0)
+		goto retry;
 error:
 	diag_log();
 	rc = -1;
 out:
-	/*
-	 * Notify the TX thread if the WAL thread had to delete
-	 * some WAL files to proceed so that TX can shoot off WAL
-	 * consumers that still need those files.
-	 *
-	 * We allocate the message with malloc() and we ignore
-	 * allocation failures, because this is a pretty rare
-	 * event and a failure to send this message isn't really
-	 * critical.
-	 */
-	if (notify_gc) {
-		static struct cmsg_hop route[] = {
-			{ tx_notify_gc, NULL },
-		};
-		struct tx_notify_gc_msg *msg = malloc(sizeof(*msg));
-		if (msg != NULL) {
-			if (xdir_first_vclock(&writer->wal_dir,
-					      &msg->vclock) < 0)
-				vclock_copy(&msg->vclock, &writer->vclock);
-			cmsg_init(&msg->base, route);
-			cpipe_push(&writer->tx_prio_pipe, &msg->base);
-		} else
-			say_warn("failed to allocate gc notification message");
-	}
 	return rc;
 }
 
@@ -1204,7 +1182,7 @@ wal_cord_f(va_list ap)
 			    &writer->current_wal.meta.vclock) > 0)) {
 		struct xlog l;
 		if (xdir_create_xlog(&writer->wal_dir, &l,
-				     &writer->vclock) == 0)
+				     &writer->vclock, &writer->prev_vclock) == 0)
 			xlog_close(&l, false);
 		else
 			diag_log();
diff --git a/src/box/wal.h b/src/box/wal.h
index 4e500d2a3..1a7156d97 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -52,14 +52,16 @@ extern int wal_dir_lock;
 extern "C" {
 #endif /* defined(__cplusplus) */
 
+enum wal_log_action {
+	WAL_LOG_OPEN		= (1 << 0),
+	WAL_LOG_CLOSE		= (1 << 1)
+};
+
 /**
- * Callback invoked in the TX thread when the WAL thread runs out
- * of disk space and has to delete some old WAL files to continue.
- * It is supposed to shoot off WAL consumers that need the deleted
- * files. The vclock of the oldest WAL row still available on the
- * instance is passed in @vclock.
+ * Callback invoked in the TX thread when the WAL thread opens or
+ * closes a log file. The WAL writer's current vclock is passed
+ * in @vclock.
  */
-typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);
+typedef void (*wal_on_log_action_f)(enum wal_log_action action,
+				    const struct vclock *vclock);
 
 /**
  * Callback invoked in the TX thread when the total size of WAL
@@ -74,7 +76,7 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 wal_on_garbage_collection_f on_garbage_collection,
+	 wal_on_log_action_f on_log_action,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold);
 
 /**
@@ -218,13 +220,6 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
 void
 wal_set_checkpoint_threshold(int64_t threshold);
 
-/**
- * Remove WAL files that are not needed by consumers reading
- * rows at @vclock or newer.
- */
-void
-wal_collect_garbage(const struct vclock *vclock);
-
 void
 wal_init_vy_log();
 
diff --git a/src/box/xlog.c b/src/box/xlog.c
index 8254cce20..54a7b5246 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -965,20 +965,12 @@ xdir_touch_xlog(struct xdir *dir, const struct vclock *vclock)
  */
 int
 xdir_create_xlog(struct xdir *dir, struct xlog *xlog,
-		 const struct vclock *vclock)
+		 const struct vclock *vclock, const struct vclock *prev_vclock)
 {
 	int64_t signature = vclock_sum(vclock);
 	assert(signature >= 0);
 	assert(!tt_uuid_is_nil(dir->instance_uuid));
 
-	/*
-	 * For WAL dir: store vclock of the previous xlog file
-	 * to check for gaps on recovery.
-	 */
-	const struct vclock *prev_vclock = NULL;
-	if (dir->type == XLOG && !vclockset_empty(&dir->index))
-		prev_vclock = vclockset_last(&dir->index);
-
 	struct xlog_meta meta;
 	xlog_meta_create(&meta, dir->filetype, dir->instance_uuid,
 			 vclock, prev_vclock);
diff --git a/src/box/xlog.h b/src/box/xlog.h
index a48b05fc4..6a71e0cbd 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -424,7 +424,7 @@ xdir_touch_xlog(struct xdir *dir, const struct vclock *vclock);
  */
 int
 xdir_create_xlog(struct xdir *dir, struct xlog *xlog,
-		 const struct vclock *vclock);
+		 const struct vclock *vclock, const struct vclock *prev_vclock);
 
 /**
  * Create new xlog writer based on fd.
diff --git a/test/replication/gc_no_space.result b/test/replication/gc_no_space.result
index e860ab00f..bcf175282 100644
--- a/test/replication/gc_no_space.result
+++ b/test/replication/gc_no_space.result
@@ -250,7 +250,7 @@ gc = box.info.gc()
 ---
 - 2
 ...
-gc.signature == gc.checkpoints[2].signature
+gc.signature == gc.checkpoints[1].signature
 ---
 - true
 ...
@@ -272,7 +272,7 @@ gc = box.info.gc()
 ---
 - 2
 ...
-gc.signature == gc.checkpoints[2].signature
+gc.signature == gc.checkpoints[1].signature
 ---
 - true
 ...
diff --git a/test/replication/gc_no_space.test.lua b/test/replication/gc_no_space.test.lua
index 98ccd401b..bf4049fbb 100644
--- a/test/replication/gc_no_space.test.lua
+++ b/test/replication/gc_no_space.test.lua
@@ -106,7 +106,7 @@ check_snap_count(2)
 gc = box.info.gc()
 #gc.consumers -- 0
 #gc.checkpoints -- 2
-gc.signature == gc.checkpoints[2].signature
+gc.signature == gc.checkpoints[1].signature
 
 s:drop()
 box.schema.user.revoke('guest', 'replication')
@@ -116,4 +116,4 @@ test_run:cleanup_cluster()
 test_run:cmd("restart server default")
 gc = box.info.gc()
 #gc.checkpoints -- 2
-gc.signature == gc.checkpoints[2].signature
+gc.signature == gc.checkpoints[1].signature
-- 
2.22.0

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] [PATCH 3/7] Replication: Relay does not rely on xlog boundaries
  2019-08-13  6:27 [tarantool-patches] [PATCH 0/7] Replication: In-memory replication Georgy Kirichenko
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 1/7] Refactoring: wal writer fiber and queue Georgy Kirichenko
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 2/7] Refactoring: Track wal files using gc state Georgy Kirichenko
@ 2019-08-13  6:27 ` Georgy Kirichenko
  2019-08-21 11:35   ` Vladimir Davydov
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 4/7] Replication: wal memory buffer Georgy Kirichenko
                   ` (4 subsequent siblings)
  7 siblings, 1 reply; 18+ messages in thread
From: Georgy Kirichenko @ 2019-08-13  6:27 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

When the relay receives an ACK, it matches the received vclock against
xlog file boundaries detected using the on_close_log trigger and sends
a consumer advance message. However, for each ACK the relay also sends
a status update message to the tx cord, and that message could be used
for gc purposes instead.
This patch removes any knowledge about xlog boundaries from the relay,
because there would not be any xlog files in case of in-memory
replication. As gc now tracks all xlog files itself, it is able to
collect garbage files using relay status updates.

Note: after the parallel applier there is no longer one ACK per
transaction, so it should not be too expensive to advance a consumer
on each status update. However, I think it could be improved, for
instance by tracking the next wal file vclock.
---
 src/box/gc.c     |  58 +++++++++++++++++++++------
 src/box/relay.cc | 102 +----------------------------------------------
 2 files changed, 48 insertions(+), 112 deletions(-)

diff --git a/src/box/gc.c b/src/box/gc.c
index 944a6f3b2..9771e407a 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -66,6 +66,17 @@ gc_cleanup_fiber_f(va_list);
 static int
 gc_checkpoint_fiber_f(va_list);
 
+/*
+ * A shortcut function which checks if one vclock is equal to
+ * or greater than another.
+ */
+static inline bool
+gc_vclock_ge(const struct vclock *first, const struct vclock *second)
+{
+	int cmp = vclock_compare(first, second);
+	return (cmp == 0) || (cmp == 1);
+}
+
 /**
  * Comparator used for ordering gc_consumer objects by signature
  * in a binary tree.
@@ -201,8 +212,7 @@ gc_run_cleanup(void)
 	if (vclock == NULL ||
 	    vclock_sum(vclock) > vclock_sum(&checkpoint->vclock))
 		vclock = &checkpoint->vclock;
-	int cmp = vclock_compare(vclock, &replicaset.vclock);
-	if (gc.log_opened || !(cmp == 0 || cmp == 1))
+	if (gc.log_opened || !gc_vclock_ge(vclock, &replicaset.vclock))
 		vclock = vclockset_psearch(&gc.wal_dir.index, vclock);
 	run_wal_gc = vclock != NULL;
 
@@ -558,9 +568,23 @@ gc_consumer_register(const struct vclock *vclock, const char *format, ...)
 	va_start(ap, format);
 	vsnprintf(consumer->name, GC_NAME_MAX, format, ap);
 	va_end(ap);
-
-	vclock_copy(&consumer->vclock, vclock);
-	gc_tree_insert(&gc.consumers, consumer);
+	/* Find the vclock of the wal file which contains the relay's vclock. */
+	struct vclock *track_vclock = vclockset_psearch(&gc.wal_dir.index,
+							vclock);
+	if (track_vclock == NULL && !gc.log_opened &&
+	    gc_vclock_ge(vclock, &replicaset.vclock))  {
+		/*
+		 * There is no wal file in index containing the vclock
+		 * which is possible when a consumer is up to date with
+		 * the last checkpoint and there were no subsequent writes.
+		 */
+		track_vclock = &replicaset.vclock;
+	}
+	if (track_vclock != NULL) {
+		vclock_copy(&consumer->vclock, track_vclock);
+		gc_tree_insert(&gc.consumers, consumer);
+	} else
+		consumer->is_inactive = true;
 	return consumer;
 }
 
@@ -580,20 +604,30 @@ gc_consumer_advance(struct gc_consumer *consumer, const struct vclock *vclock)
 	if (consumer->is_inactive)
 		return;
 
-	int64_t signature = vclock_sum(vclock);
-	int64_t prev_signature = vclock_sum(&consumer->vclock);
-
-	assert(signature >= prev_signature);
-	if (signature == prev_signature)
-		return; /* nothing to do */
+	/*
+	 * In some rare cases the relay could report a smaller vclock, for
+	 * instance in case of replica data loss, and there is nothing we
+	 * can do about it.
+	 */
+	if (!gc_vclock_ge(vclock, &consumer->vclock))
+		return;
 
+	/* Detect which wal file contains the acked relay position. */
+	if (!gc.log_opened && gc_vclock_ge(vclock, &replicaset.vclock)) {
+		/*
+		 * Relay is up to date with this instance and there is no
+		 * wal file (and no writes) after the last checkpoint.
+		 */
+		vclock = &replicaset.vclock;
+	} else
+		vclock = vclockset_psearch(&gc.wal_dir.index, vclock);
+	assert(vclock != NULL);
 	/*
 	 * Do not update the tree unless the tree invariant
 	 * is violated.
 	 */
 	struct gc_consumer *next = gc_tree_next(&gc.consumers, consumer);
 	bool update_tree = (next != NULL &&
-			    signature >= vclock_sum(&next->vclock));
+			    vclock_sum(vclock) >= vclock_sum(&next->vclock));
 
 	if (update_tree)
 		gc_tree_remove(&gc.consumers, consumer);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index efa3373f9..a1b841291 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -66,23 +66,6 @@ struct relay_status_msg {
 	struct vclock vclock;
 };
 
-/**
- * Cbus message to update replica gc state in tx thread.
- */
-struct relay_gc_msg {
-	/** Parent */
-	struct cmsg msg;
-	/**
-	 * Link in the list of pending gc messages,
-	 * see relay::pending_gc.
-	 */
-	struct stailq_entry in_pending;
-	/** Relay instance */
-	struct relay *relay;
-	/** Vclock to advance to */
-	struct vclock vclock;
-};
-
 /** State of a replication relay. */
 struct relay {
 	/** The thread in which we relay data to the replica. */
@@ -239,12 +222,6 @@ relay_exit(struct relay *relay)
 static void
 relay_stop(struct relay *relay)
 {
-	struct relay_gc_msg *gc_msg, *next_gc_msg;
-	stailq_foreach_entry_safe(gc_msg, next_gc_msg,
-				  &relay->pending_gc, in_pending) {
-		free(gc_msg);
-	}
-	stailq_create(&relay->pending_gc);
 	if (relay->r != NULL)
 		recovery_delete(relay->r);
 	relay->r = NULL;
@@ -368,6 +345,7 @@ tx_status_update(struct cmsg *msg)
 {
 	struct relay_status_msg *status = (struct relay_status_msg *)msg;
 	vclock_copy(&status->relay->tx.vclock, &status->vclock);
+	gc_consumer_advance(status->relay->replica->gc, &status->vclock);
 	static const struct cmsg_hop route[] = {
 		{relay_status_update, NULL}
 	};
@@ -375,73 +353,6 @@ tx_status_update(struct cmsg *msg)
 	cpipe_push(&status->relay->relay_pipe, msg);
 }
 
-/**
- * Update replica gc state in tx thread.
- */
-static void
-tx_gc_advance(struct cmsg *msg)
-{
-	struct relay_gc_msg *m = (struct relay_gc_msg *)msg;
-	gc_consumer_advance(m->relay->replica->gc, &m->vclock);
-	free(m);
-}
-
-static void
-relay_on_close_log_f(struct trigger *trigger, void * /* event */)
-{
-	static const struct cmsg_hop route[] = {
-		{tx_gc_advance, NULL}
-	};
-	struct relay *relay = (struct relay *)trigger->data;
-	struct relay_gc_msg *m = (struct relay_gc_msg *)malloc(sizeof(*m));
-	if (m == NULL) {
-		say_warn("failed to allocate relay gc message");
-		return;
-	}
-	cmsg_init(&m->msg, route);
-	m->relay = relay;
-	vclock_copy(&m->vclock, &relay->r->vclock);
-	/*
-	 * Do not invoke garbage collection until the replica
-	 * confirms that it has received data stored in the
-	 * sent xlog.
-	 */
-	stailq_add_tail_entry(&relay->pending_gc, m, in_pending);
-}
-
-/**
- * Invoke pending garbage collection requests.
- *
- * This function schedules the most recent gc message whose
- * vclock is less than or equal to the given one. Older
- * messages are discarded as their job will be done by the
- * scheduled message anyway.
- */
-static inline void
-relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
-{
-	struct relay_gc_msg *curr, *next, *gc_msg = NULL;
-	stailq_foreach_entry_safe(curr, next, &relay->pending_gc, in_pending) {
-		/*
-		 * We may delete a WAL file only if its vclock is
-		 * less than or equal to the vclock acknowledged by
-		 * the replica. Even if the replica's signature is
-		 * is greater, but the vclocks are incomparable, we
-		 * must not delete the WAL, because there may still
-		 * be rows not applied by the replica in it while
-		 * the greater signatures is due to changes pulled
-		 * from other members of the cluster.
-		 */
-		if (vclock_compare(&curr->vclock, vclock) > 0)
-			break;
-		stailq_shift(&relay->pending_gc);
-		free(gc_msg);
-		gc_msg = curr;
-	}
-	if (gc_msg != NULL)
-		cpipe_push(&relay->tx_pipe, &gc_msg->msg);
-}
-
 static void
 relay_set_error(struct relay *relay, struct error *e)
 {
@@ -543,12 +454,6 @@ relay_subscribe_f(va_list ap)
 	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
 		  &relay->relay_pipe, NULL, NULL, cbus_process);
 
-	/* Setup garbage collection trigger. */
-	struct trigger on_close_log = {
-		RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
-	};
-	trigger_add(&r->on_close_log, &on_close_log);
-
 	/* Setup WAL watcher for sending new rows to the replica. */
 	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
 			relay_process_wal_event, cbus_process);
@@ -612,8 +517,6 @@ relay_subscribe_f(va_list ap)
 		vclock_copy(&relay->status_msg.vclock, send_vclock);
 		relay->status_msg.relay = relay;
 		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
-		/* Collect xlog files received by the replica. */
-		relay_schedule_pending_gc(relay, send_vclock);
 	}
 
 	/*
@@ -625,8 +528,7 @@ relay_subscribe_f(va_list ap)
 	diag_log();
 	say_crit("exiting the relay loop");
 
-	/* Clear garbage collector trigger and WAL watcher. */
-	trigger_clear(&on_close_log);
+	/* Clear WAL watcher. */
 	wal_clear_watcher(&relay->wal_watcher, cbus_process);
 
 	/* Join ack reader fiber. */
-- 
2.22.0

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] [PATCH 4/7] Replication: wal memory buffer
  2019-08-13  6:27 [tarantool-patches] [PATCH 0/7] Replication: In-memory replication Georgy Kirichenko
                   ` (2 preceding siblings ...)
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 3/7] Replication: Relay does not rely on xlog boundaries Georgy Kirichenko
@ 2019-08-13  6:27 ` Georgy Kirichenko
  2019-08-21 11:57   ` Vladimir Davydov
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 5/7] Replication: in memory replication Georgy Kirichenko
                   ` (3 subsequent siblings)
  7 siblings, 1 reply; 18+ messages in thread
From: Georgy Kirichenko @ 2019-08-13  6:27 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Introduce a wal memory buffer which contains logged transactions. Wal
writes all rows into the memory buffer and then flushes new data to
disk. Wal memory consists of rotated pairs of an xrow header array and
an encoded xrow data buffer.
---
 src/box/CMakeLists.txt |   1 +
 src/box/wal.c          |  38 ++++--
 src/box/wal_mem.c      | 273 +++++++++++++++++++++++++++++++++++++++++
 src/box/wal_mem.h      | 166 +++++++++++++++++++++++++
 src/box/xlog.c         |  77 +++++++++---
 src/box/xlog.h         |   9 ++
 6 files changed, 536 insertions(+), 28 deletions(-)
 create mode 100644 src/box/wal_mem.c
 create mode 100644 src/box/wal_mem.h

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 9bba37bcb..bd31b07df 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -125,6 +125,7 @@ add_library(box STATIC
     bind.c
     execute.c
     wal.c
+    wal_mem.c
     call.c
     merger.c
     ${lua_sources}
diff --git a/src/box/wal.c b/src/box/wal.c
index a09ab7187..6cdb0db15 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -44,6 +44,7 @@
 #include "coio_task.h"
 #include "replication.h"
 #include "gc.h"
+#include "wal_mem.h"
 
 enum {
 	/**
@@ -156,6 +157,8 @@ struct wal_writer
 	 * Used for replication relays.
 	 */
 	struct rlist watchers;
+	/** Wal memory buffer. */
+	struct wal_mem wal_mem;
 };
 
 enum wal_msg_type {
@@ -936,6 +939,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 	int64_t tsn = 0;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
+		(*row)->tm = ev_now(loop());
 		if ((*row)->replica_id == 0) {
 			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
 				      vclock_get(base, instance_id);
@@ -1027,25 +1031,37 @@ wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
 	int rc;
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
+	wal_mem_svp(&writer->wal_mem, &writer->vclock);
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
 		wal_assign_lsn(&vclock_diff, &writer->vclock,
 			       entry->rows, entry->rows + entry->n_rows);
 		entry->res = vclock_sum(&vclock_diff) +
 			     vclock_sum(&writer->vclock);
-		rc = xlog_write_entry(l, entry);
-		if (rc < 0)
+		if (wal_mem_write(&writer->wal_mem, entry->rows,
+				  entry->rows + entry->n_rows) < 0) {
+			wal_mem_svp_reset(&writer->wal_mem);
 			goto done;
-		if (rc > 0) {
-			writer->checkpoint_wal_size += rc;
-			last_committed = &entry->fifo;
-			vclock_merge(&writer->vclock, &vclock_diff);
 		}
-		/* rc == 0: the write is buffered in xlog_tx */
 	}
-	rc = xlog_flush(l);
-	if (rc < 0)
-		goto done;
 
+	struct iovec iov[SMALL_OBUF_IOV_MAX];
+	int iovcnt;
+	iovcnt = wal_mem_svp_data(&writer->wal_mem, iov);
+	xlog_tx_begin(l);
+	if (xlog_write_iov(l, iov, iovcnt,
+			   wal_mem_svp_row_count(&writer->wal_mem)) < 0) {
+		xlog_tx_rollback(l);
+		wal_mem_svp_reset(&writer->wal_mem);
+		goto done;
+	}
+	rc = xlog_tx_commit(l);
+	if (rc == 0)
+		/* Data is buffered but not yet flushed. */
+		rc = xlog_flush(l);
+	if (rc < 0) {
+		wal_mem_svp_reset(&writer->wal_mem);
+		goto done;
+	}
 	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
 	vclock_merge(&writer->vclock, &vclock_diff);
@@ -1147,6 +1163,7 @@ wal_cord_f(va_list ap)
 {
 	(void) ap;
 	struct wal_writer *writer = &wal_writer_singleton;
+	wal_mem_create(&writer->wal_mem);
 
 	/** Initialize eio in this thread */
 	coio_enable();
@@ -1195,6 +1212,7 @@ wal_cord_f(va_list ap)
 		xlog_close(&vy_log_writer.xlog, false);
 
 	cpipe_destroy(&writer->tx_prio_pipe);
+	wal_mem_destroy(&writer->wal_mem);
 	return 0;
 }
 
diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c
new file mode 100644
index 000000000..fdfc6f93d
--- /dev/null
+++ b/src/box/wal_mem.c
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "wal_mem.h"
+
+#include "fiber.h"
+#include "errinj.h"
+
+enum {
+	/* Initial size for rows storage. */
+	WAL_MEM_BUF_INITIAL_ROW_COUNT = 4096,
+	/* Initial size for data storage. */
+	WAL_MEM_BUF_INITIAL_DATA_SIZE = 65536,
+	/* How many rows we will place in one buffer. */
+	WAL_MEM_BUF_ROWS_LIMIT = 8192,
+	/* How much data we will place in one buffer. */
+	WAL_MEM_BUF_DATA_LIMIT = 1 << 19,
+};
+
+void
+wal_mem_create(struct wal_mem *wal_mem)
+{
+	int i;
+	for (i = 0; i < WAL_MEM_BUF_COUNT; ++i) {
+		ibuf_create(&wal_mem->buf[i].rows, &cord()->slabc,
+			    WAL_MEM_BUF_INITIAL_ROW_COUNT *
+			    sizeof(struct xrow_header));
+		obuf_create(&wal_mem->buf[i].data, &cord()->slabc,
+			    WAL_MEM_BUF_INITIAL_DATA_SIZE);
+	}
+	wal_mem->last_buf_index = 0;
+	wal_mem->first_buf_index = 0;
+}
+
+void
+wal_mem_destroy(struct wal_mem *wal_mem)
+{
+	int i;
+	for (i = 0; i < WAL_MEM_BUF_COUNT; ++i) {
+		ibuf_destroy(&wal_mem->buf[i].rows);
+		obuf_destroy(&wal_mem->buf[i].data);
+	}
+}
+
+/*
+ * Switch to the next buffer if required and discard outdated data.
+ */
+static struct wal_mem_buf *
+wal_mem_rotate(struct wal_mem *wal_mem)
+{
+	struct wal_mem_buf *mem_buf = wal_mem->buf +
+				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+	if (ibuf_used(&mem_buf->rows) < WAL_MEM_BUF_ROWS_LIMIT *
+					sizeof(struct xrow_header) &&
+	    obuf_size(&mem_buf->data) < WAL_MEM_BUF_DATA_LIMIT)
+		return mem_buf;
+	/* Switch to the next buffer (a target to append new data). */
+	++wal_mem->last_buf_index;
+	mem_buf = wal_mem->buf + wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+	if (wal_mem->last_buf_index - wal_mem->first_buf_index <
+	    WAL_MEM_BUF_COUNT) {
+		/* The buffer is unused, nothing to do. */
+		return mem_buf;
+	}
+	/* Discard data and adjust first buffer index. */
+	ibuf_reset(&mem_buf->rows);
+	obuf_reset(&mem_buf->data);
+	++wal_mem->first_buf_index;
+	return mem_buf;
+}
+
+void
+wal_mem_svp(struct wal_mem *wal_mem, struct vclock *vclock)
+{
+	struct wal_mem_buf *mem_buf = wal_mem_rotate(wal_mem);
+	/* If the current buffer is empty, set up its vclock. */
+	if (ibuf_used(&mem_buf->rows) == 0)
+		vclock_copy(&mem_buf->vclock, vclock);
+	wal_mem->tx_first_row_index = ibuf_used(&mem_buf->rows) /
+				      sizeof(struct wal_mem_buf_row);
+	wal_mem->tx_first_row_svp = obuf_create_svp(&mem_buf->data);
+}
+
+/* Build an iovec with the data encoded since the last svp. */
+int
+wal_mem_svp_data(struct wal_mem *wal_mem, struct iovec *iovec)
+{
+	struct wal_mem_buf *mem_buf = wal_mem->buf +
+				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+	if (wal_mem->tx_first_row_svp.used == obuf_size(&mem_buf->data))
+		return 0;
+
+	int iov_cnt = 1 + obuf_iovcnt(&mem_buf->data) -
+		      wal_mem->tx_first_row_svp.pos;
+	memcpy(iovec, mem_buf->data.iov + wal_mem->tx_first_row_svp.pos,
+	       sizeof(struct iovec) * iov_cnt);
+	iovec[0].iov_base += wal_mem->tx_first_row_svp.iov_len;
+	iovec[0].iov_len -= wal_mem->tx_first_row_svp.iov_len;
+	return iov_cnt;
+}
+
+/* Truncate all the data written in the current transaction. */
+void
+wal_mem_svp_reset(struct wal_mem *wal_mem)
+{
+	struct wal_mem_buf *mem_buf = wal_mem->buf +
+				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+	mem_buf->rows.wpos = mem_buf->rows.rpos + wal_mem->tx_first_row_index *
+						  sizeof(struct wal_mem_buf_row);
+	obuf_rollback_to_svp(&mem_buf->data, &wal_mem->tx_first_row_svp);
+}
+
+int
+wal_mem_write(struct wal_mem *wal_mem, struct xrow_header **begin,
+	      struct xrow_header **end)
+{
+	struct wal_mem_buf *mem_buf = wal_mem->buf +
+				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+
+	/* Save rollback values. */
+	size_t old_rows_size = ibuf_used(&mem_buf->rows);
+	struct obuf_svp data_svp = obuf_create_svp(&mem_buf->data);
+
+	/* Allocate space for row descriptors. */
+	struct wal_mem_buf_row *mem_row =
+		(struct wal_mem_buf_row *)
+		ibuf_alloc(&mem_buf->rows, (end - begin) *
+					   sizeof(struct wal_mem_buf_row));
+	if (mem_row == NULL) {
+		diag_set(OutOfMemory, (end - begin) *
+				      sizeof(struct wal_mem_buf_row),
+			 "region", "wal memory rows");
+		goto error;
+	}
+	/* Append rows. */
+	struct xrow_header **row;
+	for (row = begin; row < end; ++row, ++mem_row) {
+		struct errinj *inj = errinj(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT);
+		if (inj != NULL && inj->iparam == (*row)->lsn) {
+			(*row)->lsn = inj->iparam - 1;
+			say_warn("injected broken lsn: %lld",
+				 (long long) (*row)->lsn);
+		}
+
+		/* Reserve space. */
+		char *data = obuf_reserve(&mem_buf->data, xrow_approx_len(*row));
+		if (data == NULL) {
+			diag_set(OutOfMemory, xrow_approx_len(*row),
+				 "region", "wal memory data");
+			goto error;
+		}
+
+		struct iovec iov[XROW_BODY_IOVMAX];
+		/*
+		 * xrow_header_encode allocates fiber gc space only for row
+		 * header.
+		 */
+		int iov_cnt = xrow_header_encode(*row, 0, iov, 0);
+		if (iov_cnt < 0)
+			goto error;
+		/* Copy row header. */
+		data = obuf_alloc(&mem_buf->data, iov[0].iov_len);
+		memcpy(data, iov[0].iov_base, iov[0].iov_len);
+		/* Initialize row descriptor. */
+		mem_row->xrow = **row;
+		mem_row->data = data;
+		mem_row->size = iov[0].iov_len;
+		/* Write bodies and patch location. */
+		int i;
+		for (i = 1; i < iov_cnt; ++i) {
+			/* Append xrow bodies and patch xrow pointers. */
+			data = obuf_alloc(&mem_buf->data, iov[i].iov_len);
+			memcpy(data, iov[i].iov_base, iov[i].iov_len);
+			mem_row->xrow.body[i - 1].iov_base = data;
+			mem_row->size += iov[i].iov_len;
+		}
+	}
+	return 0;
+
+error:
+	/* Restore buffer state. */
+	mem_buf->rows.wpos = mem_buf->rows.rpos + old_rows_size;
+	obuf_rollback_to_svp(&mem_buf->data, &data_svp);
+	return -1;
+}
+
+int
+wal_mem_cursor_create(struct wal_mem *wal_mem,
+		      struct wal_mem_cursor *wal_mem_cursor,
+		      struct vclock *vclock)
+{
+	uint64_t buf_index;
+	for (buf_index = wal_mem->first_buf_index;
+	     buf_index <= wal_mem->last_buf_index;
+	     ++buf_index) {
+		struct wal_mem_buf *mem_buf = wal_mem->buf +
+					      buf_index % WAL_MEM_BUF_COUNT;
+		int rc = vclock_compare(&mem_buf->vclock, vclock);
+		if (rc != 0 && rc != -1)
+			break;
+	}
+	if (buf_index == wal_mem->first_buf_index)
+		return -1;
+	wal_mem_cursor->buf_index = buf_index - 1;
+	wal_mem_cursor->row_index = 0;
+	return 0;
+}
+
+int
+wal_mem_cursor_next(struct wal_mem *wal_mem,
+		    struct wal_mem_cursor *wal_mem_cursor,
+		    struct xrow_header **row,
+		    void **data,
+		    size_t *size)
+{
+	if (wal_mem->first_buf_index > wal_mem_cursor->buf_index) {
+		/* Buffer was discarded. */
+		return -1;
+	}
+
+	struct wal_mem_buf *mem_buf;
+	size_t last_row_index;
+
+next_buffer:
+	mem_buf = wal_mem->buf +
+		  wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+	last_row_index = ibuf_used(&mem_buf->rows) /
+			 sizeof(struct wal_mem_buf_row);
+	if (last_row_index == wal_mem_cursor->row_index) {
+		/* No more rows in the current buffer. */
+		if (wal_mem->last_buf_index == wal_mem_cursor->buf_index)
+			/* No more rows in the memory. */
+			return 1;
+		wal_mem_cursor->row_index = 0;
+		++wal_mem_cursor->buf_index;
+		goto next_buffer;
+	}
+	struct wal_mem_buf_row *buf_row =
+		(struct wal_mem_buf_row *)mem_buf->rows.rpos +
+		wal_mem_cursor->row_index;
+	*row = &buf_row->xrow;
+	*data = buf_row->data;
+	*size = buf_row->size;
+	return 0;
+}
diff --git a/src/box/wal_mem.h b/src/box/wal_mem.h
new file mode 100644
index 000000000..d26d00157
--- /dev/null
+++ b/src/box/wal_mem.h
@@ -0,0 +1,166 @@
+#ifndef TARANTOOL_WAL_MEM_H_INCLUDED
+#define TARANTOOL_WAL_MEM_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdint.h>
+
+#include "small/ibuf.h"
+#include "small/obuf.h"
+#include "xrow.h"
+#include "vclock.h"
+
+enum {
+	/*
+	 * A wal memory object contains a fixed count of rotating data
+	 * buffers. When a buffer is discarded, roughly
+	 * 1/(COUNT OF BUFFERS) of the stored rows is dropped at once.
+	 * A bigger count makes rotation more frequent but the drops
+	 * smaller, so the wal memory size stays more stable.
+	 */
+	WAL_MEM_BUF_COUNT = 8,
+};
+
+/*
+ * A wal memory row descriptor which contains a decoded xrow header
+ * and the pointer to and size of the encoded row data.
+ */
+struct wal_mem_buf_row {
+	/* Decoded xrow header. */
+	struct xrow_header xrow;
+	/* Pointer to the xrow encoded raw data. */
+	void *data;
+	/* xrow raw data size. */
+	size_t size;
+};
+
+/*
+ * A wal memory data buffer which contains:
+ *  a vclock taken just before the first contained row,
+ *  an ibuf with row descriptors,
+ *  an obuf with encoded row data.
+ */
+struct wal_mem_buf {
+	/* vclock just before the first row. */
+	struct vclock vclock;
+	/* A row descriptor array. */
+	struct ibuf rows;
+	/* Data storage for encoded row data. */
+	struct obuf data;
+};
+
+/*
+ * Wal memory contains WAL_MEM_BUF_COUNT wal memory buffers organized
+ * in a ring. Wal memory tracks the indexes (generations) of the first
+ * and the last used buffers; these indexes are not wrapped around the
+ * ring. Each rotation increases the last buffer index and each buffer
+ * discard increases the first buffer index. To get the effective index
+ * in the buffer array, a modulo operation (or mask) should be used.
+ */
+struct wal_mem {
+	/* An index of the first used buffer. */
+	uint64_t first_buf_index;
+	/* An index of the last used buffer. */
+	uint64_t last_buf_index;
+	/* A memory buffer array. */
+	struct wal_mem_buf buf[WAL_MEM_BUF_COUNT];
+	/* The first row index written in the current transaction. */
+	uint32_t tx_first_row_index;
+	/* The first row data svp written in the current transaction. */
+	struct obuf_svp tx_first_row_svp;
+};
+
+/* Create a wal memory. */
+void
+wal_mem_create(struct wal_mem *wal_mem);
+
+/* Destroy wal memory structure. */
+void
+wal_mem_destroy(struct wal_mem *wal_mem);
+
+/*
+ * Rotate a wal memory if required and save the current wal memory write
+ * position.
+ */
+void
+wal_mem_svp(struct wal_mem *wal_mem, struct vclock *vclock);
+
+/* Retrieve data after last svp. */
+int
+wal_mem_svp_data(struct wal_mem *wal_mem, struct iovec *iovec);
+
+/* Truncate all the data written after the last svp. */
+void
+wal_mem_svp_reset(struct wal_mem *wal_mem);
+
+/* Count of rows written since the last svp. */
+static inline int
+wal_mem_svp_row_count(struct wal_mem *wal_mem)
+{
+	struct wal_mem_buf *mem_buf = wal_mem->buf +
+				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+	return ibuf_used(&mem_buf->rows) / sizeof(struct wal_mem_buf_row) -
+	       wal_mem->tx_first_row_index;
+}
+
+/*
+ * Append an xrow array to wal memory. The whole array is placed into
+ * one wal memory buffer and each row occupies a contiguous chunk of
+ * the data buffer.
+ * Return
+ *  0 for Ok
+ *  -1 in case of error
+ */
+int
+wal_mem_write(struct wal_mem *wal_mem, struct xrow_header **begin,
+	      struct xrow_header **end);
+
+/* Wal memory cursor to track a position in a wal memory. */
+struct wal_mem_cursor {
+	/* Current memory buffer index. */
+	uint64_t buf_index;
+	/* Current row index. */
+	uint32_t row_index;
+};
+
+/* Create a wal memory cursor positioned according to the given vclock. */
+int
+wal_mem_cursor_create(struct wal_mem *wal_mem,
+		      struct wal_mem_cursor *wal_mem_cursor,
+		      struct vclock *vclock);
+
+int
+wal_mem_cursor_next(struct wal_mem *wal_mem,
+		    struct wal_mem_cursor *wal_mem_cursor,
+		    struct xrow_header **row,
+		    void **data,
+		    size_t *size);
+
+#endif /* TARANTOOL_WAL_MEM_H_INCLUDED */
diff --git a/src/box/xlog.c b/src/box/xlog.c
index 54a7b5246..363f93ef5 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -1267,14 +1267,8 @@ xlog_tx_write(struct xlog *log)
 	return written;
 }
 
-/*
- * Add a row to a log and possibly flush the log.
- *
- * @retval  -1 error, check diag.
- * @retval >=0 the number of bytes written to buffer.
- */
-ssize_t
-xlog_write_row(struct xlog *log, const struct xrow_header *packet)
+static int
+xlog_write_prepare(struct xlog *log)
 {
 	/*
 	 * Automatically reserve space for a fixheader when adding
@@ -1288,17 +1282,17 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 			return -1;
 		}
 	}
+	return 0;
+}
 
+/*
+ * Append iov to a log internal buffer.
+ */
+static ssize_t
+xlog_writev(struct xlog *log, struct iovec *iov, int iovcnt)
+{
 	struct obuf_svp svp = obuf_create_svp(&log->obuf);
-	size_t page_offset = obuf_size(&log->obuf);
-	/** encode row into iovec */
-	struct iovec iov[XROW_IOVMAX];
-	/** don't write sync to the disk */
-	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
-	if (iovcnt < 0) {
-		obuf_rollback_to_svp(&log->obuf, &svp);
-		return -1;
-	}
+	size_t old_size = obuf_size(&log->obuf);
 	for (int i = 0; i < iovcnt; ++i) {
 		struct errinj *inj = errinj(ERRINJ_WAL_WRITE_PARTIAL,
 					    ERRINJ_INT);
@@ -1317,10 +1311,33 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 			return -1;
 		}
 	}
+	return obuf_size(&log->obuf) - old_size;
+}
+
+/*
+ * Add a row to a log and possibly flush the log.
+ *
+ * @retval  -1 error, check diag.
+ * @retval >=0 the number of bytes written to buffer.
+ */
+ssize_t
+xlog_write_row(struct xlog *log, const struct xrow_header *packet)
+{
+	if (xlog_write_prepare(log) != 0)
+		return -1;
+
+	/** encode row into iovec */
+	struct iovec iov[XROW_IOVMAX];
+	/** don't write sync to the disk */
+	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
+	if (iovcnt < 0)
+		return -1;
 	assert(iovcnt <= XROW_IOVMAX);
+	ssize_t row_size = xlog_writev(log, iov, iovcnt);
+	if (row_size < 0)
+		return -1;
 	log->tx_rows++;
 
-	size_t row_size = obuf_size(&log->obuf) - page_offset;
 	if (log->is_autocommit &&
 	    obuf_size(&log->obuf) >= XLOG_TX_AUTOCOMMIT_THRESHOLD &&
 	    xlog_tx_write(log) < 0)
@@ -1329,6 +1346,30 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 	return row_size;
 }
 
+/*
+ * Add an io vector to a log and possibly flush the log.
+ *
+ * @retval  -1 error, check diag.
+ * @retval >=0 the number of bytes written to buffer.
+ */
+ssize_t
+xlog_write_iov(struct xlog *log, struct iovec *iov, int iovcnt, int rows)
+{
+	if (xlog_write_prepare(log) != 0)
+		return -1;
+
+	ssize_t row_size = xlog_writev(log, iov, iovcnt);
+	if (row_size < 0)
+		return -1;
+	log->tx_rows += rows;
+
+	if (log->is_autocommit &&
+	    obuf_size(&log->obuf) >= XLOG_TX_AUTOCOMMIT_THRESHOLD &&
+	    xlog_tx_write(log) < 0)
+		return -1;
+
+	return row_size;
+}
 /**
  * Begin a multi-statement xlog transaction. All xrow objects
  * of a single transaction share the same header and checksum
diff --git a/src/box/xlog.h b/src/box/xlog.h
index 6a71e0cbd..713e14e2b 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -501,6 +501,15 @@ xlog_fallocate(struct xlog *log, size_t size);
 ssize_t
 xlog_write_row(struct xlog *log, const struct xrow_header *packet);
 
+/**
+ * Write an io vector to xlog.
+ *
+ * @retval >=0 the number of bytes written
+ * @retval -1 error, check diag
+ */
+ssize_t
+xlog_write_iov(struct xlog *xlog, struct iovec *iov, int iovcnt, int rows);
+
 /**
  * Prevent xlog row buffer offloading, should be use
  * at transaction start to write transaction in one xlog tx
-- 
2.22.0

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] [PATCH 5/7] Replication: in memory replication
  2019-08-13  6:27 [tarantool-patches] [PATCH 0/7] Replication: In-memory replication Georgy Kirichenko
                   ` (3 preceding siblings ...)
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 4/7] Replication: wal memory buffer Georgy Kirichenko
@ 2019-08-13  6:27 ` Georgy Kirichenko
  2019-08-21 13:52   ` Vladimir Davydov
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 6/7] Refactoring: remove wal_watcher routines Georgy Kirichenko
                   ` (2 subsequent siblings)
  7 siblings, 1 reply; 18+ messages in thread
From: Georgy Kirichenko @ 2019-08-13  6:27 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Relay uses both xlog files and wal memory to feed a replica with
transaction data. The new workflow is to read xlog files up to the last
one and then switch to wal memory. If the relay runs out of wal
in-memory data (it has already been discarded), it falls back to file
mode. A heartbeat is sent from the wal thread, and only in in-memory
mode, because in file mode there is always data to send.

Closes #3794
---
 src/box/gc.c      |   3 +
 src/box/gc.h      |   2 +
 src/box/relay.cc  | 161 +++++++++++++++++++++++-----------------------
 src/box/wal.c     | 154 +++++++++++++++++++++++++++++++++++++++++++-
 src/box/wal.h     |   6 ++
 src/box/wal_mem.c |  10 +--
 6 files changed, 249 insertions(+), 87 deletions(-)

diff --git a/src/box/gc.c b/src/box/gc.c
index 9771e407a..c36c2ace4 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -126,6 +126,8 @@ gc_init(const char *wal_dir_name)
 	xdir_scan(&gc.wal_dir);
 	gc.log_opened = false;
 	gc_tree_new(&gc.consumers);
+	xdir_create(&gc.xdir, wal_dir_name, XLOG, &INSTANCE_UUID,
+		    &xlog_opts_default);
 	fiber_cond_create(&gc.cleanup_cond);
 	checkpoint_schedule_cfg(&gc.checkpoint_schedule, 0, 0);
 
@@ -166,6 +168,7 @@ gc_free(void)
 		gc_consumer_delete(consumer);
 		consumer = next;
 	}
+	xdir_destroy(&gc.xdir);
 }
 
 /**
diff --git a/src/box/gc.h b/src/box/gc.h
index 9b38a0c06..f28b716b5 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -125,6 +125,8 @@ struct gc_state {
 	/** True if log is opened. */
 	bool log_opened;
+	/** xdir to track wal files. */
+	struct xdir xdir;
 	/** Registered consumers, linked by gc_consumer::node. */
 	gc_tree_t consumers;
 	/** Fiber responsible for periodic checkpointing. */
 	struct fiber *checkpoint_fiber;
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a1b841291..05fc0f691 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -84,8 +84,6 @@ struct relay {
 	struct replica *replica;
 	/** WAL event watcher. */
 	struct wal_watcher wal_watcher;
-	/** Relay reader cond. */
-	struct fiber_cond reader_cond;
 	/** Relay diagnostics. */
 	struct diag diag;
 	/** Vclock recieved from replica. */
@@ -116,6 +114,9 @@ struct relay {
 	/** Relay sync state. */
 	enum relay_state state;
 
+	struct vclock relay_vclock;
+	char *wal_dir;
+
 	struct {
 		/* Align to prevent false-sharing with tx thread */
 		alignas(CACHELINE_SIZE)
@@ -165,7 +166,6 @@ relay_new(struct replica *replica)
 		return NULL;
 	}
 	relay->replica = replica;
-	fiber_cond_create(&relay->reader_cond);
 	diag_create(&relay->diag);
 	stailq_create(&relay->pending_gc);
 	relay->state = RELAY_OFF;
@@ -215,7 +215,8 @@ relay_exit(struct relay *relay)
 	 * cursor, which must be closed in the same thread
 	 * that opened it (it uses cord's slab allocator).
 	 */
-	recovery_delete(relay->r);
+	if (relay->r != NULL)
+		recovery_delete(relay->r);
 	relay->r = NULL;
 }
 
@@ -239,7 +240,6 @@ relay_delete(struct relay *relay)
 {
 	if (relay->state == RELAY_FOLLOW)
 		relay_stop(relay);
-	fiber_cond_destroy(&relay->reader_cond);
 	diag_destroy(&relay->diag);
 	TRASH(relay);
 	free(relay);
@@ -361,31 +361,6 @@ relay_set_error(struct relay *relay, struct error *e)
 		diag_add_error(&relay->diag, e);
 }
 
-static void
-relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
-{
-	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
-	if (fiber_is_cancelled()) {
-		/*
-		 * The relay is exiting. Rescanning the WAL at this
-		 * point would be pointless and even dangerous,
-		 * because the relay could have written a packet
-		 * fragment to the socket before being cancelled
-		 * so that writing another row to the socket would
-		 * lead to corrupted replication stream and, as
-		 * a result, permanent replication breakdown.
-		 */
-		return;
-	}
-	try {
-		recover_remaining_wals(relay->r, &relay->stream, NULL,
-				       (events & WAL_EVENT_ROTATE) != 0);
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
-		fiber_cancel(fiber());
-	}
-}
-
 /*
  * Relay reader fiber function.
  * Read xrow encoded vclocks sent by the replica.
@@ -408,7 +383,24 @@ relay_reader_f(va_list ap)
 			/* vclock is followed while decoding, zeroing it. */
 			vclock_create(&relay->recv_vclock);
 			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
-			fiber_cond_signal(&relay->reader_cond);
+			if (relay->status_msg.msg.route != NULL)
+				continue;
+			struct vclock *send_vclock;
+			if (relay->version_id < version_id(1, 7, 4))
+				send_vclock = &relay->r->vclock;
+			else
+				send_vclock = &relay->recv_vclock;
+			if (vclock_sum(&relay->status_msg.vclock) ==
+			    vclock_sum(send_vclock))
+				continue;
+			static const struct cmsg_hop route[] = {
+				{tx_status_update, NULL}
+			};
+			cmsg_init(&relay->status_msg.msg, route);
+			vclock_copy(&relay->status_msg.vclock,
+				    send_vclock);
+			relay->status_msg.relay = relay;
+			cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
 		}
 	} catch (Exception *e) {
 		relay_set_error(relay, e);
@@ -434,6 +426,27 @@ relay_send_heartbeat(struct relay *relay)
 	}
 }
 
+static int
+relay_send_cb(struct xrow_header *row, void *data)
+{
+	try {
+		struct relay *relay = (struct relay *)data;
+		relay_send_row(&relay->stream, row);
+		return 0;
+	} catch (Exception *e) {
+		return -1;
+	}
+}
+
+static void
+relay_endpoint_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
+{
+	(void) loop;
+	(void) events;
+	struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data;
+	cbus_process(endpoint);
+}
+
 /**
  * A libev callback invoked when a relay client socket is ready
  * for read. This currently only happens when the client closes
@@ -443,21 +456,16 @@ static int
 relay_subscribe_f(va_list ap)
 {
 	struct relay *relay = va_arg(ap, struct relay *);
-	struct recovery *r = relay->r;
 
 	coio_enable();
 	relay_set_cord_name(relay->io.fd);
 
 	/* Create cpipe to tx for propagating vclock. */
 	cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay),
-			     fiber_schedule_cb, fiber());
+			     relay_endpoint_cb, &relay->endpoint);
 	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
 		  &relay->relay_pipe, NULL, NULL, cbus_process);
 
-	/* Setup WAL watcher for sending new rows to the replica. */
-	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
-			relay_process_wal_event, cbus_process);
-
 	/* Start fiber for receiving replica acks. */
 	char name[FIBER_NAME_MAX];
 	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
@@ -473,50 +481,39 @@ relay_subscribe_f(va_list ap)
 	 */
 	relay_send_heartbeat(relay);
 
-	/*
-	 * Run the event loop until the connection is broken
-	 * or an error occurs.
-	 */
 	while (!fiber_is_cancelled()) {
-		double timeout = replication_timeout;
-		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
-					    ERRINJ_DOUBLE);
-		if (inj != NULL && inj->dparam != 0)
-			timeout = inj->dparam;
-
-		fiber_cond_wait_deadline(&relay->reader_cond,
-					 relay->last_row_time + timeout);
-
 		/*
-		 * The fiber can be woken by IO cancel, by a timeout of
-		 * status messaging or by an acknowledge to status message.
-		 * Handle cbus messages first.
+		 * Start relaying data. First recover all existing wal
+		 * files, then try to continue streaming from wal memory.
+		 *
+		 * NOTE: it would be more efficient to try relaying from
+		 * wal memory first, but that breaks tests which rely on
+		 * collected logs. As the impact is small, this can be
+		 * fixed in the future.
 		 */
-		cbus_process(&relay->endpoint);
-		/* Check for a heartbeat timeout. */
-		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
-			relay_send_heartbeat(relay);
+		try {
+			relay->r = recovery_new(relay->wal_dir, false,
+					        &relay->relay_vclock);
+			auto relay_guard = make_scoped_guard([&] {
+				recovery_delete(relay->r);
+				relay->r = NULL;
+			});
+			recover_remaining_wals(relay->r, &relay->stream,
+					       NULL, true);
+		} catch (Exception *e) {
+			e->log();
+			relay_set_error(relay, e);
+			break;
+		}
 		/*
-		 * Check that the vclock has been updated and the previous
-		 * status message is delivered
+		 * All known 'file' data has been sent; it is time to
+		 * switch to wal memory relaying.
 		 */
-		if (relay->status_msg.msg.route != NULL)
-			continue;
-		struct vclock *send_vclock;
-		if (relay->version_id < version_id(1, 7, 4))
-			send_vclock = &r->vclock;
-		else
-			send_vclock = &relay->recv_vclock;
-		if (vclock_sum(&relay->status_msg.vclock) ==
-		    vclock_sum(send_vclock))
-			continue;
-		static const struct cmsg_hop route[] = {
-			{tx_status_update, NULL}
+		if (wal_relay(&relay->relay_vclock, relay_send_cb, relay,
+			  tt_sprintf("relay_%p", relay)) != 0) {
+			relay_set_error(relay, diag_last_error(&fiber()->diag));
+			break;
 		};
-		cmsg_init(&relay->status_msg.msg, route);
-		vclock_copy(&relay->status_msg.vclock, send_vclock);
-		relay->status_msg.relay = relay;
-		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
 	}
 
 	/*
@@ -528,9 +525,6 @@ relay_subscribe_f(va_list ap)
 	diag_log();
 	say_crit("exiting the relay loop");
 
-	/* Clear WAL watcher. */
-	wal_clear_watcher(&relay->wal_watcher, cbus_process);
-
 	/* Join ack reader fiber. */
 	fiber_cancel(reader);
 	fiber_join(reader);
@@ -557,6 +551,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	 * unless it has already been registered by initial
 	 * join.
 	 */
+	vclock_copy(&relay->relay_vclock, replica_clock);
 	if (replica->gc == NULL) {
 		replica->gc = gc_consumer_register(replica_clock, "replica %s",
 						   tt_uuid_str(&replica->uuid));
@@ -571,15 +566,16 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	});
 
 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
-	relay->r = recovery_new(cfg_gets("wal_dir"), false,
-			        replica_clock);
+	relay->wal_dir = strdup(cfg_gets("wal_dir"));
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
+	relay->r = NULL;
 
 	int rc = cord_costart(&relay->cord, "subscribe",
 			      relay_subscribe_f, relay);
 	if (rc == 0)
 		rc = cord_cojoin(&relay->cord);
+	free(relay->wal_dir);
 	if (rc != 0)
 		diag_raise();
 }
@@ -616,7 +612,10 @@ static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	assert(iproto_type_is_dml(packet->type));
+	if (packet->type != IPROTO_OK) {
+		assert(iproto_type_is_dml(packet->type));
+		vclock_follow_xrow(&relay->relay_vclock, packet);
+	}
 	/*
 	 * Transform replica local requests to IPROTO_NOP so as to
 	 * promote vclock on the replica without actually modifying
diff --git a/src/box/wal.c b/src/box/wal.c
index 6cdb0db15..0457f3d46 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -159,6 +159,8 @@ struct wal_writer
 	struct rlist watchers;
 	/** Wal memory buffer. */
 	struct wal_mem wal_mem;
+	/** Condition signaled on each wal memory write. */
+	struct fiber_cond wal_mem_cond;
 };
 
 enum wal_msg_type {
@@ -1065,6 +1067,7 @@ wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
 	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
 	vclock_merge(&writer->vclock, &vclock_diff);
+	fiber_cond_broadcast(&writer->wal_mem_cond);
 
 	/*
 	 * Notify TX if the checkpoint threshold has been exceeded.
@@ -1163,7 +1166,9 @@ wal_cord_f(va_list ap)
 {
 	(void) ap;
 	struct wal_writer *writer = &wal_writer_singleton;
+	fiber_cond_create(&writer->wal_mem_cond);
 	wal_mem_create(&writer->wal_mem);
+	wal_mem_svp(&writer->wal_mem, &writer->vclock);
 
 	/** Initialize eio in this thread */
 	coio_enable();
@@ -1457,7 +1462,8 @@ wal_set_watcher(struct wal_watcher *watcher, const char *name,
 		{ wal_watcher_notify_perform, &watcher->wal_pipe };
 	watcher->route[1] = (struct cmsg_hop)
 		{ wal_watcher_notify_complete, NULL };
-	cbus_pair("wal", name, &watcher->wal_pipe, &watcher->watcher_pipe,
+
+	cbus_pair("wal", name, &watcher->wal_pipe, &watcher->watcher_pipe,
 		  wal_watcher_attach, watcher, process_cb);
 }
 
@@ -1494,3 +1500,149 @@ wal_atfork()
 	if (xlog_is_open(&vy_log_writer.xlog))
 		xlog_atfork(&vy_log_writer.xlog);
 }
+
+struct wal_relay_msg {
+	struct cmsg base;
+	struct cpipe wal_pipe;
+	struct cpipe relay_pipe;
+
+	struct vclock *vclock;
+	wal_relay_cb on_wal_relay;
+	void *cb_data;
+	struct fiber *fiber;
+	struct cmsg cancel_msg;
+	struct fiber_cond done_cond;
+	bool done;
+	int rc;
+	struct diag diag;
+};
+
+static void
+wal_relay_done(struct cmsg *base)
+{
+	struct wal_relay_msg *msg =
+		container_of(base, struct wal_relay_msg, base);
+	msg->done = true;
+	fiber_cond_signal(&msg->done_cond);
+}
+
+static int
+wal_relay_f(va_list ap)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct wal_relay_msg *msg = va_arg(ap, struct wal_relay_msg *);
+	struct vclock *vclock = msg->vclock;
+	wal_relay_cb on_wal_relay = msg->on_wal_relay;
+	void *cb_data = msg->cb_data;
+
+	double last_row_time = ev_monotonic_now(loop());
+
+	struct wal_mem_cursor cursor;
+	if (wal_mem_cursor_create(&writer->wal_mem, &cursor, vclock) != 0)
+		goto done;
+	while (!fiber_is_cancelled()) {
+		struct xrow_header *row;
+		void *data;
+		size_t size;
+		int rc = wal_mem_cursor_next(&writer->wal_mem, &cursor,
+					     &row, &data, &size);
+		if (rc < 0) {
+			/* Outdated cursor. */
+			break;
+		}
+		if (rc == 0 && vclock_get(vclock, row->replica_id) >= row->lsn)
+			continue;
+		if (rc > 0) {
+			double timeout = replication_timeout;
+			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
+						    ERRINJ_DOUBLE);
+			if (inj != NULL && inj->dparam != 0)
+				timeout = inj->dparam;
+
+			/*
+			 * Nothing to send, so wait for the next row
+			 * and send a heartbeat if the timeout is exceeded.
+			 */
+			fiber_cond_wait_deadline(&writer->wal_mem_cond,
+						 last_row_time + timeout);
+			if (fiber_is_cancelled())
+				break;
+			if (ev_monotonic_now(loop()) - last_row_time >
+			    timeout) {
+				struct xrow_header heartbeat;
+				xrow_encode_timestamp(&heartbeat, instance_id,
+						      ev_now(loop()));
+				row = &heartbeat;
+			} else
+				continue;
+		}
+		last_row_time = ev_monotonic_now(loop());
+		if (on_wal_relay(row, cb_data) != 0) {
+			diag_move(&fiber()->diag, &msg->diag);
+			break;
+		}
+	}
+	static struct cmsg_hop done_route[] = {
+		{wal_relay_done, NULL}
+	};
+done:
+	cmsg_init(&msg->base, done_route);
+	cpipe_push(&msg->relay_pipe, &msg->base);
+	msg->fiber = NULL;
+	return 0;
+}
+
+static void
+wal_relay_attach(void *data)
+{
+	struct wal_relay_msg *msg = (struct wal_relay_msg *)data;
+	msg->fiber = fiber_new("wal relay fiber", wal_relay_f);
+	fiber_start(msg->fiber, msg);
+}
+
+static void
+wal_relay_cancel(struct cmsg *base)
+{
+	struct wal_relay_msg *msg = container_of(base, struct wal_relay_msg,
+						 cancel_msg);
+	if (msg->fiber != NULL)
+		fiber_cancel(msg->fiber);
+}
+
+int
+wal_relay(struct vclock *vclock, wal_relay_cb on_wal_relay, void *cb_data,
+	  const char *endpoint_name)
+{
+	struct wal_relay_msg wal_relay_msg;
+	wal_relay_msg.vclock = vclock;
+	wal_relay_msg.on_wal_relay = on_wal_relay;
+	wal_relay_msg.cb_data = cb_data;
+	diag_create(&wal_relay_msg.diag);
+	wal_relay_msg.cancel_msg.route = NULL;
+
+	fiber_cond_create(&wal_relay_msg.done_cond);
+	wal_relay_msg.done = false;
+
+	cbus_pair("wal", endpoint_name, &wal_relay_msg.wal_pipe,
+		  &wal_relay_msg.relay_pipe,
+		  wal_relay_attach, &wal_relay_msg, cbus_process);
+
+	while (!wal_relay_msg.done) {
+		if (fiber_is_cancelled() &&
+		    wal_relay_msg.cancel_msg.route == NULL) {
+			static struct cmsg_hop cancel_route[]= {
+				{wal_relay_cancel, NULL}};
+			cmsg_init(&wal_relay_msg.cancel_msg, cancel_route);
+			cpipe_push(&wal_relay_msg.wal_pipe, &wal_relay_msg.cancel_msg);
+		}
+		fiber_cond_wait(&wal_relay_msg.done_cond);
+	}
+
+	cbus_unpair(&wal_relay_msg.wal_pipe, &wal_relay_msg.relay_pipe,
+		    NULL, NULL, cbus_process);
+	if (!diag_is_empty(&wal_relay_msg.diag)) {
+		diag_move(&wal_relay_msg.diag, &fiber()->diag);
+		return -1;
+	}
+	return 0;
+}
diff --git a/src/box/wal.h b/src/box/wal.h
index 1a7156d97..bd298cebe 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -235,6 +235,12 @@ wal_write_vy_log(struct journal_entry *req);
 void
 wal_rotate_vy_log();
 
+typedef int (*wal_relay_cb)(struct xrow_header *header, void *data);
+
+int
+wal_relay(struct vclock *vclock, wal_relay_cb on_wal_relay, void *cb_data,
+	  const char *endpoint_name);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c
index fdfc6f93d..b01604d55 100644
--- a/src/box/wal_mem.c
+++ b/src/box/wal_mem.c
@@ -247,14 +247,13 @@ wal_mem_cursor_next(struct wal_mem *wal_mem,
 	}
 
 	struct wal_mem_buf *mem_buf;
-	size_t last_row_index;
 
 next_buffer:
 	mem_buf = wal_mem->buf +
-		  wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
-	last_row_index = ibuf_used(&mem_buf->rows) /
-			 sizeof(struct wal_mem_buf_row);
-	if (last_row_index == wal_mem_cursor->row_index) {
+		  wal_mem_cursor->buf_index % WAL_MEM_BUF_COUNT;
+	size_t buf_row_count = ibuf_used(&mem_buf->rows) /
+			       sizeof(struct wal_mem_buf_row);
+	if (buf_row_count == wal_mem_cursor->row_index) {
 		/* No more rows in the current buffer. */
 		if (wal_mem->last_buf_index == wal_mem_cursor->buf_index)
 			/* No more rows in the memory. */
@@ -269,5 +268,6 @@ next_buffer:
 	*row = &buf_row->xrow;
 	*data = buf_row->data;
 	*size = buf_row->size;
+	++wal_mem_cursor->row_index;
 	return 0;
 }
-- 
2.22.0

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] [PATCH 6/7] Refactoring: remove wal_watcher routines
  2019-08-13  6:27 [tarantool-patches] [PATCH 0/7] Replication: In-memory replication Georgy Kirichenko
                   ` (4 preceding siblings ...)
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 5/7] Replication: in memory replication Georgy Kirichenko
@ 2019-08-13  6:27 ` Georgy Kirichenko
  2019-08-21 13:52   ` Vladimir Davydov
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 7/7] Refactoring: get rid of on_close_log Georgy Kirichenko
  2019-08-16 13:47 ` [tarantool-patches] Re: [PATCH 0/7] Replication: In-memory replication Konstantin Osipov
  7 siblings, 1 reply; 18+ messages in thread
From: Georgy Kirichenko @ 2019-08-13  6:27 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

As the relay now uses WAL memory replication, there is no need for this
facility anymore.
---
 src/box/relay.cc |   2 -
 src/box/wal.c    | 127 -----------------------------------------------
 src/box/wal.h    |  68 -------------------------
 3 files changed, 197 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 05fc0f691..1c2332878 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -82,8 +82,6 @@ struct relay {
 	struct vclock stop_vclock;
 	/** Remote replica */
 	struct replica *replica;
-	/** WAL event watcher. */
-	struct wal_watcher wal_watcher;
 	/** Relay diagnostics. */
 	struct diag diag;
 	/** Vclock recieved from replica. */
diff --git a/src/box/wal.c b/src/box/wal.c
index 0457f3d46..2b9a3c805 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -724,9 +724,6 @@ wal_set_checkpoint_threshold(int64_t threshold)
 	fiber_set_cancellable(cancellable);
 }
 
-static void
-wal_notify_watchers(struct wal_writer *writer, unsigned events);
-
 /**
  * If there is no current WAL, try to open it, and close the
  * previous WAL. We close the previous WAL only after opening
@@ -771,7 +768,6 @@ wal_opt_rotate(struct wal_writer *writer)
 	vclock_copy(&writer->prev_vclock, &writer->vclock);
 
 	wal_notify_log_action(writer, WAL_LOG_OPEN);
-	wal_notify_watchers(writer, WAL_EVENT_ROTATE);
 	return 0;
 }
 
@@ -1123,7 +1119,6 @@ done:
 		wal_writer_begin_rollback(writer);
 	}
 	fiber_gc();
-	wal_notify_watchers(writer, WAL_EVENT_WRITE);
 }
 
 /*
@@ -1364,128 +1359,6 @@ wal_rotate_vy_log()
 	fiber_set_cancellable(cancellable);
 }
 
-static void
-wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
-{
-	assert(!rlist_empty(&watcher->next));
-
-	struct wal_watcher_msg *msg = &watcher->msg;
-	if (msg->cmsg.route != NULL) {
-		/*
-		 * If the notification message is still en route,
-		 * mark the watcher to resend it as soon as it
-		 * returns to WAL so as not to lose any events.
-		 */
-		watcher->pending_events |= events;
-		return;
-	}
-
-	msg->events = events;
-	cmsg_init(&msg->cmsg, watcher->route);
-	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
-}
-
-static void
-wal_watcher_notify_perform(struct cmsg *cmsg)
-{
-	struct wal_watcher_msg *msg = (struct wal_watcher_msg *) cmsg;
-	struct wal_watcher *watcher = msg->watcher;
-	unsigned events = msg->events;
-
-	watcher->cb(watcher, events);
-}
-
-static void
-wal_watcher_notify_complete(struct cmsg *cmsg)
-{
-	struct wal_watcher_msg *msg = (struct wal_watcher_msg *) cmsg;
-	struct wal_watcher *watcher = msg->watcher;
-
-	cmsg->route = NULL;
-
-	if (rlist_empty(&watcher->next)) {
-		/* The watcher is about to be destroyed. */
-		return;
-	}
-
-	if (watcher->pending_events != 0) {
-		/*
-		 * Resend the message if we got notified while
-		 * it was en route, see wal_watcher_notify().
-		 */
-		wal_watcher_notify(watcher, watcher->pending_events);
-		watcher->pending_events = 0;
-	}
-}
-
-static void
-wal_watcher_attach(void *arg)
-{
-	struct wal_watcher *watcher = (struct wal_watcher *) arg;
-	struct wal_writer *writer = &wal_writer_singleton;
-
-	assert(rlist_empty(&watcher->next));
-	rlist_add_tail_entry(&writer->watchers, watcher, next);
-
-	/*
-	 * Notify the watcher right after registering it
-	 * so that it can process existing WALs.
-	 */
-	wal_watcher_notify(watcher, WAL_EVENT_ROTATE);
-}
-
-static void
-wal_watcher_detach(void *arg)
-{
-	struct wal_watcher *watcher = (struct wal_watcher *) arg;
-
-	assert(!rlist_empty(&watcher->next));
-	rlist_del_entry(watcher, next);
-}
-
-void
-wal_set_watcher(struct wal_watcher *watcher, const char *name,
-		void (*watcher_cb)(struct wal_watcher *, unsigned events),
-		void (*process_cb)(struct cbus_endpoint *))
-{
-	assert(journal_is_initialized(&wal_writer_singleton.base));
-
-	rlist_create(&watcher->next);
-	watcher->cb = watcher_cb;
-	watcher->msg.watcher = watcher;
-	watcher->msg.events = 0;
-	watcher->msg.cmsg.route = NULL;
-	watcher->pending_events = 0;
-
-	assert(lengthof(watcher->route) == 2);
-	watcher->route[0] = (struct cmsg_hop)
-		{ wal_watcher_notify_perform, &watcher->wal_pipe };
-	watcher->route[1] = (struct cmsg_hop)
-		{ wal_watcher_notify_complete, NULL };
-
-	  cbus_pair("wal", name, &watcher->wal_pipe, &watcher->watcher_pipe,
-		  wal_watcher_attach, watcher, process_cb);
-}
-
-void
-wal_clear_watcher(struct wal_watcher *watcher,
-		  void (*process_cb)(struct cbus_endpoint *))
-{
-	assert(journal_is_initialized(&wal_writer_singleton.base));
-
-	cbus_unpair(&watcher->wal_pipe, &watcher->watcher_pipe,
-		    wal_watcher_detach, watcher, process_cb);
-}
-
-static void
-wal_notify_watchers(struct wal_writer *writer, unsigned events)
-{
-	struct wal_watcher *watcher;
-	rlist_foreach_entry(watcher, &writer->watchers, next)
-		wal_watcher_notify(watcher, events);
-}
-
-
 /**
  * After fork, the WAL writer thread disappears.
  * Make sure that atexit() handlers in the child do
diff --git a/src/box/wal.h b/src/box/wal.h
index bd298cebe..ab24a93cb 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -91,12 +91,6 @@ wal_enable(void);
 void
 wal_free(void);
 
-struct wal_watcher_msg {
-	struct cmsg cmsg;
-	struct wal_watcher *watcher;
-	unsigned events;
-};
-
 enum wal_event {
 	/** A row is written to the current WAL. */
 	WAL_EVENT_WRITE		= (1 << 0),
@@ -104,68 +98,6 @@ enum wal_event {
 	WAL_EVENT_ROTATE	= (1 << 1),
 };
 
-struct wal_watcher {
-	/** Link in wal_writer::watchers. */
-	struct rlist next;
-	/** The watcher callback function. */
-	void (*cb)(struct wal_watcher *, unsigned events);
-	/** Pipe from the watcher to WAL. */
-	struct cpipe wal_pipe;
-	/** Pipe from WAL to the watcher. */
-	struct cpipe watcher_pipe;
-	/** Cbus route used for notifying the watcher. */
-	struct cmsg_hop route[2];
-	/** Message sent to notify the watcher. */
-	struct wal_watcher_msg msg;
-	/**
-	 * Bit mask of WAL events that happened while
-	 * the notification message was en route.
-	 * It indicates that the message must be resend
-	 * right upon returning to WAL.
-	 */
-	unsigned pending_events;
-};
-
-/**
- * Subscribe to WAL events.
- *
- * The caller will receive a notification after a WAL write with
- * unspecified but reasonable latency. The first notification is
- * sent right after registering the watcher so that the caller
- * can process WALs written before the function was called.
- *
- * Note WAL notifications are delivered via cbus hence the caller
- * must have set up the cbus endpoint and started the event loop.
- * Alternatively, one can pass a callback invoking cbus_process()
- * to this function.
- *
- * @param watcher     WAL watcher to register.
- * @param name        Name of the cbus endpoint at the caller's cord.
- * @param watcher_cb  Callback to invoke from the caller's cord
- *                    upon receiving a WAL event. Apart from the
- *                    watcher itself, it takes a bit mask of events.
- *                    Events are described in wal_event enum.
- * @param process_cb  Function called to process cbus messages
- *                    while the watcher is being attached or NULL
- *                    if the cbus loop is running elsewhere.
- */
-void
-wal_set_watcher(struct wal_watcher *watcher, const char *name,
-		void (*watcher_cb)(struct wal_watcher *, unsigned events),
-		void (*process_cb)(struct cbus_endpoint *));
-
-/**
- * Unsubscribe from WAL events.
- *
- * @param watcher     WAL watcher to unregister.
- * @param process_cb  Function invoked to process cbus messages
- *                    while the watcher is being detached or NULL
- *                    if the cbus loop is running elsewhere.
- */
-void
-wal_clear_watcher(struct wal_watcher *watcher,
-		  void (*process_cb)(struct cbus_endpoint *));
-
 void
 wal_atfork();
 
-- 
2.22.0

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] [PATCH 7/7] Refactoring: get rid of on_close_log
  2019-08-13  6:27 [tarantool-patches] [PATCH 0/7] Replication: In-memory replication Georgy Kirichenko
                   ` (5 preceding siblings ...)
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 6/7] Refactoring: remove wal_watcher routines Georgy Kirichenko
@ 2019-08-13  6:27 ` Georgy Kirichenko
  2019-08-21 13:52   ` Vladimir Davydov
  2019-08-16 13:47 ` [tarantool-patches] Re: [PATCH 0/7] Replication: In-memory replication Konstantin Osipov
  7 siblings, 1 reply; 18+ messages in thread
From: Georgy Kirichenko @ 2019-08-13  6:27 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

This trigger is not used anymore.
---
 src/box/recovery.cc | 3 ---
 src/box/recovery.h  | 2 --
 2 files changed, 5 deletions(-)

diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index 45c4f6820..fa0426726 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -111,7 +111,6 @@ recovery_new(const char *wal_dirname, bool force_recovery,
 	xdir_check_xc(&r->wal_dir);
 
 	r->watcher = NULL;
-	rlist_create(&r->on_close_log);
 
 	guard.is_active = false;
 	return r;
@@ -151,7 +150,6 @@ recovery_close_log(struct recovery *r)
 			 r->cursor.name);
 	}
 	xlog_cursor_close(&r->cursor, false);
-	trigger_run_xc(&r->on_close_log, NULL);
 }
 
 static void
@@ -213,7 +211,6 @@ recovery_delete(struct recovery *r)
 {
 	recovery_stop_local(r);
 
-	trigger_destroy(&r->on_close_log);
 	xdir_destroy(&r->wal_dir);
 	if (xlog_cursor_is_open(&r->cursor)) {
 		/*
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 2a03cfc2f..1026507c2 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -56,8 +56,6 @@ struct recovery {
 	 * them locally.
 	 */
 	struct fiber *watcher;
-	/** List of triggers invoked when the current WAL is closed. */
-	struct rlist on_close_log;
 };
 
 struct recovery *
-- 
2.22.0

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] Re: [PATCH 0/7] Replication: In-memory replication
  2019-08-13  6:27 [tarantool-patches] [PATCH 0/7] Replication: In-memory replication Georgy Kirichenko
                   ` (6 preceding siblings ...)
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 7/7] Refactoring: get rid of on_close_log Georgy Kirichenko
@ 2019-08-16 13:47 ` Konstantin Osipov
  7 siblings, 0 replies; 18+ messages in thread
From: Konstantin Osipov @ 2019-08-16 13:47 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/08/13 10:30]:

Georgy,

Could you please benchmark this? Does latency improve? How does
this patch change CPU utilization with intensive replication, 1,
2, 3 replicas?

Results could be quite good (I hope).


-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] Re: [PATCH 1/7] Refactoring: wal writer fiber and queue
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 1/7] Refactoring: wal writer fiber and queue Georgy Kirichenko
@ 2019-08-16 13:53   ` Konstantin Osipov
  2019-08-20 10:57     ` Георгий Кириченко
  2019-08-21 10:18   ` [tarantool-patches] " Vladimir Davydov
  1 sibling, 1 reply; 18+ messages in thread
From: Konstantin Osipov @ 2019-08-16 13:53 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/08/13 10:30]:
> As wal processes all writes in a cbus loop fiber it isn't possible to
> yield while write. The patch introduces a wal write queue and a wal write
> fiber which fetch a batch from queue and writes it out. Also checkpoint
> requests are going now throw the queue to synchronize a tx checkpoint
though 
> status with wal.

I don't understand this patch.

First, WAL already runs in a fiber, and you can yield.

cord_costart creates a new fiber, please check out the implementation.

this is done by this commit:

    commit f4625e64eb99c17910d3f0bcd323e5d82b6d5b31
    Author: Vladimir Davydov <vdavydov.dev@gmail.com>
    Date:   Wed Jul 4 20:20:55 2018 +0300

    vinyl: use cbus for communication between scheduler and worker threads

    We need cbus for forwarding deferred DELETE statements generated in a
    worker thread during primary index compaction to the tx thread where
    they can be inserted into secondary indexes. Since pthread mutex/cond
    and cbus are incompatible by their nature, let's rework communication
    channel between the tx and worker threads using cbus.

    Needed for #2129

Check out the commit, it changes cord_start() to cord_costart().


Second, checkpoint message already flushed the queue. So I don't
understand why you need a switch over message type.

Let's discuss online.


-- 
Konstantin Osipov, Moscow, Russia

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] Re: [PATCH 1/7] Refactoring: wal writer fiber and queue
  2019-08-16 13:53   ` [tarantool-patches] " Konstantin Osipov
@ 2019-08-20 10:57     ` Георгий Кириченко
  0 siblings, 0 replies; 18+ messages in thread
From: Георгий Кириченко @ 2019-08-20 10:57 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 1697 bytes --]

On Friday, August 16, 2019 4:53:32 PM MSK Konstantin Osipov wrote:
> * Georgy Kirichenko <georgy@tarantool.org> [19/08/13 10:30]:
> > As wal processes all writes in a cbus loop fiber it isn't possible to
> > yield while write. The patch introduces a wal write queue and a wal write
> > fiber which fetch a batch from queue and writes it out. Also checkpoint
> > requests are going now throw the queue to synchronize a tx checkpoint
> 
> though
> 
> > status with wal.
> 
> I don't understand this patch.
> 
> First, WAL already runs in a fiber, and you can yield.
The WAL cannot yield there because the same fiber processes cbus events:
once it yields, no running fiber is left to process cbus input, so
nothing could ever wake it up again.
> 
> cord_costart creates a new fiber, please check out the implementation.
> 
> this is done by this commit:
> 
>     commit f4625e64eb99c17910d3f0bcd323e5d82b6d5b31
>     Author: Vladimir Davydov <vdavydov.dev@gmail.com>
>     Date:   Wed Jul 4 20:20:55 2018 +0300
> 
>     vinyl: use cbus for communication between scheduler and worker threads
> 
>     We need cbus for forwarding deferred DELETE statements generated in a
>     worker thread during primary index compaction to the tx thread where
>     they can be inserted into secondary indexes. Since pthread mutex/cond
>     and cbus are incompatible by their nature, let's rework communication
>     channel between the tx and worker threads using cbus.
> 
>     Needed for #2129
> 
> Check out the commit, it changes cord_start() to cord_costart().
> 
> 
> Second, checkpoint message already flushed the queue. So I don't
> understand why you need a switch over message type.
> 
> Let's discuss online.


[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] [PATCH 1/7] Refactoring: wal writer fiber and queue
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 1/7] Refactoring: wal writer fiber and queue Georgy Kirichenko
  2019-08-16 13:53   ` [tarantool-patches] " Konstantin Osipov
@ 2019-08-21 10:18   ` Vladimir Davydov
  1 sibling, 0 replies; 18+ messages in thread
From: Vladimir Davydov @ 2019-08-21 10:18 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Aug 13, 2019 at 09:27:39AM +0300, Georgy Kirichenko wrote:
> As wal processes all writes in a cbus loop fiber it isn't possible to
> yield while write. The patch introduces a wal write queue and a wal write
> fiber which fetch a batch from queue and writes it out. Also checkpoint
> requests are going now throw the queue to synchronize a tx checkpoint
> status with wal.
> 
> This patch enables to put all garbage state into one gc object living in
> tx cord and to asl gc to free space from wal in case of no space
> error.
> ---
>  src/box/wal.c | 187 +++++++++++++++++++++++++++++++++++++++-----------
>  1 file changed, 146 insertions(+), 41 deletions(-)
> 
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 58a58e5b5..5d8dcc4f7 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -92,6 +92,10 @@ struct wal_writer
>  	/** A memory pool for messages. */
>  	struct mempool msg_pool;
>  	/* ----------------- wal ------------------- */
> +	/** A write queue. */
> +	struct stailq write_queue;
> +	/** A write queue condition. */
> +	struct fiber_cond write_cond;
>  	/** A setting from instance configuration - rows_per_wal */
>  	int64_t wal_max_rows;
>  	/** A setting from instance configuration - wal_max_size */
> @@ -158,19 +162,40 @@ struct wal_writer
>  	struct rlist watchers;
>  };
>  
> +enum wal_msg_type {
> +	WAL_MSG_WRITE = 0,
> +	WAL_MSG_CHECKPOINT = 1
> +};
> +
>  struct wal_msg {
>  	struct cmsg base;
> -	/** Approximate size of this request when encoded. */
> -	size_t approx_len;
> -	/** Input queue, on output contains all committed requests. */
> -	struct stailq commit;
> -	/**
> -	 * In case of rollback, contains the requests which must
> -	 * be rolled back.
> -	 */
> -	struct stailq rollback;
> -	/** vclock after the batch processed. */
> -	struct vclock vclock;
> +	/** A link to a wal writer queue. */
> +	struct stailq_entry in_queue;
> +	/** Wal messgae type. */
> +	enum wal_msg_type type;
> +	union {
> +		struct {
> +			/** Approximate size of this request when encoded. */
> +			size_t approx_len;
> +			/** Input queue, on output contains all committed requests. */
> +			struct stailq commit;
> +			/**
> +			 * In case of rollback, contains the requests which must
> +			 * be rolled back.
> +			 */
> +			struct stailq rollback;
> +			/** vclock after the batch processed. */
> +			struct vclock vclock;
> +		};
> +		struct {
> +			/** A checkpoint structure. */
> +			struct wal_checkpoint *checkpoint;
> +			/** Fiber issued the batch. */
> +			struct fiber *fiber;
> +			/** return code. */
> +			int *rc;
> +		};
> +	};
>  };

I'd introduce a base message class rather than using a union + enum.
Would look more readable that way IMHO:

	struct wal_msg {
		struct cmsg base;
		void (*process)(struct wal_msg *);
		...
	};

	struct wal_write_msg {
		struct wal_msg base;
		...
	};

	struct wal_checkpoint_msg {
		struct wal_msg base;
		...
	};

However, I must admit that this nesting of messages does look kinda
ugly. We have cbus message callbacks and now we effectively introduce
wal message callbacks (using a function pointer or a enum, doesn't
matter).

What we want to do here is notify TX of ENOSPC so that it can delete
old WAL files. Until old WAL files are deleted, the WAL must stop
processing any requests. Is there a way to achieve that without
introducing a new fiber? I guess we could instead introduce a separate
queue and append all write messages to it until we receive a message
from the TX thread that it's done removing old WAL files.

> @@ -271,18 +301,22 @@ tx_schedule_commit(struct cmsg *msg)
>  {
>  	struct wal_writer *writer = &wal_writer_singleton;
>  	struct wal_msg *batch = (struct wal_msg *) msg;
> -	/*
> -	 * Move the rollback list to the writer first, since
> -	 * wal_msg memory disappears after the first
> -	 * iteration of tx_schedule_queue loop.
> -	 */
> -	if (! stailq_empty(&batch->rollback)) {
> -		/* Closes the input valve. */
> -		stailq_concat(&writer->rollback, &batch->rollback);
> +	if (batch->type == WAL_MSG_WRITE) {
> +		/*
> +		 * Move the rollback list to the writer first, since
> +		 * wal_msg memory disappears after the first
> +		 * iteration of tx_schedule_queue loop.
> +		 */
> +		if (! stailq_empty(&batch->rollback)) {
> +			/* Closes the input valve. */
> +			stailq_concat(&writer->rollback, &batch->rollback);
> +		}
> +		/* Update the tx vclock to the latest written by wal. */
> +		vclock_copy(&replicaset.vclock, &batch->vclock);
> +		tx_schedule_queue(&batch->commit);
> +	} else {
> +		fiber_wakeup(batch->fiber);

Yeah, this branching dependent on the message type looks convoluted.
While we have just two kinds of messages, it's fine, but should we add
another one, and the code would turn into a mess. Callbacks would look
better IMHO.

>  	}
> -	/* Update the tx vclock to the latest written by wal. */
> -	vclock_copy(&replicaset.vclock, &batch->vclock);
> -	tx_schedule_queue(&batch->commit);
>  	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
>  }
>  
> @@ -922,10 +975,18 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>  }
>  
>  static void
> -wal_write_to_disk(struct cmsg *msg)
> +wal_write_to_disk(struct cmsg *base)
>  {
>  	struct wal_writer *writer = &wal_writer_singleton;
> -	struct wal_msg *wal_msg = (struct wal_msg *) msg;
> +	struct wal_msg *wal_msg = container_of(base, struct wal_msg, base);
> +	if (stailq_empty(&writer->write_queue))
> +		fiber_cond_signal(&writer->write_cond);
> +	stailq_add_tail(&writer->write_queue, &wal_msg->in_queue);

The name of this function doesn't reflect what it does anymore.

> +}
> +
> +static void
> +wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
> +{
>  	struct error *error;
>  
>  	/*

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] [PATCH 2/7] Refactoring: Track wal files using gc state.
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 2/7] Refactoring: Track wal files using gc state Georgy Kirichenko
@ 2019-08-21 10:44   ` Vladimir Davydov
  0 siblings, 0 replies; 18+ messages in thread
From: Vladimir Davydov @ 2019-08-21 10:44 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Aug 13, 2019 at 09:27:40AM +0300, Georgy Kirichenko wrote:
> Move wal files tracking from wal into gc struct because current garbage
> collection was complicated and illogical:
> 1. wal tracks xlogs ->
>    relay catches on_close_log and sends events to gc ->
>    gc tracks the least relay signature and sends vclock to wal
>    wal collect logs
>    gc collects checkpoints independently
> 2. in case of no space errors wal collects logs (making some checkpoints
>      pointless) ->
>    wal notifies gc
>    gc deactivates outdated relays (this is a BUG because relay may catch
>    the currenct state after deactivating)
> This flow does not allow in memory replication because relay would not
> have xlog boundaries yet.
> The flow after the patch is more straightforward:
>    wal informs gc about log open/close events
>    gc tracks both checkpoints and logs
>    relay catches on_close_log and send events to gc (this will be
>    changed after in memroy replication patch)
>    wal requests gc to free space in case of no space error
>    gc could consistently free checkpoints and logs.
> 
> As relay notifies tx about ACK state changes gc would be able to track
> the oldest used used wal and perform garbage collection as well as relay
> deactivating.

Overall, I feel like it must be a responsibility of the WAL thread to
delete old files, because the TX thread deals with a stream of rows, it
doesn't care how this stream is split into files or whether it's split
at all. For example, we could use a ring buffer to store WAL records
rather than a bunch of files. Moving garbage collection to TX will make
any such transformation difficult to achieve. I'm inclined to think that
instead of moving WAL file tracking and removal from the WAL thread to
the TX thread, we should move garbage collection infrastructure to the
WAL thread. Judging by the following patches, there's a link between
a relay and a WAL anyway so perhaps we could use it for updating the
garbage collector state right in the WAL? Just a thought, not insisting.

> diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
> index 59ad16823..9b3f2233e 100644
> --- a/src/box/memtx_engine.c
> +++ b/src/box/memtx_engine.c
> @@ -568,7 +568,7 @@ checkpoint_f(va_list ap)
>  	}
>  
>  	struct xlog snap;
> -	if (xdir_create_xlog(&ckpt->dir, &snap, &ckpt->vclock) != 0)
> +	if (xdir_create_xlog(&ckpt->dir, &snap, &ckpt->vclock, NULL) != 0)
>  		return -1;
>  
>  	say_info("saving snapshot `%s'", snap.filename);
> diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> index d122d618a..45c4f6820 100644
> --- a/src/box/recovery.cc
> +++ b/src/box/recovery.cc
> @@ -118,22 +118,17 @@ recovery_new(const char *wal_dirname, bool force_recovery,
>  }
>  
>  void
> -recovery_scan(struct recovery *r, struct vclock *end_vclock,
> -	      struct vclock *gc_vclock)
> +recovery_scan(struct recovery *r, struct vclock *end_vclock)
>  {
>  	xdir_scan_xc(&r->wal_dir);
>  
>  	if (xdir_last_vclock(&r->wal_dir, end_vclock) < 0 ||
>  	    vclock_compare(end_vclock, &r->vclock) < 0) {
>  		/* No xlogs after last checkpoint. */
> -		vclock_copy(gc_vclock, &r->vclock);
>  		vclock_copy(end_vclock, &r->vclock);
>  		return;
>  	}
>  
> -	if (xdir_first_vclock(&r->wal_dir, gc_vclock) < 0)
> -		unreachable();
> -
>  	/* Scan the last xlog to find end vclock. */
>  	struct xlog_cursor cursor;
>  	if (xdir_open_cursor(&r->wal_dir, vclock_sum(end_vclock), &cursor) != 0)
> diff --git a/src/box/recovery.h b/src/box/recovery.h
> index 6e68abc0b..2a03cfc2f 100644
> --- a/src/box/recovery.h
> +++ b/src/box/recovery.h
> @@ -75,8 +75,7 @@ recovery_delete(struct recovery *r);
>   * WAL directory.
>   */
>  void
> -recovery_scan(struct recovery *r,  struct vclock *end_vclock,
> -	      struct vclock *gc_vclock);
> +recovery_scan(struct recovery *r, struct vclock *end_vclock);
>  
>  void
>  recovery_follow_local(struct recovery *r, struct xstream *stream,
> diff --git a/src/box/vy_log.c b/src/box/vy_log.c
> index cb291f3c8..d97eff9b8 100644
> --- a/src/box/vy_log.c
> +++ b/src/box/vy_log.c
> @@ -949,7 +949,7 @@ vy_log_open(struct xlog *xlog)
>  	}
>  
>  	if (xdir_create_xlog(&vy_log.dir, xlog,
> -			     &vy_log.last_checkpoint) < 0)
> +			     &vy_log.last_checkpoint, NULL) < 0)
>  		goto fail;
>  
>  	struct xrow_header row;
> @@ -2585,7 +2585,7 @@ vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery)
>  	rlist_foreach_entry(lsm, &recovery->lsms, in_recovery) {
>  		/* Create the log file on the first write. */
>  		if (!xlog_is_open(&xlog) &&
> -		    xdir_create_xlog(&vy_log.dir, &xlog, vclock) != 0)
> +		    xdir_create_xlog(&vy_log.dir, &xlog, vclock, NULL) != 0)
>  			goto err_create_xlog;
>  
>  		if (vy_log_append_lsm(&xlog, lsm) != 0)
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 5d8dcc4f7..a09ab7187 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -801,20 +759,69 @@ wal_opt_rotate(struct wal_writer *writer)
>  		return 0;
>  
>  	if (xdir_create_xlog(&writer->wal_dir, &writer->current_wal,
> -			     &writer->vclock) != 0) {
> +			     &writer->vclock, &writer->prev_vclock) != 0) {
>  		diag_log();
>  		return -1;
>  	}
> -	/*
> -	 * Keep track of the new WAL vclock. Required for garbage
> -	 * collection, see wal_collect_garbage().
> -	 */
> -	xdir_add_vclock(&writer->wal_dir, &writer->vclock);
> +	vclock_copy(&writer->prev_vclock, &writer->vclock);
>  
> +	wal_notify_log_action(writer, WAL_LOG_OPEN);
>  	wal_notify_watchers(writer, WAL_EVENT_ROTATE);
>  	return 0;
>  }
>  
> +struct gc_force_wal_msg {
> +	struct cmsg base;
> +	struct fiber_cond done_cond;
> +	bool done;
> +	int rc;
> +};
> +
> +static void
> +wal_gc_wal_force_done(struct cmsg *base)
> +{
> +	struct gc_force_wal_msg *msg = container_of(base,
> +						   struct gc_force_wal_msg,
> +						   base);
> +	msg->done = true;
> +	fiber_cond_signal(&msg->done_cond);
> +}
> +
> +static int
> +tx_gc_force_wal_f(va_list ap)
> +{
> +	struct wal_writer *writer = &wal_writer_singleton;
> +	struct gc_force_wal_msg *msg = va_arg(ap, struct gc_force_wal_msg *);
> +	static struct cmsg_hop respond_route[] = {
> +		{wal_gc_wal_force_done, NULL}
> +	};
> +	msg->rc = gc_force_wal_cleanup();

The idea was to avoid direct calls from WAL to gc, because those are
different subsystems and WAL is a more low-level one. Using callbacks
allowed to avoid a dependency loop. Now there are still callbacks, but
certain methods are called directly. We should either remove callbacks
altogether turning this code into a monolith or add another callback,
I guess.

> +	cmsg_init(&msg->base, respond_route);
> +	cpipe_push(&writer->wal_pipe, &msg->base);
> +	return 0;
> +}
> +
> +void
> +tx_gc_wal_force(struct cmsg *base)
> +{
> +	struct wal_writer *writer = &wal_writer_singleton;
> +	struct gc_force_wal_msg *msg = container_of(base,
> +						   struct gc_force_wal_msg,
> +						   base);
> +	struct fiber *gc_fiber = fiber_new("wal_gc_fiber", tx_gc_force_wal_f);
> +	if (gc_fiber == NULL) {
> +		struct cmsg_hop respond_route[] = {
> +			{wal_gc_wal_force_done, NULL}
> +		};
> +		msg->rc = -1;
> +		cmsg_init(&msg->base, respond_route);
> +		cpipe_push(&writer->wal_pipe, &msg->base);
> +		return;
> +	}
> +	fiber_start(gc_fiber, msg);
> +	return;
> +}
> +
>  /**
>   * Make sure there's enough disk space to append @len bytes
>   * of data to the current WAL.
> @@ -825,17 +832,10 @@ wal_opt_rotate(struct wal_writer *writer)
>  static int
>  wal_fallocate(struct wal_writer *writer, size_t len)
>  {
> -	bool warn_no_space = true, notify_gc = false;
>  	struct xlog *l = &writer->current_wal;
>  	struct errinj *errinj = errinj(ERRINJ_WAL_FALLOCATE, ERRINJ_INT);
>  	int rc = 0;
>  
> -	/*
> -	 * Max LSN that can be collected in case of ENOSPC -
> -	 * we must not delete WALs necessary for recovery.
> -	 */
> -	int64_t gc_lsn = vclock_sum(&writer->checkpoint_vclock);
> -
>  	/*
>  	 * The actual write size can be greater than the sum size
>  	 * of encoded rows (compression, fixheaders). Double the
> @@ -856,45 +856,23 @@ retry:
>  	}
>  	if (errno != ENOSPC)
>  		goto error;
> -	if (!xdir_has_garbage(&writer->wal_dir, gc_lsn))
> -		goto error;
> -
> -	if (warn_no_space) {
> -		say_crit("ran out of disk space, try to delete old WAL files");
> -		warn_no_space = false;
> -	}

I think we shouldn't remove this warning.

> -
> -	xdir_collect_garbage(&writer->wal_dir, gc_lsn, XDIR_GC_REMOVE_ONE);
> -	notify_gc = true;
> -	goto retry;
> +	static struct cmsg_hop gc_wal_force_route[] = {
> +		{tx_gc_wal_force, NULL}
> +	};
> +	struct gc_force_wal_msg msg;
> +	msg.done = false;
> +	fiber_cond_create(&msg.done_cond);
> +	cmsg_init(&msg.base, gc_wal_force_route);
> +	cpipe_push(&writer->tx_prio_pipe, &msg.base);
> +
> +	while (!msg.done)
> +		fiber_cond_wait(&msg.done_cond);
> +	if (msg.rc == 0)
> +		goto retry;
>  error:
>  	diag_log();
>  	rc = -1;

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] [PATCH 3/7] Replication: Relay does not rely on xlog boundaries
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 3/7] Replication: Relay does not rely on xlog boundaries Georgy Kirichenko
@ 2019-08-21 11:35   ` Vladimir Davydov
  0 siblings, 0 replies; 18+ messages in thread
From: Vladimir Davydov @ 2019-08-21 11:35 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Aug 13, 2019 at 09:27:41AM +0300, Georgy Kirichenko wrote:
> When a relay receives an ACK, it matches the received vclock against the
> xlog file boundaries detected using the on_close_log trigger and sends a
> consumer advance message. In addition, for each ACK the relay sends a
> status update message to the tx cord, which can be used for gc purposes.
> This patch removes any knowledge about xlog boundaries from the relay
> because there will not be any xlog files in case of in-memory replication.
> As gc now tracks all xlog files, it is able to handle garbage files
> using relay status updates.
> 
> Note: after the parallel applier patch there is no longer one ACK per
> transaction, so it should not be too expensive to advance a consumer on
> each status update. However, I think it could be improved, for instance
> by tracking the next wal file vclock.

I don't see a way to preserve the old behavior after switching to
replication from memory so I guess this patch is okay.


* Re: [tarantool-patches] [PATCH 4/7] Replication: wal memory buffer
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 4/7] Replication: wal memory buffer Georgy Kirichenko
@ 2019-08-21 11:57   ` Vladimir Davydov
  0 siblings, 0 replies; 18+ messages in thread
From: Vladimir Davydov @ 2019-08-21 11:57 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Aug 13, 2019 at 09:27:42AM +0300, Georgy Kirichenko wrote:
> Introduce a wal memory buffer which contains logged transactions. Wal
> writes all rows into the memory buffer and then flushes the new data to
> disk. Wal memory consists of rotating pairs of an xrow header array and
> an encoded xrow data buffer.
> ---
>  src/box/CMakeLists.txt |   1 +
>  src/box/wal.c          |  38 ++++--
>  src/box/wal_mem.c      | 273 +++++++++++++++++++++++++++++++++++++++++
>  src/box/wal_mem.h      | 166 +++++++++++++++++++++++++
>  src/box/xlog.c         |  77 +++++++++---
>  src/box/xlog.h         |   9 ++
>  6 files changed, 536 insertions(+), 28 deletions(-)
>  create mode 100644 src/box/wal_mem.c
>  create mode 100644 src/box/wal_mem.h
> 
> diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
> index 9bba37bcb..bd31b07df 100644
> --- a/src/box/CMakeLists.txt
> +++ b/src/box/CMakeLists.txt
> @@ -125,6 +125,7 @@ add_library(box STATIC
>      bind.c
>      execute.c
>      wal.c
> +    wal_mem.c
>      call.c
>      merger.c
>      ${lua_sources}
> diff --git a/src/box/wal.c b/src/box/wal.c
> index a09ab7187..6cdb0db15 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -44,6 +44,7 @@
>  #include "coio_task.h"
>  #include "replication.h"
>  #include "gc.h"
> +#include "wal_mem.h"
>  
>  enum {
>  	/**
> @@ -156,6 +157,8 @@ struct wal_writer
>  	 * Used for replication relays.
>  	 */
>  	struct rlist watchers;
> +	/** Wal memory buffer. */
> +	struct wal_mem wal_mem;
>  };
>  
>  enum wal_msg_type {
> @@ -936,6 +939,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>  	int64_t tsn = 0;
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
> +		(*row)->tm = ev_now(loop());
>  		if ((*row)->replica_id == 0) {
>  			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
>  				      vclock_get(base, instance_id);
> @@ -1027,25 +1031,37 @@ wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
>  	int rc;
>  	struct journal_entry *entry;
>  	struct stailq_entry *last_committed = NULL;
> +	wal_mem_svp(&writer->wal_mem, &writer->vclock);
>  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
>  		wal_assign_lsn(&vclock_diff, &writer->vclock,
>  			       entry->rows, entry->rows + entry->n_rows);
>  		entry->res = vclock_sum(&vclock_diff) +
>  			     vclock_sum(&writer->vclock);
> -		rc = xlog_write_entry(l, entry);
> -		if (rc < 0)
> +		if (wal_mem_write(&writer->wal_mem, entry->rows,
> +				  entry->rows + entry->n_rows) < 0) {
> +			wal_mem_svp_reset(&writer->wal_mem);
>  			goto done;
> -		if (rc > 0) {
> -			writer->checkpoint_wal_size += rc;
> -			last_committed = &entry->fifo;
> -			vclock_merge(&writer->vclock, &vclock_diff);
>  		}
> -		/* rc == 0: the write is buffered in xlog_tx */
>  	}
> -	rc = xlog_flush(l);
> -	if (rc < 0)
> -		goto done;
>  
> +	struct iovec iov[SMALL_OBUF_IOV_MAX];

You implicitly assume that SMALL_OBUF_IOV_MAX should be enough to store
a transaction. This looks error-prone. Please rework the API somehow,
e.g. allocate iov on a region.

> +	int iovcnt;
> +	iovcnt = wal_mem_svp_data(&writer->wal_mem, iov);

For obuf, ibuf, region, one can create as many svps as one likes, while
here that isn't the case. I think we should either make the svp
self-sufficient or rename those methods to something like _tx_begin/commit.

> +	xlog_tx_begin(l);
> +	if (xlog_write_iov(l, iov, iovcnt,
> +			   wal_mem_svp_row_count(&writer->wal_mem)) < 0) {
> +		xlog_tx_rollback(l);
> +		wal_mem_svp_reset(&writer->wal_mem);
> +		goto done;
> +	}
> +	rc = xlog_tx_commit(l);

Before this patch we wrapped every tx transaction in
xlog_tx_begin/commit. Now we wrap the whole wal transaction
instead. Why change that?

> +	if (rc == 0)
> +		/* Data is buffered but not yet flushed. */
> +		rc = xlog_flush(l);
> +	if (rc < 0) {
> +		wal_mem_svp_reset(&writer->wal_mem);
> +		goto done;
> +	}
>  	writer->checkpoint_wal_size += rc;
>  	last_committed = stailq_last(&wal_msg->commit);
>  	vclock_merge(&writer->vclock, &vclock_diff);
> @@ -1147,6 +1163,7 @@ wal_cord_f(va_list ap)
>  {
>  	(void) ap;
>  	struct wal_writer *writer = &wal_writer_singleton;
> +	wal_mem_create(&writer->wal_mem);
>  
>  	/** Initialize eio in this thread */
>  	coio_enable();
> @@ -1195,6 +1212,7 @@ wal_cord_f(va_list ap)
>  		xlog_close(&vy_log_writer.xlog, false);
>  
>  	cpipe_destroy(&writer->tx_prio_pipe);
> +	wal_mem_destroy(&writer->wal_mem);
>  	return 0;
>  }
>  
> diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c
> new file mode 100644
> index 000000000..fdfc6f93d
> --- /dev/null
> +++ b/src/box/wal_mem.c
> @@ -0,0 +1,273 @@
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +
> +#include "wal_mem.h"
> +
> +#include "fiber.h"
> +#include "errinj.h"
> +
> +enum {
> +	/* Initial size for rows storage. */
> +	WAL_MEM_BUF_INITIAL_ROW_COUNT = 4096,
> +	/* Initial size for data storage. */
> +	WAL_MEM_BUF_INITIAL_DATA_SIZE = 65536,
> +	/* How many rows we will place in one buffer. */
> +	WAL_MEM_BUF_ROWS_LIMIT = 8192,
> +	/* How many data we will place in one buffer. */
> +	WAL_MEM_BUF_DATA_LIMIT = 1 << 19,
> +};
> +
> +void
> +wal_mem_create(struct wal_mem *wal_mem)

I think we should name this object in such a way that it doesn't imply
WAL. AFAICS it's just a ring buffer for storing rows so we should name
it appropriately. Maybe we should allow customizing all those options
when creating a new object. Also, it would be nice to have a unit test
for this object.

> diff --git a/src/box/wal_mem.h b/src/box/wal_mem.h
> new file mode 100644
> index 000000000..d26d00157
> --- /dev/null
> +++ b/src/box/wal_mem.h
> @@ -0,0 +1,166 @@
> +#ifndef TARANTOOL_WAL_MEM_H_INCLUDED
> +#define TARANTOOL_WAL_MEM_H_INCLUDED
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include <stdint.h>
> +
> +#include "small/ibuf.h"
> +#include "small/obuf.h"
> +#include "xrow.h"
> +#include "vclock.h"
> +
> +enum {
> +	/*
> +	 * A wal memory object contains a number of rotating data buffers.
> +	 * The estimated decrease in the amount of stored rows on each
> +	 * rotation is about 1/(COUNT OF BUFFERS). A bigger value makes
> +	 * rotation more frequent, but the decrease is smoother and the
> +	 * size of the wal memory is more stable.
> +	 */
> +	WAL_MEM_BUF_COUNT = 8,
> +};
> +
> +/*
> + * A wal memory row descriptor which contains decoded xrow header and
> + * encoded data pointer and size.
> + */
> +struct wal_mem_buf_row {
> +	/* Decoded xrow header. */
> +	struct xrow_header xrow;
> +	/* Pointer to the xrow encoded raw data. */
> +	void *data;
> +	/* xrow raw data size. */
> +	size_t size;
> +};
> +
> +/*
> + * Wal memory data buffer which contains
> + *  a vclock just before the first contained row,
> + *  an ibuf with row descriptors
> + *  an obuf with encoded data
> + */
> +struct wal_mem_buf {
> +	/* vclock just before the first row. */
> +	struct vclock vclock;
> +	/* A row descriptor array. */
> +	struct ibuf rows;
> +	/* Data storage for encoded row data. */
> +	struct obuf data;
> +};
> +
> +/*
> + * Wal memory contains WAL_MEM_BUF_COUNT wal memory buffers which are
> + * organized in a ring. Wal memory tracks the indexes (generations) of
> + * the first and the last used buffers; those indexes are not wrapped
> + * around the ring. Each rotation increases the last buffer index and
> + * each buffer discard increases the first buffer index. To evaluate the
> + * effective index in the wal memory array a modulo operation (or mask)
> + * should be used.
> + */
> +struct wal_mem {
> +	/* An index of the first used buffer. */
> +	uint64_t first_buf_index;
> +	/* An index of the last used buffer. */
> +	uint64_t last_buf_index;
> +	/* A memory buffer array. */
> +	struct wal_mem_buf buf[WAL_MEM_BUF_COUNT];
> +	/* The first row index written in the current transaction. */
> +	uint32_t tx_first_row_index;
> +	/* The first row data svp written in the current transaction. */
> +	struct obuf_svp tx_first_row_svp;
> +};
> +
> +/* Create a wal memory. */
> +void
> +wal_mem_create(struct wal_mem *wal_mem);
> +
> +/* Destroy wal memory structure. */
> +void
> +wal_mem_destroy(struct wal_mem *wal_mem);
> +
> +/*
> + * Rotate a wal memory if required and save the current wal memory write
> + * position.
> + */
> +void
> +wal_mem_svp(struct wal_mem *wal_mem, struct vclock *vclock);
> +
> +/* Retrieve data after last svp. */
> +int
> +wal_mem_svp_data(struct wal_mem *wal_mem, struct iovec *iovec);
> +
> +/* Truncate all the data written after the last svp. */
> +void
> +wal_mem_svp_reset(struct wal_mem *wal_mem);
> +
> +/* Count of rows written since the last svp. */
> +static inline int
> +wal_mem_svp_row_count(struct wal_mem *wal_mem)
> +{
> +	struct wal_mem_buf *mem_buf = wal_mem->buf +
> +				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
> +	return ibuf_used(&mem_buf->rows) / sizeof(struct wal_mem_buf_row) -
> +	       wal_mem->tx_first_row_index;
> +}
> +
> +/*
> + * Append an xrow array to a wal memory. The array is placed into one
> + * wal memory buffer and each row takes contiguous space in the data
> + * buffer.
> + * Return
> + *  0 for Ok
> + *  -1 in case of error
> + */
> +int
> +wal_mem_write(struct wal_mem *wal_mem, struct xrow_header **begin,
> +	      struct xrow_header **end);
> +
> +/* Wal memory cursor to track a position in a wal memory. */
> +struct wal_mem_cursor {
> +	/* Current memory buffer index. */
> +	uint64_t buf_index;
> +	/* Current row index. */
> +	uint32_t row_index;
> +};
> +
> +/* Create a wal memory cursor from the wal memory current position. */
> +int
> +wal_mem_cursor_create(struct wal_mem *wal_mem,
> +		      struct wal_mem_cursor *wal_mem_cursor,
> +		      struct vclock *vclock);
> +
> +int
> +wal_mem_cursor_next(struct wal_mem *wal_mem,
> +		    struct wal_mem_cursor *wal_mem_cursor,
> +		    struct xrow_header **row,
> +		    void **data,
> +		    size_t *size);

What is returned on EOF? What happens if the buffer is rotated while
there's a cursor open for it? A comment would be helpful here.


* Re: [tarantool-patches] [PATCH 5/7] Replication: in memory replication
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 5/7] Replication: in memory replication Georgy Kirichenko
@ 2019-08-21 13:52   ` Vladimir Davydov
  0 siblings, 0 replies; 18+ messages in thread
From: Vladimir Davydov @ 2019-08-21 13:52 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Aug 13, 2019 at 09:27:43AM +0300, Georgy Kirichenko wrote:
> diff --git a/src/box/gc.h b/src/box/gc.h
> index 9b38a0c06..f28b716b5 100644
> --- a/src/box/gc.h
> +++ b/src/box/gc.h
> @@ -125,6 +125,8 @@ struct gc_state {
>  	/** True if log is opened. */
>  	bool log_opened;
>  	/** Registered consumers, linked by gc_consumer::node. */
> +	/** xdir to track wal files. */
> +	struct xdir xdir;

AFAICS this xdir is only created and destroyed - it isn't used anywhere.

>  	gc_tree_t consumers;
>  	/** Fiber responsible for periodic checkpointing. */
>  	struct fiber *checkpoint_fiber;
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index a1b841291..05fc0f691 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -116,6 +114,9 @@ struct relay {
>  	/** Relay sync state. */
>  	enum relay_state state;
>  
> +	struct vclock relay_vclock;
> +	char *wal_dir;
> +

Comments, please. It would also be nice to update the comment on relay->r
to clarify when it can be NULL.

>  	struct {
>  		/* Align to prevent false-sharing with tx thread */
>  		alignas(CACHELINE_SIZE)
> @@ -434,6 +426,27 @@ relay_send_heartbeat(struct relay *relay)
>  	}
>  }
>  
> +static int
> +relay_send_cb(struct xrow_header *row, void *data)
> +{
> +	try {
> +		struct relay *relay = (struct relay *)data;
> +		relay_send_row(&relay->stream, row);
> +		return 0;
> +	} catch (Exception *e) {
> +		return -1;
> +	}
> +}
> +
> +static void
> +relay_endpoint_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
> +{
> +	(void) loop;
> +	(void) events;
> +	struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data;
> +	cbus_process(endpoint);
> +}

I guess we could define this helper somewhere in cbus internals, just
like fiber_schedule_cb.

> diff --git a/src/box/wal.c b/src/box/wal.c
> index 6cdb0db15..0457f3d46 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -1163,7 +1166,9 @@ wal_cord_f(va_list ap)
>  {
>  	(void) ap;
>  	struct wal_writer *writer = &wal_writer_singleton;
> +	fiber_cond_create(&writer->wal_mem_cond);
>  	wal_mem_create(&writer->wal_mem);
> +	wal_mem_svp(&writer->wal_mem, &writer->vclock);

Looks like this belongs in the previous patch.

>  
>  	/** Initialize eio in this thread */
>  	coio_enable();
> @@ -1494,3 +1500,149 @@ wal_atfork()
>  	if (xlog_is_open(&vy_log_writer.xlog))
>  		xlog_atfork(&vy_log_writer.xlog);
>  }
> +
> +struct wal_relay_msg {
> +	struct cmsg base;
> +	struct cpipe wal_pipe;
> +	struct cpipe relay_pipe;
> +
> +	struct vclock *vclock;
> +	wal_relay_cb on_wal_relay;
> +	void *cb_data;
> +	struct fiber *fiber;
> +	struct cmsg cancel_msg;
> +	struct fiber_cond done_cond;
> +	bool done;
> +	int rc;
> +	struct diag diag;
> +};

Comments, comments...

> +
> +static void
> +wal_relay_done(struct cmsg *base)
> +{
> +	struct wal_relay_msg *msg =
> +		container_of(base, struct wal_relay_msg, base);
> +	msg->done = true;
> +	fiber_cond_signal(&msg->done_cond);
> +}
> +
> +static int
> +wal_relay_f(va_list ap)
> +{
> +	struct wal_writer *writer = &wal_writer_singleton;
> +	struct wal_relay_msg *msg = va_arg(ap, struct wal_relay_msg *);
> +	struct vclock *vclock = msg->vclock;
> +	wal_relay_cb on_wal_relay = msg->on_wal_relay;
> +	void *cb_data = msg->cb_data;
> +
> +	double last_row_time = ev_monotonic_now(loop());
> +
> +	struct wal_mem_cursor cursor;
> +	if (wal_mem_cursor_create(&writer->wal_mem, &cursor, vclock) != 0)
> +		goto done;
> +	while (!fiber_is_cancelled()) {
> +		struct xrow_header *row;
> +		void *data;
> +		size_t size;
> +		int rc = wal_mem_cursor_next(&writer->wal_mem, &cursor,
> +					     &row, &data, &size);
> +		if (rc < 0) {
> +			/* Outdated cursor. */
> +			break;
> +		}
> +		if (rc == 0 && vclock_get(vclock, row->replica_id) >= row->lsn)
> +			continue;
> +		if (rc > 0) {
> +			double timeout = replication_timeout;
> +			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> +						    ERRINJ_DOUBLE);
> +			if (inj != NULL && inj->dparam != 0)
> +				timeout = inj->dparam;
> +
> +			/*
> +			 * Nothing to send so wait for the next row
> +			 * and send a hearth beat if timeout exceeded.
> +			 */
> +			fiber_cond_wait_deadline(&writer->wal_mem_cond,
> +						 last_row_time + timeout);
> +			if (fiber_is_cancelled())
> +				break;
> +			if (ev_monotonic_now(loop()) - last_row_time >
> +			    timeout) {
> +				struct xrow_header hearth_beat;
> +				xrow_encode_timestamp(&hearth_beat, instance_id,

s/hearth/heart

> +						      ev_now(loop()));
> +				row = &hearth_beat;
> +			} else
> +				continue;
> +		}
> +		last_row_time = ev_monotonic_now(loop());
> +		if (on_wal_relay(row, cb_data) != 0) {
> +			diag_move(&fiber()->diag, &msg->diag);
> +			break;
> +		}
> +	}
> +	static struct cmsg_hop done_route[] = {
> +		{wal_relay_done, NULL}
> +	};
> +done:
> +	cmsg_init(&msg->base, done_route);
> +	cpipe_push(&msg->relay_pipe, &msg->base);
> +	msg->fiber = NULL;
> +	return 0;
> +}
> diff --git a/src/box/wal.h b/src/box/wal.h
> index 1a7156d97..bd298cebe 100644
> --- a/src/box/wal.h
> +++ b/src/box/wal.h
> @@ -235,6 +235,12 @@ wal_write_vy_log(struct journal_entry *req);
>  void
>  wal_rotate_vy_log();
>  
> +typedef int (*wal_relay_cb)(struct xrow_header *header, void *data);
> +
> +int
> +wal_relay(struct vclock *vclock, wal_relay_cb on_wal_relay, void *cb_data,
> +	  const char *endpoint_name);

Comments, comments...

> +
>  #if defined(__cplusplus)
>  } /* extern "C" */
>  #endif /* defined(__cplusplus) */
> diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c
> index fdfc6f93d..b01604d55 100644
> --- a/src/box/wal_mem.c
> +++ b/src/box/wal_mem.c
> @@ -247,14 +247,13 @@ wal_mem_cursor_next(struct wal_mem *wal_mem,
>  	}
>  
>  	struct wal_mem_buf *mem_buf;
> -	size_t last_row_index;
>  
>  next_buffer:
>  	mem_buf = wal_mem->buf +
> -		  wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
> -	last_row_index = ibuf_used(&mem_buf->rows) /
> -			 sizeof(struct wal_mem_buf_row);
> -	if (last_row_index == wal_mem_cursor->row_index) {
> +		  wal_mem_cursor->buf_index % WAL_MEM_BUF_COUNT;
> +	size_t buf_row_count = ibuf_used(&mem_buf->rows) /
> +			       sizeof(struct wal_mem_buf_row);
> +	if (buf_row_count == wal_mem_cursor->row_index) {
>  		/* No more rows in the current buffer. */
>  		if (wal_mem->last_buf_index == wal_mem_cursor->buf_index)
>  			/* No more rows in the memory. */
> @@ -269,5 +268,6 @@ next_buffer:
>  	*row = &buf_row->xrow;
>  	*data = buf_row->data;
>  	*size = buf_row->size;
> +	++wal_mem_cursor->row_index;
>  	return 0;
>  }

Again, this seems to belong to the previous patch.


* Re: [tarantool-patches] [PATCH 6/7] Refactoring: remove wal_watcher routines
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 6/7] Refactoring: remove wal_watcher routines Georgy Kirichenko
@ 2019-08-21 13:52   ` Vladimir Davydov
  0 siblings, 0 replies; 18+ messages in thread
From: Vladimir Davydov @ 2019-08-21 13:52 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Aug 13, 2019 at 09:27:44AM +0300, Georgy Kirichenko wrote:
> As the relay now uses wal memory replication, there is no need for this
> facility anymore.
> ---
>  src/box/relay.cc |   2 -
>  src/box/wal.c    | 127 -----------------------------------------------
>  src/box/wal.h    |  68 -------------------------
>  3 files changed, 197 deletions(-)

Ack


* Re: [tarantool-patches] [PATCH 7/7] Refactoring: get rid of on_close_log
  2019-08-13  6:27 ` [tarantool-patches] [PATCH 7/7] Refactoring: get rid of on_close_log Georgy Kirichenko
@ 2019-08-21 13:52   ` Vladimir Davydov
  0 siblings, 0 replies; 18+ messages in thread
From: Vladimir Davydov @ 2019-08-21 13:52 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Aug 13, 2019 at 09:27:45AM +0300, Georgy Kirichenko wrote:
> We do not use this trigger anymore.
> ---
>  src/box/recovery.cc | 3 ---
>  src/box/recovery.h  | 2 --
>  2 files changed, 5 deletions(-)

Ack


end of thread, other threads:[~2019-08-21 13:52 UTC | newest]

Thread overview: 18+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-08-13  6:27 [tarantool-patches] [PATCH 0/7] Replication: In-memory replication Georgy Kirichenko
2019-08-13  6:27 ` [tarantool-patches] [PATCH 1/7] Refactoring: wal writer fiber and queue Georgy Kirichenko
2019-08-16 13:53   ` [tarantool-patches] " Konstantin Osipov
2019-08-20 10:57     ` Георгий Кириченко
2019-08-21 10:18   ` [tarantool-patches] " Vladimir Davydov
2019-08-13  6:27 ` [tarantool-patches] [PATCH 2/7] Refactoring: Track wal files using gc state Georgy Kirichenko
2019-08-21 10:44   ` Vladimir Davydov
2019-08-13  6:27 ` [tarantool-patches] [PATCH 3/7] Replication: Relay does not rely on xlog boundaries Georgy Kirichenko
2019-08-21 11:35   ` Vladimir Davydov
2019-08-13  6:27 ` [tarantool-patches] [PATCH 4/7] Replication: wal memory buffer Georgy Kirichenko
2019-08-21 11:57   ` Vladimir Davydov
2019-08-13  6:27 ` [tarantool-patches] [PATCH 5/7] Replication: in memory replication Georgy Kirichenko
2019-08-21 13:52   ` Vladimir Davydov
2019-08-13  6:27 ` [tarantool-patches] [PATCH 6/7] Refactoring: remove wal_watcher routines Georgy Kirichenko
2019-08-21 13:52   ` Vladimir Davydov
2019-08-13  6:27 ` [tarantool-patches] [PATCH 7/7] Refactoring: get rid of on_close_log Georgy Kirichenko
2019-08-21 13:52   ` Vladimir Davydov
2019-08-16 13:47 ` [tarantool-patches] Re: [PATCH 0/7] Replication: In-memory replication Konstantin Osipov
