[tarantool-patches] [PATCH 5/7] Replication: in-memory replication

Georgy Kirichenko georgy at tarantool.org
Tue Aug 13 09:27:43 MSK 2019


Relay uses both xlog files and WAL memory to feed a replica with
transaction data. The new workflow is to read xlog files up to the last
one and then switch to WAL memory. If the relay runs out of in-memory
WAL data, it falls back to file mode. Heartbeats are sent from the WAL
thread only in in-memory mode, because in file mode there is always
data to send.

Closes #3794
---
 src/box/gc.c      |   3 +
 src/box/gc.h      |   2 +
 src/box/relay.cc  | 161 +++++++++++++++++++++++-----------------------
 src/box/wal.c     | 154 +++++++++++++++++++++++++++++++++++++++++++-
 src/box/wal.h     |   6 ++
 src/box/wal_mem.c |  10 +--
 6 files changed, 249 insertions(+), 87 deletions(-)

diff --git a/src/box/gc.c b/src/box/gc.c
index 9771e407a..c36c2ace4 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -126,6 +126,8 @@ gc_init(const char *wal_dir_name)
 	xdir_scan(&gc.wal_dir);
 	gc.log_opened = false;
 	gc_tree_new(&gc.consumers);
+	xdir_create(&gc.xdir, wal_dir_name, XLOG, &INSTANCE_UUID,
+		    &xlog_opts_default); /* NOTE(review): not scanned or used in this patch; may duplicate gc.wal_dir. */
 	fiber_cond_create(&gc.cleanup_cond);
 	checkpoint_schedule_cfg(&gc.checkpoint_schedule, 0, 0);
 
@@ -166,6 +168,7 @@ gc_free(void)
 		gc_consumer_delete(consumer);
 		consumer = next;
 	}
+	xdir_destroy(&gc.xdir); /* Pairs with xdir_create() in gc_init(). */
 }
 
 /**
diff --git a/src/box/gc.h b/src/box/gc.h
index 9b38a0c06..f28b716b5 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -125,6 +125,8 @@ struct gc_state {
 	/** True if log is opened. */
 	bool log_opened;
+	/** xdir to track WAL files. */
+	struct xdir xdir;
 	/** Registered consumers, linked by gc_consumer::node. */
 	gc_tree_t consumers;
 	/** Fiber responsible for periodic checkpointing. */
 	struct fiber *checkpoint_fiber;
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a1b841291..05fc0f691 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -84,8 +84,6 @@ struct relay {
 	struct replica *replica;
 	/** WAL event watcher. */
 	struct wal_watcher wal_watcher;
-	/** Relay reader cond. */
-	struct fiber_cond reader_cond;
 	/** Relay diagnostics. */
 	struct diag diag;
 	/** Vclock recieved from replica. */
@@ -116,6 +114,9 @@ struct relay {
 	/** Relay sync state. */
 	enum relay_state state;
 
+	struct vclock relay_vclock; /* Followed by relay_send_row(); relaying resumes from it. */
+	char *wal_dir; /* strdup()'ed and freed by relay_subscribe(). */
+
 	struct {
 		/* Align to prevent false-sharing with tx thread */
 		alignas(CACHELINE_SIZE)
@@ -165,7 +166,6 @@ relay_new(struct replica *replica)
 		return NULL;
 	}
 	relay->replica = replica;
-	fiber_cond_create(&relay->reader_cond);
 	diag_create(&relay->diag);
 	stailq_create(&relay->pending_gc);
 	relay->state = RELAY_OFF;
@@ -215,7 +215,8 @@ relay_exit(struct relay *relay)
 	 * cursor, which must be closed in the same thread
 	 * that opened it (it uses cord's slab allocator).
 	 */
-	recovery_delete(relay->r);
+	if (relay->r != NULL)
+		recovery_delete(relay->r);
 	relay->r = NULL;
 }
 
@@ -239,7 +240,6 @@ relay_delete(struct relay *relay)
 {
 	if (relay->state == RELAY_FOLLOW)
 		relay_stop(relay);
-	fiber_cond_destroy(&relay->reader_cond);
 	diag_destroy(&relay->diag);
 	TRASH(relay);
 	free(relay);
@@ -361,31 +361,6 @@ relay_set_error(struct relay *relay, struct error *e)
 		diag_add_error(&relay->diag, e);
 }
 
-static void
-relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
-{
-	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
-	if (fiber_is_cancelled()) {
-		/*
-		 * The relay is exiting. Rescanning the WAL at this
-		 * point would be pointless and even dangerous,
-		 * because the relay could have written a packet
-		 * fragment to the socket before being cancelled
-		 * so that writing another row to the socket would
-		 * lead to corrupted replication stream and, as
-		 * a result, permanent replication breakdown.
-		 */
-		return;
-	}
-	try {
-		recover_remaining_wals(relay->r, &relay->stream, NULL,
-				       (events & WAL_EVENT_ROTATE) != 0);
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
-		fiber_cancel(fiber());
-	}
-}
-
 /*
  * Relay reader fiber function.
  * Read xrow encoded vclocks sent by the replica.
@@ -408,7 +383,24 @@ relay_reader_f(va_list ap)
 			/* vclock is followed while decoding, zeroing it. */
 			vclock_create(&relay->recv_vclock);
 			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
-			fiber_cond_signal(&relay->reader_cond);
+			if (relay->status_msg.msg.route != NULL)
+				continue;
+			struct vclock *send_vclock;
+			if (relay->version_id < version_id(1, 7, 4))
+				send_vclock = &relay->relay_vclock; /* relay->r is NULL in WAL memory mode */
+			else
+				send_vclock = &relay->recv_vclock;
+			if (vclock_sum(&relay->status_msg.vclock) ==
+			    vclock_sum(send_vclock))
+				continue;
+			static const struct cmsg_hop route[] = {
+				{tx_status_update, NULL}
+			};
+			cmsg_init(&relay->status_msg.msg, route);
+			vclock_copy(&relay->status_msg.vclock,
+				    send_vclock);
+			relay->status_msg.relay = relay;
+			cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
 		}
 	} catch (Exception *e) {
 		relay_set_error(relay, e);
@@ -434,6 +426,27 @@ relay_send_heartbeat(struct relay *relay)
 	}
 }
 
+static int
+relay_send_cb(struct xrow_header *row, void *data)
+{
+	try {
+		struct relay *relay = (struct relay *)data;
+		relay_send_row(&relay->stream, row);
+		return 0;
+	} catch (Exception *e) {
+		return -1;
+	}
+}
+
+static void
+relay_endpoint_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
+{
+	(void) loop;
+	(void) events;
+	struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data;
+	cbus_process(endpoint);
+}
+
 /**
  * A libev callback invoked when a relay client socket is ready
  * for read. This currently only happens when the client closes
@@ -443,21 +456,16 @@ static int
 relay_subscribe_f(va_list ap)
 {
 	struct relay *relay = va_arg(ap, struct relay *);
-	struct recovery *r = relay->r;
 
 	coio_enable();
 	relay_set_cord_name(relay->io.fd);
 
 	/* Create cpipe to tx for propagating vclock. */
 	cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay),
-			     fiber_schedule_cb, fiber());
+			     relay_endpoint_cb, &relay->endpoint);
 	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
 		  &relay->relay_pipe, NULL, NULL, cbus_process);
 
-	/* Setup WAL watcher for sending new rows to the replica. */
-	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
-			relay_process_wal_event, cbus_process);
-
 	/* Start fiber for receiving replica acks. */
 	char name[FIBER_NAME_MAX];
 	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
@@ -473,50 +481,39 @@ relay_subscribe_f(va_list ap)
 	 */
 	relay_send_heartbeat(relay);
 
-	/*
-	 * Run the event loop until the connection is broken
-	 * or an error occurs.
-	 */
 	while (!fiber_is_cancelled()) {
-		double timeout = replication_timeout;
-		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
-					    ERRINJ_DOUBLE);
-		if (inj != NULL && inj->dparam != 0)
-			timeout = inj->dparam;
-
-		fiber_cond_wait_deadline(&relay->reader_cond,
-					 relay->last_row_time + timeout);
-
 		/*
-		 * The fiber can be woken by IO cancel, by a timeout of
-		 * status messaging or by an acknowledge to status message.
-		 * Handle cbus messages first.
+		 * Start relaying data: first replay all existing WAL
+		 * files, then continue streaming from WAL memory.
+		 *
+		 * NOTE: relaying from WAL memory first might be more
+		 * efficient, but it breaks tests that depend on logs
+		 * being garbage collected. The impact is small, so this
+		 * can be revisited later.
 		 */
-		cbus_process(&relay->endpoint);
-		/* Check for a heartbeat timeout. */
-		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
-			relay_send_heartbeat(relay);
+		try {
+			relay->r = recovery_new(relay->wal_dir, false,
+					        &relay->relay_vclock);
+			auto relay_guard = make_scoped_guard([&] {
+				recovery_delete(relay->r);
+				relay->r = NULL;
+			});
+			recover_remaining_wals(relay->r, &relay->stream,
+					       NULL, true);
+		} catch (Exception *e) {
+			e->log();
+			relay_set_error(relay, e);
+			break;
+		}
 		/*
-		 * Check that the vclock has been updated and the previous
-		 * status message is delivered
+		 * All data from xlog files has been sent, so switch
+		 * to relaying from WAL memory.
 		 */
-		if (relay->status_msg.msg.route != NULL)
-			continue;
-		struct vclock *send_vclock;
-		if (relay->version_id < version_id(1, 7, 4))
-			send_vclock = &r->vclock;
-		else
-			send_vclock = &relay->recv_vclock;
-		if (vclock_sum(&relay->status_msg.vclock) ==
-		    vclock_sum(send_vclock))
-			continue;
-		static const struct cmsg_hop route[] = {
-			{tx_status_update, NULL}
+		if (wal_relay(&relay->relay_vclock, relay_send_cb, relay,
+			  relay->endpoint.name) != 0) {
+			relay_set_error(relay, diag_last_error(&fiber()->diag));
+			break;
 		};
-		cmsg_init(&relay->status_msg.msg, route);
-		vclock_copy(&relay->status_msg.vclock, send_vclock);
-		relay->status_msg.relay = relay;
-		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
 	}
 
 	/*
@@ -528,9 +525,6 @@ relay_subscribe_f(va_list ap)
 	diag_log();
 	say_crit("exiting the relay loop");
 
-	/* Clear WAL watcher. */
-	wal_clear_watcher(&relay->wal_watcher, cbus_process);
-
 	/* Join ack reader fiber. */
 	fiber_cancel(reader);
 	fiber_join(reader);
@@ -557,6 +551,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	 * unless it has already been registered by initial
 	 * join.
 	 */
+	vclock_copy(&relay->relay_vclock, replica_clock);
 	if (replica->gc == NULL) {
 		replica->gc = gc_consumer_register(replica_clock, "replica %s",
 						   tt_uuid_str(&replica->uuid));
@@ -571,15 +566,16 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	});
 
 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
-	relay->r = recovery_new(cfg_gets("wal_dir"), false,
-			        replica_clock);
+	relay->wal_dir = strdup(cfg_gets("wal_dir")); /* TODO(review): NULL (OOM) is not checked */
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
+	relay->r = NULL;
 
 	int rc = cord_costart(&relay->cord, "subscribe",
 			      relay_subscribe_f, relay);
 	if (rc == 0)
 		rc = cord_cojoin(&relay->cord);
+	free(relay->wal_dir);
 	if (rc != 0)
 		diag_raise();
 }
@@ -616,7 +612,10 @@ static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	assert(iproto_type_is_dml(packet->type));
+	if (packet->type != IPROTO_OK) {
+		assert(iproto_type_is_dml(packet->type));
+		vclock_follow_xrow(&relay->relay_vclock, packet);
+	}
 	/*
 	 * Transform replica local requests to IPROTO_NOP so as to
 	 * promote vclock on the replica without actually modifying
diff --git a/src/box/wal.c b/src/box/wal.c
index 6cdb0db15..0457f3d46 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -159,6 +159,8 @@ struct wal_writer
 	struct rlist watchers;
 	/** Wal memory buffer. */
 	struct wal_mem wal_mem;
+	/** Broadcast after each written batch to wake WAL memory relays. */
+	struct fiber_cond wal_mem_cond;
 };
 
 enum wal_msg_type {
@@ -1065,6 +1067,7 @@ wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
 	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
 	vclock_merge(&writer->vclock, &vclock_diff);
+	fiber_cond_broadcast(&writer->wal_mem_cond);
 
 	/*
 	 * Notify TX if the checkpoint threshold has been exceeded.
@@ -1163,7 +1166,9 @@ wal_cord_f(va_list ap)
 {
 	(void) ap;
 	struct wal_writer *writer = &wal_writer_singleton;
+	fiber_cond_create(&writer->wal_mem_cond);
 	wal_mem_create(&writer->wal_mem);
+	wal_mem_svp(&writer->wal_mem, &writer->vclock);
 
 	/** Initialize eio in this thread */
 	coio_enable();
@@ -1457,7 +1462,8 @@ wal_set_watcher(struct wal_watcher *watcher, const char *name,
 		{ wal_watcher_notify_perform, &watcher->wal_pipe };
 	watcher->route[1] = (struct cmsg_hop)
 		{ wal_watcher_notify_complete, NULL };
-	cbus_pair("wal", name, &watcher->wal_pipe, &watcher->watcher_pipe,
+
+	cbus_pair("wal", name, &watcher->wal_pipe, &watcher->watcher_pipe,
 		  wal_watcher_attach, watcher, process_cb);
 }
 
@@ -1494,3 +1500,149 @@ wal_atfork()
 	if (xlog_is_open(&vy_log_writer.xlog))
 		xlog_atfork(&vy_log_writer.xlog);
 }
+
+struct wal_relay_msg {
+	struct cmsg base;
+	struct cpipe wal_pipe;
+	struct cpipe relay_pipe;
+
+	struct vclock *vclock; /* Rows it already covers are skipped. */
+	wal_relay_cb on_wal_relay;
+	void *cb_data;
+	struct fiber *fiber;
+	struct cmsg cancel_msg;
+	struct fiber_cond done_cond;
+	bool done; /* Set by wal_relay_done(). */
+	int rc; /* NOTE(review): unused. */
+	struct diag diag; /* on_wal_relay() error, moved to the caller. */
+};
+
+static void
+wal_relay_done(struct cmsg *base)
+{
+	struct wal_relay_msg *msg =
+		container_of(base, struct wal_relay_msg, base);
+	msg->done = true;
+	fiber_cond_signal(&msg->done_cond);
+}
+
+static int
+wal_relay_f(va_list ap)
+{
+	static const struct cmsg_hop done_route[] = {
+		{wal_relay_done, NULL}
+	};
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct wal_relay_msg *msg = va_arg(ap, struct wal_relay_msg *);
+	struct vclock *vclock = msg->vclock;
+	wal_relay_cb on_wal_relay = msg->on_wal_relay;
+	void *cb_data = msg->cb_data;
+
+	double last_row_time = ev_monotonic_now(loop());
+
+	struct wal_mem_cursor cursor;
+	if (wal_mem_cursor_create(&writer->wal_mem, &cursor, vclock) != 0)
+		goto done;
+	while (!fiber_is_cancelled()) {
+		struct xrow_header *row;
+		struct xrow_header heartbeat; /* row may point here */
+		void *data;
+		size_t size;
+		int rc = wal_mem_cursor_next(&writer->wal_mem, &cursor,
+					     &row, &data, &size);
+		if (rc < 0) {
+			/* Outdated cursor. */
+			break;
+		}
+		if (rc == 0 && vclock_get(vclock, row->replica_id) >= row->lsn)
+			continue;
+		if (rc > 0) {
+			double timeout = replication_timeout;
+			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
+						    ERRINJ_DOUBLE);
+			if (inj != NULL && inj->dparam != 0)
+				timeout = inj->dparam;
+
+			/*
+			 * Nothing to send: wait for the next row and
+			 * send a heartbeat if the timeout expires.
+			 */
+			fiber_cond_wait_deadline(&writer->wal_mem_cond,
+						 last_row_time + timeout);
+			if (fiber_is_cancelled())
+				break;
+			if (ev_monotonic_now(loop()) - last_row_time >
+			    timeout) {
+				xrow_encode_timestamp(&heartbeat, instance_id,
+						      ev_now(loop()));
+				row = &heartbeat;
+			} else
+				continue;
+		}
+		last_row_time = ev_monotonic_now(loop());
+		if (on_wal_relay(row, cb_data) != 0) {
+			diag_move(&fiber()->diag, &msg->diag);
+			break;
+		}
+	}
+done:
+	cmsg_init(&msg->base, done_route);
+	cpipe_push(&msg->relay_pipe, &msg->base);
+	msg->fiber = NULL;
+	return 0;
+}
+
+static void
+wal_relay_attach(void *data)
+{
+	struct wal_relay_msg *msg = (struct wal_relay_msg *)data;
+	msg->fiber = fiber_new("wal relay fiber", wal_relay_f); /* TODO(review): a NULL result is not handled */
+	fiber_start(msg->fiber, msg);
+}
+
+static void
+wal_relay_cancel(struct cmsg *base)
+{
+	struct wal_relay_msg *msg = container_of(base, struct wal_relay_msg,
+						 cancel_msg);
+	if (msg->fiber != NULL)
+		fiber_cancel(msg->fiber);
+}
+
+int
+wal_relay(struct vclock *vclock, wal_relay_cb on_wal_relay, void *cb_data,
+	  const char *endpoint_name)
+{
+	struct wal_relay_msg wal_relay_msg;
+	wal_relay_msg.vclock = vclock;
+	wal_relay_msg.on_wal_relay = on_wal_relay;
+	wal_relay_msg.cb_data = cb_data;
+	diag_create(&wal_relay_msg.diag);
+	wal_relay_msg.cancel_msg.route = NULL;
+
+	fiber_cond_create(&wal_relay_msg.done_cond);
+	wal_relay_msg.done = false;
+
+	cbus_pair("wal", endpoint_name, &wal_relay_msg.wal_pipe,
+		  &wal_relay_msg.relay_pipe,
+		  wal_relay_attach, &wal_relay_msg, cbus_process);
+
+	while (!wal_relay_msg.done) {
+		if (fiber_is_cancelled() &&
+		    wal_relay_msg.cancel_msg.route == NULL) {
+			static const struct cmsg_hop cancel_route[] = {
+				{wal_relay_cancel, NULL}};
+			cmsg_init(&wal_relay_msg.cancel_msg, cancel_route);
+			cpipe_push(&wal_relay_msg.wal_pipe, &wal_relay_msg.cancel_msg);
+		}
+		fiber_cond_wait(&wal_relay_msg.done_cond);
+	}
+
+	cbus_unpair(&wal_relay_msg.wal_pipe, &wal_relay_msg.relay_pipe,
+		    NULL, NULL, cbus_process);
+	if (!diag_is_empty(&wal_relay_msg.diag)) {
+		diag_move(&wal_relay_msg.diag, &fiber()->diag);
+		return -1;
+	}
+	return 0;
+}
diff --git a/src/box/wal.h b/src/box/wal.h
index 1a7156d97..bd298cebe 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -235,6 +235,12 @@ wal_write_vy_log(struct journal_entry *req);
 void
 wal_rotate_vy_log();
 
+typedef int (*wal_relay_cb)(struct xrow_header *header, void *data); /* Non-zero return stops wal_relay(). */
+/** Relay WAL-memory rows not covered by vclock; -1 on callback error. */
+int
+wal_relay(struct vclock *vclock, wal_relay_cb on_wal_relay, void *cb_data,
+	  const char *endpoint_name);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c
index fdfc6f93d..b01604d55 100644
--- a/src/box/wal_mem.c
+++ b/src/box/wal_mem.c
@@ -247,14 +247,13 @@ wal_mem_cursor_next(struct wal_mem *wal_mem,
 	}
 
 	struct wal_mem_buf *mem_buf;
-	size_t last_row_index;
 
 next_buffer:
 	mem_buf = wal_mem->buf +
-		  wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
-	last_row_index = ibuf_used(&mem_buf->rows) /
-			 sizeof(struct wal_mem_buf_row);
-	if (last_row_index == wal_mem_cursor->row_index) {
+		  wal_mem_cursor->buf_index % WAL_MEM_BUF_COUNT; /* The cursor's own buffer, not the newest one. */
+	size_t buf_row_count = ibuf_used(&mem_buf->rows) /
+			       sizeof(struct wal_mem_buf_row);
+	if (buf_row_count == wal_mem_cursor->row_index) {
 		/* No more rows in the current buffer. */
 		if (wal_mem->last_buf_index == wal_mem_cursor->buf_index)
 			/* No more rows in the memory. */
@@ -269,5 +268,6 @@ next_buffer:
 	*row = &buf_row->xrow;
 	*data = buf_row->data;
 	*size = buf_row->size;
+	++wal_mem_cursor->row_index; /* Advance past the row just returned. */
 	return 0;
 }
-- 
2.22.0





More information about the Tarantool-patches mailing list