[tarantool-patches] [PATCH 3/7] Replication: Relay does not rely on xlog boundaries

Georgy Kirichenko georgy at tarantool.org
Tue Aug 13 09:27:41 MSK 2019


When relay got an ACK then it matches received vclock against xlog file
boundaries detected using on_close_log trigger and send a consumer
advance message. However, for each ACK relay send a status update
message to the tx cord which could be used for gc purposes.
This patch removes any knowledge about xlog boundaries from relay
because there would not any xlog files in case of in-memory replication.
As gc now tracks all xlog files then it is able to handle garbage files
using relay status updates.

Note: after parallel applier there is no more one ACK per transaction
so it should not be too expensive to advance a consumer on each status
update. However I think it could be improved, for instance with tracking
the next wal file vclock.
---
 src/box/gc.c     |  58 +++++++++++++++++++++------
 src/box/relay.cc | 102 +----------------------------------------------
 2 files changed, 48 insertions(+), 112 deletions(-)

diff --git a/src/box/gc.c b/src/box/gc.c
index 944a6f3b2..9771e407a 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -66,6 +66,17 @@ gc_cleanup_fiber_f(va_list);
 static int
 gc_checkpoint_fiber_f(va_list);
 
+/*
+ * A shortcut function which checks if one vclock equal
+ * or greather than another.
+ */
+static inline bool
+gc_vclock_ge(const struct vclock *first, const struct vclock *second)
+{
+	int cmp = vclock_compare(first, second);
+	return (cmp == 0) || (cmp == 1);
+}
+
 /**
  * Comparator used for ordering gc_consumer objects by signature
  * in a binary tree.
@@ -201,8 +212,7 @@ gc_run_cleanup(void)
 	if (vclock == NULL ||
 	    vclock_sum(vclock) > vclock_sum(&checkpoint->vclock))
 		vclock = &checkpoint->vclock;
-	int cmp = vclock_compare(vclock, &replicaset.vclock);
-	if (gc.log_opened || !(cmp == 0 || cmp == 1))
+	if (gc.log_opened || !gc_vclock_ge(vclock, &replicaset.vclock))
 		vclock = vclockset_psearch(&gc.wal_dir.index, vclock);
 	run_wal_gc = vclock != NULL;
 
@@ -558,9 +568,23 @@ gc_consumer_register(const struct vclock *vclock, const char *format, ...)
 	va_start(ap, format);
 	vsnprintf(consumer->name, GC_NAME_MAX, format, ap);
 	va_end(ap);
-
-	vclock_copy(&consumer->vclock, vclock);
-	gc_tree_insert(&gc.consumers, consumer);
+	/* Found vclock of a wal file which contains a vclock from relay. */
+	struct vclock *track_vclock = vclockset_psearch(&gc.wal_dir.index,
+							vclock);
+	if (track_vclock == NULL && !gc.log_opened &&
+	    gc_vclock_ge(vclock, &replicaset.vclock))  {
+		/*
+		 * There is no wal file in index containing the vclock
+		 * which is possible when a consumer is up to date with
+		 * the last checkpoint and there were no subsequent writes.
+		 */
+		track_vclock = &replicaset.vclock;
+	}
+	if (vclock != NULL) {
+		vclock_copy(&consumer->vclock, track_vclock);
+		gc_tree_insert(&gc.consumers, consumer);
+	} else
+		consumer->is_inactive = true;
 	return consumer;
 }
 
@@ -580,20 +604,30 @@ gc_consumer_advance(struct gc_consumer *consumer, const struct vclock *vclock)
 	if (consumer->is_inactive)
 		return;
 
-	int64_t signature = vclock_sum(vclock);
-	int64_t prev_signature = vclock_sum(&consumer->vclock);
-
-	assert(signature >= prev_signature);
-	if (signature == prev_signature)
-		return; /* nothing to do */
+	/*
+	 * In some rare cases relay could downgrade vclock, for instance in
+	 * case of replica data loss, and there is nothing we could do.
+	 */
+	if (!gc_vclock_ge(vclock, &consumer->vclock))
+		return;
 
+	/* Detect which wal file contains ack-ed relay position. */
+	if (!gc.log_opened && gc_vclock_ge(vclock, &replicaset.vclock)) {
+		/*
+		 * Relay is up to date with this instance and there is no
+		 * wal file (and no writes) after the last checkpoint.
+		 */
+		vclock = &replicaset.vclock;
+	} else
+		vclock = vclockset_psearch(&gc.wal_dir.index, vclock);
+	assert(vclock != NULL);
 	/*
 	 * Do not update the tree unless the tree invariant
 	 * is violated.
 	 */
 	struct gc_consumer *next = gc_tree_next(&gc.consumers, consumer);
 	bool update_tree = (next != NULL &&
-			    signature >= vclock_sum(&next->vclock));
+			    vclock_sum(vclock) >= vclock_sum(&next->vclock));
 
 	if (update_tree)
 		gc_tree_remove(&gc.consumers, consumer);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index efa3373f9..a1b841291 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -66,23 +66,6 @@ struct relay_status_msg {
 	struct vclock vclock;
 };
 
-/**
- * Cbus message to update replica gc state in tx thread.
- */
-struct relay_gc_msg {
-	/** Parent */
-	struct cmsg msg;
-	/**
-	 * Link in the list of pending gc messages,
-	 * see relay::pending_gc.
-	 */
-	struct stailq_entry in_pending;
-	/** Relay instance */
-	struct relay *relay;
-	/** Vclock to advance to */
-	struct vclock vclock;
-};
-
 /** State of a replication relay. */
 struct relay {
 	/** The thread in which we relay data to the replica. */
@@ -239,12 +222,6 @@ relay_exit(struct relay *relay)
 static void
 relay_stop(struct relay *relay)
 {
-	struct relay_gc_msg *gc_msg, *next_gc_msg;
-	stailq_foreach_entry_safe(gc_msg, next_gc_msg,
-				  &relay->pending_gc, in_pending) {
-		free(gc_msg);
-	}
-	stailq_create(&relay->pending_gc);
 	if (relay->r != NULL)
 		recovery_delete(relay->r);
 	relay->r = NULL;
@@ -368,6 +345,7 @@ tx_status_update(struct cmsg *msg)
 {
 	struct relay_status_msg *status = (struct relay_status_msg *)msg;
 	vclock_copy(&status->relay->tx.vclock, &status->vclock);
+	gc_consumer_advance(status->relay->replica->gc, &status->vclock);
 	static const struct cmsg_hop route[] = {
 		{relay_status_update, NULL}
 	};
@@ -375,73 +353,6 @@ tx_status_update(struct cmsg *msg)
 	cpipe_push(&status->relay->relay_pipe, msg);
 }
 
-/**
- * Update replica gc state in tx thread.
- */
-static void
-tx_gc_advance(struct cmsg *msg)
-{
-	struct relay_gc_msg *m = (struct relay_gc_msg *)msg;
-	gc_consumer_advance(m->relay->replica->gc, &m->vclock);
-	free(m);
-}
-
-static void
-relay_on_close_log_f(struct trigger *trigger, void * /* event */)
-{
-	static const struct cmsg_hop route[] = {
-		{tx_gc_advance, NULL}
-	};
-	struct relay *relay = (struct relay *)trigger->data;
-	struct relay_gc_msg *m = (struct relay_gc_msg *)malloc(sizeof(*m));
-	if (m == NULL) {
-		say_warn("failed to allocate relay gc message");
-		return;
-	}
-	cmsg_init(&m->msg, route);
-	m->relay = relay;
-	vclock_copy(&m->vclock, &relay->r->vclock);
-	/*
-	 * Do not invoke garbage collection until the replica
-	 * confirms that it has received data stored in the
-	 * sent xlog.
-	 */
-	stailq_add_tail_entry(&relay->pending_gc, m, in_pending);
-}
-
-/**
- * Invoke pending garbage collection requests.
- *
- * This function schedules the most recent gc message whose
- * vclock is less than or equal to the given one. Older
- * messages are discarded as their job will be done by the
- * scheduled message anyway.
- */
-static inline void
-relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
-{
-	struct relay_gc_msg *curr, *next, *gc_msg = NULL;
-	stailq_foreach_entry_safe(curr, next, &relay->pending_gc, in_pending) {
-		/*
-		 * We may delete a WAL file only if its vclock is
-		 * less than or equal to the vclock acknowledged by
-		 * the replica. Even if the replica's signature is
-		 * is greater, but the vclocks are incomparable, we
-		 * must not delete the WAL, because there may still
-		 * be rows not applied by the replica in it while
-		 * the greater signatures is due to changes pulled
-		 * from other members of the cluster.
-		 */
-		if (vclock_compare(&curr->vclock, vclock) > 0)
-			break;
-		stailq_shift(&relay->pending_gc);
-		free(gc_msg);
-		gc_msg = curr;
-	}
-	if (gc_msg != NULL)
-		cpipe_push(&relay->tx_pipe, &gc_msg->msg);
-}
-
 static void
 relay_set_error(struct relay *relay, struct error *e)
 {
@@ -543,12 +454,6 @@ relay_subscribe_f(va_list ap)
 	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
 		  &relay->relay_pipe, NULL, NULL, cbus_process);
 
-	/* Setup garbage collection trigger. */
-	struct trigger on_close_log = {
-		RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
-	};
-	trigger_add(&r->on_close_log, &on_close_log);
-
 	/* Setup WAL watcher for sending new rows to the replica. */
 	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
 			relay_process_wal_event, cbus_process);
@@ -612,8 +517,6 @@ relay_subscribe_f(va_list ap)
 		vclock_copy(&relay->status_msg.vclock, send_vclock);
 		relay->status_msg.relay = relay;
 		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
-		/* Collect xlog files received by the replica. */
-		relay_schedule_pending_gc(relay, send_vclock);
 	}
 
 	/*
@@ -625,8 +528,7 @@ relay_subscribe_f(va_list ap)
 	diag_log();
 	say_crit("exiting the relay loop");
 
-	/* Clear garbage collector trigger and WAL watcher. */
-	trigger_clear(&on_close_log);
+	/* Clear WAL watcher. */
 	wal_clear_watcher(&relay->wal_watcher, cbus_process);
 
 	/* Join ack reader fiber. */
-- 
2.22.0





More information about the Tarantool-patches mailing list