Tarantool development patches archive
 help / color / mirror / Atom feed
From: Georgy Kirichenko <georgy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko <georgy@tarantool.org>
Subject: [tarantool-patches] [PATCH 3/7] Replication: Relay does not rely on xlog boundaries
Date: Tue, 13 Aug 2019 09:27:41 +0300	[thread overview]
Message-ID: <cf8b2e805c90c38843edb1133dcb14d89bc9809a.1565676868.git.georgy@tarantool.org> (raw)
In-Reply-To: <cover.1565676868.git.georgy@tarantool.org>

When the relay receives an ACK, it matches the received vclock against
the xlog file boundaries detected by the on_close_log trigger and sends
a consumer advance message. However, for each ACK the relay also sends
a status update message to the tx cord, which could be used for gc
purposes instead.
This patch removes any knowledge about xlog boundaries from the relay
because there would not be any xlog files in the case of in-memory
replication. As gc now tracks all xlog files, it is able to collect
garbage files using relay status updates.

Note: with the parallel applier there is no longer one ACK per
transaction, so it should not be too expensive to advance a consumer on
each status update. However, this could be improved, for instance by
tracking the vclock of the next wal file.
---
 src/box/gc.c     |  58 +++++++++++++++++++++------
 src/box/relay.cc | 102 +----------------------------------------------
 2 files changed, 48 insertions(+), 112 deletions(-)

diff --git a/src/box/gc.c b/src/box/gc.c
index 944a6f3b2..9771e407a 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -66,6 +66,17 @@ gc_cleanup_fiber_f(va_list);
 static int
 gc_checkpoint_fiber_f(va_list);
 
+/*
+ * Return true if @a first is equal to or greater than @a second;
+ * any other vclock_compare() result (e.g. incomparable) is false.
+ */
+static inline bool
+gc_vclock_ge(const struct vclock *first, const struct vclock *second)
+{
+	int cmp = vclock_compare(first, second);
+	return (cmp == 0) || (cmp == 1);
+}
+
 /**
  * Comparator used for ordering gc_consumer objects by signature
  * in a binary tree.
@@ -201,8 +212,7 @@ gc_run_cleanup(void)
 	if (vclock == NULL ||
 	    vclock_sum(vclock) > vclock_sum(&checkpoint->vclock))
 		vclock = &checkpoint->vclock;
-	int cmp = vclock_compare(vclock, &replicaset.vclock);
-	if (gc.log_opened || !(cmp == 0 || cmp == 1))
+	if (gc.log_opened || !gc_vclock_ge(vclock, &replicaset.vclock))
 		vclock = vclockset_psearch(&gc.wal_dir.index, vclock);
 	run_wal_gc = vclock != NULL;
 
@@ -558,9 +568,23 @@ gc_consumer_register(const struct vclock *vclock, const char *format, ...)
 	va_start(ap, format);
 	vsnprintf(consumer->name, GC_NAME_MAX, format, ap);
 	va_end(ap);
-
-	vclock_copy(&consumer->vclock, vclock);
-	gc_tree_insert(&gc.consumers, consumer);
+	/* Find the vclock of the wal file containing the relay vclock. */
+	struct vclock *track_vclock = vclockset_psearch(&gc.wal_dir.index,
+							vclock);
+	if (track_vclock == NULL && !gc.log_opened &&
+	    gc_vclock_ge(vclock, &replicaset.vclock)) {
+		/*
+		 * There is no wal file in index containing the vclock
+		 * which is possible when a consumer is up to date with
+		 * the last checkpoint and there were no subsequent writes.
+		 */
+		track_vclock = &replicaset.vclock;
+	}
+	if (track_vclock != NULL) {
+		vclock_copy(&consumer->vclock, track_vclock);
+		gc_tree_insert(&gc.consumers, consumer);
+	} else
+		consumer->is_inactive = true;
 	return consumer;
 }
 
@@ -580,20 +604,30 @@ gc_consumer_advance(struct gc_consumer *consumer, const struct vclock *vclock)
 	if (consumer->is_inactive)
 		return;
 
-	int64_t signature = vclock_sum(vclock);
-	int64_t prev_signature = vclock_sum(&consumer->vclock);
-
-	assert(signature >= prev_signature);
-	if (signature == prev_signature)
-		return; /* nothing to do */
+	/*
+	 * In some rare cases relay could downgrade vclock, for instance in
+	 * case of replica data loss, and there is nothing we could do.
+	 */
+	if (!gc_vclock_ge(vclock, &consumer->vclock))
+		return;
 
+	/* Detect which wal file contains ack-ed relay position. */
+	if (!gc.log_opened && gc_vclock_ge(vclock, &replicaset.vclock)) {
+		/*
+		 * Relay is up to date with this instance and there is no
+		 * wal file (and no writes) after the last checkpoint.
+		 */
+		vclock = &replicaset.vclock;
+	} else
+		vclock = vclockset_psearch(&gc.wal_dir.index, vclock);
+	assert(vclock != NULL);
 	/*
 	 * Do not update the tree unless the tree invariant
 	 * is violated.
 	 */
 	struct gc_consumer *next = gc_tree_next(&gc.consumers, consumer);
 	bool update_tree = (next != NULL &&
-			    signature >= vclock_sum(&next->vclock));
+			    vclock_sum(vclock) >= vclock_sum(&next->vclock));
 
 	if (update_tree)
 		gc_tree_remove(&gc.consumers, consumer);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index efa3373f9..a1b841291 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -66,23 +66,6 @@ struct relay_status_msg {
 	struct vclock vclock;
 };
 
-/**
- * Cbus message to update replica gc state in tx thread.
- */
-struct relay_gc_msg {
-	/** Parent */
-	struct cmsg msg;
-	/**
-	 * Link in the list of pending gc messages,
-	 * see relay::pending_gc.
-	 */
-	struct stailq_entry in_pending;
-	/** Relay instance */
-	struct relay *relay;
-	/** Vclock to advance to */
-	struct vclock vclock;
-};
-
 /** State of a replication relay. */
 struct relay {
 	/** The thread in which we relay data to the replica. */
@@ -239,12 +222,6 @@ relay_exit(struct relay *relay)
 static void
 relay_stop(struct relay *relay)
 {
-	struct relay_gc_msg *gc_msg, *next_gc_msg;
-	stailq_foreach_entry_safe(gc_msg, next_gc_msg,
-				  &relay->pending_gc, in_pending) {
-		free(gc_msg);
-	}
-	stailq_create(&relay->pending_gc);
 	if (relay->r != NULL)
 		recovery_delete(relay->r);
 	relay->r = NULL;
@@ -368,6 +345,7 @@ tx_status_update(struct cmsg *msg)
 {
 	struct relay_status_msg *status = (struct relay_status_msg *)msg;
 	vclock_copy(&status->relay->tx.vclock, &status->vclock);
+	gc_consumer_advance(status->relay->replica->gc, &status->vclock);
 	static const struct cmsg_hop route[] = {
 		{relay_status_update, NULL}
 	};
@@ -375,73 +353,6 @@ tx_status_update(struct cmsg *msg)
 	cpipe_push(&status->relay->relay_pipe, msg);
 }
 
-/**
- * Update replica gc state in tx thread.
- */
-static void
-tx_gc_advance(struct cmsg *msg)
-{
-	struct relay_gc_msg *m = (struct relay_gc_msg *)msg;
-	gc_consumer_advance(m->relay->replica->gc, &m->vclock);
-	free(m);
-}
-
-static void
-relay_on_close_log_f(struct trigger *trigger, void * /* event */)
-{
-	static const struct cmsg_hop route[] = {
-		{tx_gc_advance, NULL}
-	};
-	struct relay *relay = (struct relay *)trigger->data;
-	struct relay_gc_msg *m = (struct relay_gc_msg *)malloc(sizeof(*m));
-	if (m == NULL) {
-		say_warn("failed to allocate relay gc message");
-		return;
-	}
-	cmsg_init(&m->msg, route);
-	m->relay = relay;
-	vclock_copy(&m->vclock, &relay->r->vclock);
-	/*
-	 * Do not invoke garbage collection until the replica
-	 * confirms that it has received data stored in the
-	 * sent xlog.
-	 */
-	stailq_add_tail_entry(&relay->pending_gc, m, in_pending);
-}
-
-/**
- * Invoke pending garbage collection requests.
- *
- * This function schedules the most recent gc message whose
- * vclock is less than or equal to the given one. Older
- * messages are discarded as their job will be done by the
- * scheduled message anyway.
- */
-static inline void
-relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
-{
-	struct relay_gc_msg *curr, *next, *gc_msg = NULL;
-	stailq_foreach_entry_safe(curr, next, &relay->pending_gc, in_pending) {
-		/*
-		 * We may delete a WAL file only if its vclock is
-		 * less than or equal to the vclock acknowledged by
-		 * the replica. Even if the replica's signature is
-		 * is greater, but the vclocks are incomparable, we
-		 * must not delete the WAL, because there may still
-		 * be rows not applied by the replica in it while
-		 * the greater signatures is due to changes pulled
-		 * from other members of the cluster.
-		 */
-		if (vclock_compare(&curr->vclock, vclock) > 0)
-			break;
-		stailq_shift(&relay->pending_gc);
-		free(gc_msg);
-		gc_msg = curr;
-	}
-	if (gc_msg != NULL)
-		cpipe_push(&relay->tx_pipe, &gc_msg->msg);
-}
-
 static void
 relay_set_error(struct relay *relay, struct error *e)
 {
@@ -543,12 +454,6 @@ relay_subscribe_f(va_list ap)
 	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
 		  &relay->relay_pipe, NULL, NULL, cbus_process);
 
-	/* Setup garbage collection trigger. */
-	struct trigger on_close_log = {
-		RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
-	};
-	trigger_add(&r->on_close_log, &on_close_log);
-
 	/* Setup WAL watcher for sending new rows to the replica. */
 	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
 			relay_process_wal_event, cbus_process);
@@ -612,8 +517,6 @@ relay_subscribe_f(va_list ap)
 		vclock_copy(&relay->status_msg.vclock, send_vclock);
 		relay->status_msg.relay = relay;
 		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
-		/* Collect xlog files received by the replica. */
-		relay_schedule_pending_gc(relay, send_vclock);
 	}
 
 	/*
@@ -625,8 +528,7 @@ relay_subscribe_f(va_list ap)
 	diag_log();
 	say_crit("exiting the relay loop");
 
-	/* Clear garbage collector trigger and WAL watcher. */
-	trigger_clear(&on_close_log);
+	/* Clear WAL watcher. */
 	wal_clear_watcher(&relay->wal_watcher, cbus_process);
 
 	/* Join ack reader fiber. */
-- 
2.22.0

  parent reply	other threads:[~2019-08-13  6:27 UTC|newest]

Thread overview: 18+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-08-13  6:27 [tarantool-patches] [PATCH 0/7] Replication: In-memory replication Georgy Kirichenko
2019-08-13  6:27 ` [tarantool-patches] [PATCH 1/7] Refactoring: wal writer fiber and queue Georgy Kirichenko
2019-08-16 13:53   ` [tarantool-patches] " Konstantin Osipov
2019-08-20 10:57     ` Георгий Кириченко
2019-08-21 10:18   ` [tarantool-patches] " Vladimir Davydov
2019-08-13  6:27 ` [tarantool-patches] [PATCH 2/7] Refactoring: Track wal files using gc state Georgy Kirichenko
2019-08-21 10:44   ` Vladimir Davydov
2019-08-13  6:27 ` Georgy Kirichenko [this message]
2019-08-21 11:35   ` [tarantool-patches] [PATCH 3/7] Replication: Relay does not rely on xlog boundaries Vladimir Davydov
2019-08-13  6:27 ` [tarantool-patches] [PATCH 4/7] Replication: wal memory buffer Georgy Kirichenko
2019-08-21 11:57   ` Vladimir Davydov
2019-08-13  6:27 ` [tarantool-patches] [PATCH 5/7] Replication: in memory replication Georgy Kirichenko
2019-08-21 13:52   ` Vladimir Davydov
2019-08-13  6:27 ` [tarantool-patches] [PATCH 6/7] Refactoring: remove wal_watcher routines Georgy Kirichenko
2019-08-21 13:52   ` Vladimir Davydov
2019-08-13  6:27 ` [tarantool-patches] [PATCH 7/7] Refactoring: get rid of on_close_log Georgy Kirichenko
2019-08-21 13:52   ` Vladimir Davydov
2019-08-16 13:47 ` [tarantool-patches] Re: [PATCH 0/7] Replication: In-memory replication Konstantin Osipov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=cf8b2e805c90c38843edb1133dcb14d89bc9809a.1565676868.git.georgy@tarantool.org \
    --to=georgy@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [tarantool-patches] [PATCH 3/7] Replication: Relay does not rely on xlog boundaries' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox