[tarantool-patches] [PATCH v2 2/2] Track wal vclock changes instead of copying

Georgy Kirichenko georgy at tarantool.org
Wed Feb 13 11:35:17 MSK 2019


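Avoid some vclock_copy() calls, since they can be expensive: track the
vclock changes of a WAL batch in a diff and merge it into the writer's
vclock instead of copying the whole vclock back and forth.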

Follow-up #2283
---
 src/box/vclock.h | 14 ++++++++++++++
 src/box/wal.c    | 44 ++++++++++++++++++++++++++++++--------------
 2 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/src/box/vclock.h b/src/box/vclock.h
index a59b2bddb..0c9996902 100644
--- a/src/box/vclock.h
+++ b/src/box/vclock.h
@@ -227,6 +227,20 @@ vclock_sum(const struct vclock *vclock)
 int64_t
 vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
 
+/**
+ * Add each component of diff to dst (dst[id] += diff[id])
+ * via vclock_follow(), then reset diff to an empty vclock.
+ */
+static inline void
+vclock_merge(struct vclock *dst, struct vclock *diff)
+{
+	struct vclock_iterator it;
+	vclock_iterator_init(&it, diff);
+	vclock_foreach(&it, item)
+		vclock_follow(dst, item.id, vclock_get(dst, item.id) + item.lsn);
+	vclock_create(diff);
+}
+
 /**
  * \brief Format vclock to YAML-compatible string representation:
  * { replica_id: lsn, replica_id:lsn })
diff --git a/src/box/wal.c b/src/box/wal.c
index 0b49548c0..b2652bb17 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -886,17 +886,25 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 	cpipe_push(&writer->tx_prio_pipe, &writer->in_rollback);
 }
 
+/*
+ * Assign LSN and replica id to local rows and record every row
+ * in vclock_diff as an increment over base, which is only read.
+ */
 static void
-wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
+wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
+	       struct xrow_header **row,
 	       struct xrow_header **end)
 {
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
 		if ((*row)->replica_id == 0) {
-			(*row)->lsn = vclock_inc(vclock, instance_id);
+			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
+				      vclock_get(base, instance_id);
 			(*row)->replica_id = instance_id;
 		} else {
-			vclock_follow_xrow(vclock, *row);
+			vclock_follow(vclock_diff, (*row)->replica_id,
+				      (*row)->lsn - vclock_get(base,
+							       (*row)->replica_id));
 		}
 	}
 }
@@ -909,13 +917,12 @@ wal_write_to_disk(struct cmsg *msg)
 	struct error *error;
 
 	/*
-	 * In order not to promote writer's vclock in case of an error,
-	 * we create a copy to assign LSNs before rows are actually
-	 * written. After successful xlog flush we update writer's vclock
-	 * to the last written vclock value.
+	 * Accumulate this batch's vclock changes in vclock_diff and
+	 * merge them into the writer's vclock only once rows reach
+	 * the xlog, so a failed write does not promote the writer's vclock.
 	 */
-	struct vclock vclock;
-	vclock_copy(&vclock, &writer->vclock);
+	struct vclock vclock_diff;
+	vclock_create(&vclock_diff);
 
 	struct errinj *inj = errinj(ERRINJ_WAL_DELAY, ERRINJ_BOOL);
 	while (inj != NULL && inj->bparam)
@@ -924,18 +931,21 @@ wal_write_to_disk(struct cmsg *msg)
 	if (writer->in_rollback.route != NULL) {
 		/* We're rolling back a failed write. */
 		stailq_concat(&wal_msg->rollback, &wal_msg->commit);
+		vclock_copy(&wal_msg->vclock, &writer->vclock);
 		return;
 	}
 
 	/* Xlog is only rotated between queue processing  */
 	if (wal_opt_rotate(writer) != 0) {
 		stailq_concat(&wal_msg->rollback, &wal_msg->commit);
+		vclock_copy(&wal_msg->vclock, &writer->vclock);
 		return wal_writer_begin_rollback(writer);
 	}
 
 	/* Ensure there's enough disk space before writing anything. */
 	if (wal_fallocate(writer, wal_msg->approx_len) != 0) {
 		stailq_concat(&wal_msg->rollback, &wal_msg->commit);
+		vclock_copy(&wal_msg->vclock, &writer->vclock);
 		return wal_writer_begin_rollback(writer);
 	}
 
@@ -969,15 +979,17 @@ wal_write_to_disk(struct cmsg *msg)
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
-		wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
-		entry->res = vclock_sum(&vclock);
+		wal_assign_lsn(&vclock_diff, &writer->vclock,
+			       entry->rows, entry->rows + entry->n_rows);
+		entry->res = vclock_sum(&vclock_diff) +
+			     vclock_sum(&writer->vclock);
 		rc = xlog_write_entry(l, entry);
 		if (rc < 0)
 			goto done;
 		if (rc > 0) {
 			writer->checkpoint_wal_size += rc;
 			last_committed = &entry->fifo;
-			vclock_copy(&writer->vclock, &vclock);
+			vclock_merge(&writer->vclock, &vclock_diff);
 		}
 		/* rc == 0: the write is buffered in xlog_tx */
 	}
@@ -987,7 +999,7 @@ wal_write_to_disk(struct cmsg *msg)
 
 	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
-	vclock_copy(&writer->vclock, &vclock);
+	vclock_merge(&writer->vclock, &vclock_diff);
 
 	/*
 	 * Notify TX if the checkpoint threshold has been exceeded.
@@ -1162,7 +1174,11 @@ wal_write_in_wal_mode_none(struct journal *journal,
 			   struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
-	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
+	struct vclock vclock_diff;
+	vclock_create(&vclock_diff);
+	wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows,
+		       entry->rows + entry->n_rows);
+	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
-- 
2.20.1





More information about the Tarantool-patches mailing list