[tarantool-patches] [PATCH v2 2/2] Track wal vclock changes instead of copying
Georgy Kirichenko
georgy at tarantool.org
Wed Feb 13 11:35:17 MSK 2019
Avoid some vclock_copy invocations because they can be expensive.
Follow-up #2283
---
src/box/vclock.h | 14 ++++++++++++++
src/box/wal.c | 44 ++++++++++++++++++++++++++++++--------------
2 files changed, 44 insertions(+), 14 deletions(-)
diff --git a/src/box/vclock.h b/src/box/vclock.h
index a59b2bddb..0c9996902 100644
--- a/src/box/vclock.h
+++ b/src/box/vclock.h
@@ -227,6 +227,20 @@ vclock_sum(const struct vclock *vclock)
int64_t
vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
+/**
+ * Merge all changes accumulated in diff into the destination
+ * vclock and then reset the diff.
+ */
+static inline void
+vclock_merge(struct vclock *dst, struct vclock *diff)
+{
+ struct vclock_iterator it;
+ vclock_iterator_init(&it, diff);
+ vclock_foreach(&it, item)
+ vclock_follow(dst, item.id, vclock_get(dst, item.id) + item.lsn);
+ vclock_create(diff);
+}
+
/**
* \brief Format vclock to YAML-compatible string representation:
* { replica_id: lsn, replica_id:lsn })
diff --git a/src/box/wal.c b/src/box/wal.c
index 0b49548c0..b2652bb17 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -886,17 +886,25 @@ wal_writer_begin_rollback(struct wal_writer *writer)
cpipe_push(&writer->tx_prio_pipe, &writer->in_rollback);
}
+/*
+ * Assign an LSN and replica identifier to local rows and track
+ * each row in vclock_diff.
+ */
static void
-wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
+wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
+ struct xrow_header **row,
struct xrow_header **end)
{
/** Assign LSN to all local rows. */
for ( ; row < end; row++) {
if ((*row)->replica_id == 0) {
- (*row)->lsn = vclock_inc(vclock, instance_id);
+ (*row)->lsn = vclock_inc(vclock_diff, instance_id) +
+ vclock_get(base, instance_id);
(*row)->replica_id = instance_id;
} else {
- vclock_follow_xrow(vclock, *row);
+ vclock_follow(vclock_diff, (*row)->replica_id,
+ (*row)->lsn - vclock_get(base,
+ (*row)->replica_id));
}
}
}
@@ -909,13 +917,12 @@ wal_write_to_disk(struct cmsg *msg)
struct error *error;
/*
- * In order not to promote writer's vclock in case of an error,
- * we create a copy to assign LSNs before rows are actually
- * written. After successful xlog flush we update writer's vclock
- * to the last written vclock value.
+ * Track all vclock changes made by this batch in the
+ * vclock_diff variable and then apply them to the writer's
+ * vclock after each xlog flush.
*/
- struct vclock vclock;
- vclock_copy(&vclock, &writer->vclock);
+ struct vclock vclock_diff;
+ vclock_create(&vclock_diff);
struct errinj *inj = errinj(ERRINJ_WAL_DELAY, ERRINJ_BOOL);
while (inj != NULL && inj->bparam)
@@ -924,18 +931,21 @@ wal_write_to_disk(struct cmsg *msg)
if (writer->in_rollback.route != NULL) {
/* We're rolling back a failed write. */
stailq_concat(&wal_msg->rollback, &wal_msg->commit);
+ vclock_copy(&wal_msg->vclock, &writer->vclock);
return;
}
/* Xlog is only rotated between queue processing */
if (wal_opt_rotate(writer) != 0) {
stailq_concat(&wal_msg->rollback, &wal_msg->commit);
+ vclock_copy(&wal_msg->vclock, &writer->vclock);
return wal_writer_begin_rollback(writer);
}
/* Ensure there's enough disk space before writing anything. */
if (wal_fallocate(writer, wal_msg->approx_len) != 0) {
stailq_concat(&wal_msg->rollback, &wal_msg->commit);
+ vclock_copy(&wal_msg->vclock, &writer->vclock);
return wal_writer_begin_rollback(writer);
}
@@ -969,15 +979,17 @@ wal_write_to_disk(struct cmsg *msg)
struct journal_entry *entry;
struct stailq_entry *last_committed = NULL;
stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
- wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
- entry->res = vclock_sum(&vclock);
+ wal_assign_lsn(&vclock_diff, &writer->vclock,
+ entry->rows, entry->rows + entry->n_rows);
+ entry->res = vclock_sum(&vclock_diff) +
+ vclock_sum(&writer->vclock);
rc = xlog_write_entry(l, entry);
if (rc < 0)
goto done;
if (rc > 0) {
writer->checkpoint_wal_size += rc;
last_committed = &entry->fifo;
- vclock_copy(&writer->vclock, &vclock);
+ vclock_merge(&writer->vclock, &vclock_diff);
}
/* rc == 0: the write is buffered in xlog_tx */
}
@@ -987,7 +999,7 @@ wal_write_to_disk(struct cmsg *msg)
writer->checkpoint_wal_size += rc;
last_committed = stailq_last(&wal_msg->commit);
- vclock_copy(&writer->vclock, &vclock);
+ vclock_merge(&writer->vclock, &vclock_diff);
/*
* Notify TX if the checkpoint threshold has been exceeded.
@@ -1162,7 +1174,11 @@ wal_write_in_wal_mode_none(struct journal *journal,
struct journal_entry *entry)
{
struct wal_writer *writer = (struct wal_writer *) journal;
- wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
+ struct vclock vclock_diff;
+ vclock_create(&vclock_diff);
+ wal_assign_lsn(&vclock_diff, &writer->vclock, entry->rows,
+ entry->rows + entry->n_rows);
+ vclock_merge(&writer->vclock, &vclock_diff);
vclock_copy(&replicaset.vclock, &writer->vclock);
return vclock_sum(&writer->vclock);
}
--
2.20.1
More information about the Tarantool-patches
mailing list