* [Tarantool-patches] [PATCH v2 1/5] box: introduce matrix clock
2020-03-18 19:47 [Tarantool-patches] [PATCH v2 0/5] replication: fix local space tracking Serge Petrenko
@ 2020-03-18 19:47 ` Serge Petrenko
2020-03-18 20:08 ` Konstantin Osipov
2020-03-18 19:47 ` [Tarantool-patches] [PATCH v2 2/5] wal: track consumer vclock and collect logs in wal thread Serge Petrenko
` (5 subsequent siblings)
6 siblings, 1 reply; 17+ messages in thread
From: Serge Petrenko @ 2020-03-18 19:47 UTC (permalink / raw)
To: v.shpilevoy, kostja.osipov, georgy; +Cc: tarantool-patches
From: Georgy Kirichenko <georgy@tarantool.org>
A matrix clock allows maintaining a set of vclocks and the
order of their components. The main goal is to be able to
build a vclock each component of which is less than or equal
to the n-th corresponding lsn in the matrix clock.
The purpose of the matrix clock is to evaluate a vclock
which has already been processed by WAL consumers, such as
relays, or to obtain a majority vclock to commit journal
entries in case of synchronous replication.
@sergepetrenko: refactoring & rewrite comments to doxygen style.
Part of #980, #3794
Prerequisite #4114
---
src/box/CMakeLists.txt | 4 +
src/box/mclock.c | 394 +++++++++++++++++++++++++++++++++++++++
src/box/mclock.h | 151 +++++++++++++++
src/box/wal.c | 1 -
test/unit/CMakeLists.txt | 2 +
test/unit/mclock.result | 18 ++
test/unit/mclock.test.c | 160 ++++++++++++++++
7 files changed, 729 insertions(+), 1 deletion(-)
create mode 100644 src/box/mclock.c
create mode 100644 src/box/mclock.h
create mode 100644 test/unit/mclock.result
create mode 100644 test/unit/mclock.test.c
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 56758bd2f..cbffab046 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -32,6 +32,9 @@ target_link_libraries(box_error core stat)
add_library(vclock STATIC vclock.c)
target_link_libraries(vclock core)
+add_library(mclock STATIC mclock.c)
+target_link_libraries(mclock vclock core)
+
add_library(xrow STATIC xrow.c iproto_constants.c)
target_link_libraries(xrow server core small vclock misc box_error
scramble ${MSGPUCK_LIBRARIES})
@@ -134,6 +137,7 @@ add_library(box STATIC
execute.c
sql_stmt_cache.c
wal.c
+ mclock.c
call.c
merger.c
${lua_sources}
diff --git a/src/box/mclock.c b/src/box/mclock.c
new file mode 100644
index 000000000..de4ca2a08
--- /dev/null
+++ b/src/box/mclock.c
@@ -0,0 +1,394 @@
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "mclock.h"
+
+void
+mclock_create(struct mclock *mclock)
+{
+ memset(mclock, 0, sizeof(struct mclock));
+}
+
+void
+mclock_destroy(struct mclock *mclock)
+{
+ memset(mclock, 0, sizeof(struct mclock));
+}
+
+/**
+ * Check whether vclock contains any ids unknown to the matrix
+ * clock and initialize the corresponding column, if it does.
+ *
+ * @param mclock Matrix clock.
+ * @param id The replica id the vector clock comes from.
+ * @param vclock Vector clock.
+ */
+static void
+mclock_adjust_col_map(struct mclock *mclock, uint32_t id,
+ const struct vclock *vclock)
+{
+ assert(id < VCLOCK_MAX);
+ /* Evaluate new matrix column identifiers. */
+ uint32_t new_col_map = vclock->map & ~mclock->col_map;
+ struct bit_iterator col_map_it;
+ bit_iterator_init(&col_map_it, &new_col_map, sizeof(new_col_map), true);
+ for (size_t col_id = bit_iterator_next(&col_map_it); col_id < VCLOCK_MAX;
+ col_id = bit_iterator_next(&col_map_it)) {
+ /* Register new replica identifier. */
+ mclock->col_map |= (1 << col_id);
+ struct bit_iterator row_map_it;
+ bit_iterator_init(&row_map_it, &mclock->row_map,
+ sizeof(mclock->row_map), true);
+ /* Build an order map for the given column. */
+ mclock->order[col_id][0] = id;
+ for (size_t row_id = bit_iterator_next(&row_map_it), i = 1;
+ row_id < VCLOCK_MAX;
+ row_id = bit_iterator_next(&row_map_it)) {
+ if (row_id != id)
+ mclock->order[col_id][i++] = row_id;
+ }
+ }
+}
+
+/**
+ * Fetch the n-th largest lsn for the given replica id from the
+ * matrix clock.
+ *
+ * @param mclock Matrix clock.
+ * @param col_id Replica id to fetch lsn for.
+ * @param pos The position of desired lsn in ordered array.
+ *
+ * @return An lsn from the \a pos position in the ordered array
+ * corresponding to \a col_id.
+ */
+static inline int64_t
+mclock_get_pos_lsn(const struct mclock *mclock, uint32_t col_id, uint32_t pos)
+{
+ assert(col_id < VCLOCK_MAX);
+ assert(pos < VCLOCK_MAX);
+ uint32_t row_id = mclock->order[col_id][pos];
+ return vclock_get(mclock->vclock + row_id, col_id);
+}
+
+/**
+ * Search the ordered array corresponding to the given replica id
+ * for the range which contains entries with the given lsn.
+ *
+ * @param mclock Matrix clock.
+ * @param col_id Replica id to perform search for.
+ * @param lsn Lsn to match.
+ * @param[in,out] from The beginning of the found range.
+ * @param[in,out] to The end of the found range.
+ */
+static inline void
+mclock_find_range(const struct mclock *mclock, uint32_t col_id, int64_t lsn,
+ uint32_t *from, uint32_t *to)
+{
+ /* Setup initial search ranges for the binary search. */
+ uint32_t from_lo = *from, from_hi = *to;
+ uint32_t to_lo = *from, to_hi = *to;
+ /* Look for `from' position. */
+ while (from_hi - from_lo > 1) {
+ uint32_t mid = (from_lo + from_hi) / 2;
+ int64_t mid_lsn = mclock_get_pos_lsn(mclock, col_id, mid);
+ if (mid_lsn <= lsn)
+ from_hi = mid;
+ else
+ from_lo = mid;
+ /*
+ * Optimization: check if we could decrease
+ * the `to' search range.
+ */
+ if (mid_lsn < lsn)
+ to_hi = MIN(mid, to_hi);
+ else
+ to_lo = MAX(mid, to_lo);
+ }
+ if (mclock_get_pos_lsn(mclock, col_id, from_lo) > lsn)
+ *from = from_hi;
+ else
+ *from = from_lo;
+ /* Look for `to' position. */
+ while (to_hi - to_lo > 1) {
+ uint32_t mid = (to_lo + to_hi) / 2;
+ int64_t mid_lsn = mclock_get_pos_lsn(mclock, col_id, mid);
+ if (mid_lsn < lsn)
+ to_hi = mid;
+ else
+ to_lo = mid;
+ }
+ *to = to_hi;
+}
+
+/**
+ * Make space for an array element being moved from \a old_pos
+ * to \a new_pos. In order to do so, shift all array members
+ * between the two positions towards \a old_pos, effectively
+ * making space for the element at \a new_pos and overwriting
+ * it at \a old_pos at the same time.
+ *
+ * @param mclock Matrix clock.
+ * @param col_id The column id to shift.
+ * @param old_pos The previous position of the array element
+ * being moved.
+ * @param new_pos Desired position of the array element being
+ * moved.
+ */
+static inline void
+mclock_shift(struct mclock *mclock, uint32_t col_id, uint32_t old_pos,
+ uint32_t new_pos)
+{
+ if (old_pos > new_pos) {
+ memmove(mclock->order[col_id] + new_pos + 1,
+ mclock->order[col_id] + new_pos,
+ (old_pos - new_pos) * sizeof(**mclock->order));
+ } else if (old_pos < new_pos) {
+ memmove(mclock->order[col_id] + old_pos,
+ mclock->order[col_id] + old_pos + 1,
+ (new_pos - old_pos) * sizeof(**mclock->order));
+ }
+}
+
+/**
+ * Update replica vclock and reorder mclock members.
+ *
+ * @param mclock Matrix clock.
+ * @param id Replica id whose vclock to update.
+ * @param vclock Vector clock.
+ */
+static void
+mclock_update_vclock(struct mclock *mclock, uint32_t id,
+ const struct vclock *vclock)
+{
+ uint32_t count = __builtin_popcount(mclock->row_map);
+ mclock_adjust_col_map(mclock, id, vclock);
+ /* Perform reordering for each column. */
+ struct bit_iterator col_map_it;
+ bit_iterator_init(&col_map_it, &mclock->col_map,
+ sizeof(mclock->col_map), true);
+ for (size_t col_id = bit_iterator_next(&col_map_it); col_id < VCLOCK_MAX;
+ col_id = bit_iterator_next(&col_map_it)) {
+ int64_t new_lsn = vclock_get(vclock, col_id);
+ int64_t old_lsn = vclock_get(mclock->vclock + id, col_id);
+
+ if (old_lsn == new_lsn)
+ continue;
+ /*
+ * Find the position range which contains the given
+ * replica id in the current column (the old lsn
+ * position).
+ */
+ uint32_t from = 0, to = count;
+ mclock_find_range(mclock, col_id, old_lsn, &from, &to);
+ assert(to > from);
+ uint32_t old_pos = from, new_pos;
+ while (old_pos < to) {
+ uint32_t replica_id = mclock->order[col_id][old_pos];
+ if (replica_id == id)
+ break;
+ ++old_pos;
+ }
+ /* Replica id should be found. */
+ assert(old_pos < to);
+ if (new_lsn > mclock_get_pos_lsn(mclock, col_id, 0)) {
+ /*
+ * The new lsn is the biggest one, so put it on
+ * the first position in the column.
+ */
+ new_pos = 0;
+ } else if (new_lsn <= mclock_get_pos_lsn(mclock, col_id,
+ count - 1)) {
+ /* The smallest one goes to the last position. */
+ new_pos = count - 1;
+ } else {
+ /* Find the position range which contains the new lsn. */
+ if (new_lsn > old_lsn)
+ from = 0;
+ else
+ to = count;
+ mclock_find_range(mclock, col_id, new_lsn, &from, &to);
+ /*
+ * Take care of the position shift towards the
+ * head or the tail of the column order map.
+ */
+ new_pos = to - (new_lsn <= old_lsn ? 1 : 0);
+ }
+
+ if (old_pos == new_pos)
+ continue;
+ mclock_shift(mclock, col_id, old_pos, new_pos);
+ mclock->order[col_id][new_pos] = id;
+ }
+
+ vclock_copy(&mclock->vclock[id], vclock);
+}
+
+/**
+ * Delete the vclock coming from a replica with given id.
+ *
+ * @param mclock Matrix clock.
+ * @param id Replica id whose vclock to remove.
+ */
+static void
+mclock_delete_vclock(struct mclock *mclock, uint32_t id)
+{
+ assert((mclock->row_map & (1 << id)) != 0);
+ uint32_t count = __builtin_popcount(mclock->row_map);
+ /* Perform reordering for each column. */
+ struct bit_iterator col_map_it;
+ bit_iterator_init(&col_map_it, &mclock->col_map,
+ sizeof(mclock->col_map), true);
+
+ for (size_t col_id = bit_iterator_next(&col_map_it);
+ col_id < VCLOCK_MAX; col_id = bit_iterator_next(&col_map_it)) {
+ int64_t old_lsn = vclock_get(mclock->vclock + id, col_id);
+ /*
+ * Find the position range which contains the given
+ * replica id in the current column (the old lsn
+ * position).
+ */
+ uint32_t from = 0, to = count;
+ mclock_find_range(mclock, col_id, old_lsn, &from, &to);
+ assert(to > from);
+ uint32_t old_pos = from;
+ while (old_pos < to) {
+ uint32_t replica_id = mclock->order[col_id][old_pos];
+ if (replica_id == id)
+ break;
+ ++old_pos;
+ }
+ /* Replica id should be found. */
+ assert(old_pos < to);
+ uint32_t new_pos = count - 1;
+
+ if (old_pos == new_pos)
+ continue;
+ mclock_shift(mclock, col_id, old_pos, new_pos);
+ }
+ mclock->row_map ^= (1 << id);
+}
+
+void
+mclock_update(struct mclock *mclock, uint32_t id, const struct vclock *vclock)
+{
+ /* Vclock is zero - delete the corresponding entry. */
+ if (vclock_sum(vclock) == 0) {
+ if ((mclock->row_map & (1 << id)) != 0)
+ mclock_delete_vclock(mclock, id);
+ return;
+ }
+ /*
+ * The given replica id is not attached yet, so put
+ * a zero vclock with the corresponding replica
+ * identifier on the last position.
+ */
+ if ((mclock->row_map & (1 << id)) == 0) {
+ vclock_create(&mclock->vclock[id]);
+ mclock->row_map |= 1 << id;
+ uint32_t count = __builtin_popcount(mclock->row_map);
+ struct bit_iterator col_map_it;
+ bit_iterator_init(&col_map_it, &mclock->col_map,
+ sizeof(mclock->col_map), true);
+ for (size_t col_id = bit_iterator_next(&col_map_it);
+ col_id < VCLOCK_MAX;
+ col_id = bit_iterator_next(&col_map_it)) {
+ mclock->order[col_id][count - 1] = id;
+ }
+ }
+ mclock_update_vclock(mclock, id, vclock);
+}
+
+int
+mclock_get(struct mclock *mclock, int32_t offset, struct vclock *vclock)
+{
+ int32_t count = __builtin_popcount(mclock->row_map);
+ /* Check if given offset is out of mclock range. */
+ if (offset >= count || offset < -count) {
+ vclock_create(vclock);
+ return -1;
+ }
+ offset = (offset + count) % count;
+ vclock_create(vclock);
+ /* Fetch lsn for each known replica identifier. */
+ struct bit_iterator col_map_it;
+ bit_iterator_init(&col_map_it, &mclock->col_map,
+ sizeof(mclock->col_map), true);
+ for (size_t col_id = bit_iterator_next(&col_map_it); col_id < VCLOCK_MAX;
+ col_id = bit_iterator_next(&col_map_it)) {
+ int64_t lsn = mclock_get_pos_lsn(mclock, col_id, offset);
+ if (lsn > 0)
+ vclock_follow(vclock, col_id, lsn);
+ }
+ return 0;
+}
+
+int
+mclock_get_row(struct mclock *mclock, uint32_t id, struct vclock *vclock)
+{
+ if ((mclock->row_map & (1 << id)) == 0)
+ return -1;
+ vclock_copy(vclock, mclock->vclock + id);
+ return 0;
+}
+
+int
+mclock_get_col(struct mclock *mclock, uint32_t id, struct vclock *vclock)
+{
+ vclock_create(vclock);
+ if ((mclock->col_map & (1 << id)) == 0)
+ return -1;
+
+ struct bit_iterator row_map_it;
+ bit_iterator_init(&row_map_it, &mclock->row_map,
+ sizeof(mclock->row_map), true);
+ for (size_t row_id = bit_iterator_next(&row_map_it);
+ row_id < VCLOCK_MAX; row_id = bit_iterator_next(&row_map_it)) {
+ int64_t lsn = vclock_get(mclock->vclock + row_id, id);
+ if (lsn == 0)
+ continue;
+ vclock_follow(vclock, row_id, lsn);
+ }
+
+ return 0;
+}
+
+bool
+mclock_check(struct mclock *mclock)
+{
+ uint32_t count = __builtin_popcount(mclock->row_map);
+ struct bit_iterator col_map_it;
+ bit_iterator_init(&col_map_it, &mclock->col_map,
+ sizeof(mclock->col_map), true);
+ for (size_t col_id = bit_iterator_next(&col_map_it); col_id < VCLOCK_MAX;
+ col_id = bit_iterator_next(&col_map_it)) {
+ for (uint32_t n = 0; n < count - 1; ++n)
+ if (mclock_get_pos_lsn(mclock, col_id, n) <
+ mclock_get_pos_lsn(mclock, col_id, n + 1))
+ return false;
+ }
+ return true;
+}
diff --git a/src/box/mclock.h b/src/box/mclock.h
new file mode 100644
index 000000000..fb734a6d5
--- /dev/null
+++ b/src/box/mclock.h
@@ -0,0 +1,151 @@
+#ifndef INCLUDES_TARANTOOL_MCLOCK_H
+#define INCLUDES_TARANTOOL_MCLOCK_H
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdlib.h>
+
+#include "vclock.h"
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/**
+ * The matrix clock structure contains vclocks as seen by
+ * cluster members.
+ */
+struct mclock {
+ /** A bit map containing row ids present in the mclock. */
+ uint32_t row_map;
+ /**
+ * A bit map of component ids set in any of the
+ * contained vclocks.
+ */
+ uint32_t col_map;
+ /**
+ * An array of vclocks indexed by replica ids.
+ */
+ struct vclock vclock[VCLOCK_MAX];
+ /**
+ * Per-column order map. Each row is an ordered list of
+ * attached replica identifiers, with the replica holding
+ * the biggest lsn on the first position. In case of a run
+ * of equal lsns, the most recently updated replica is on
+ * the last position of the run.
+ * For instance, if we have the following vclocks:
+ * 1: {1: 10, 2: 12, 3: 0}
+ * 2: {1: 10, 2: 14, 3: 1}
+ * 3: {1: 0, 2: 8, 3: 4}
+ * The order map will look as follows:
+ * {{1, 2, 3}, {2, 1, 3}, {3, 2, 1}}
+ */
+ uint8_t order[VCLOCK_MAX][VCLOCK_MAX];
+};
+
+/** Create a mclock structure. */
+void
+mclock_create(struct mclock *mclock);
+
+/** Reset mclock. */
+void
+mclock_destroy(struct mclock *mclock);
+
+/**
+ * Update a vclock identified by replica id and
+ * sort mclock members.
+ */
+void
+mclock_update(struct mclock *mclock, uint32_t id, const struct vclock *vclock);
+
+/**
+ * Build a vclock each component of which is the lsn residing
+ * at position \a offset (or count + offset when offset < 0) in
+ * the corresponding column's descending order.
+ * So mclock_get(mclock, 0, &vclock) selects the biggest lsn
+ * for each column and mclock_get(mclock, -1, &vclock) selects
+ * the smallest one.
+ * For instance, given an mclock with the following vclocks:
+ * 1: {1: 10, 2: 12, 3: 0}
+ * 2: {1: 10, 2: 14, 3: 1}
+ * 3: {1: 0, 2: 8, 3: 4}
+ * mclock_get(mclock, 0, &vclock) builds {1: 10, 2: 14, 3: 4},
+ * whereas mclock_get(mclock, 2, &vclock) builds
+ * {1: 0, 2: 8, 3: 0}.
+ *
+ * @param mclock Matrix clock.
+ * @param offset Position of the desired lsn in each column's
+ * descending order; negative values count from the end.
+ * @param[out] vclock The built vclock.
+ *
+ * @return 0 on success, -1 if \a offset is out of range.
+ */
+int
+mclock_get(struct mclock *mclock, int32_t offset, struct vclock *vclock);
+
+/**
+ * Get a vclock corresponding to the given replica id.
+ *
+ * @param mclock Matrix clock.
+ * @param id Replica id.
+ * @param[out] vclock Found vclock.
+ *
+ * @return 0 vclock was found
+ * -1 vclock corresponding to \a id isn't in mclock.
+ */
+int
+mclock_get_row(struct mclock *mclock, uint32_t id, struct vclock *vclock);
+
+/**
+ * Get a column from the matrix clock. The column contains the
+ * vclock components corresponding to the given id as seen by
+ * cluster members.
+ *
+ * @param mclock Matrix clock.
+ * @param id Replica id indicating vclock component to fetch.
+ * @param[out] vclock Found column.
+ *
+ * @return 0 the column was found
+ * -1 the corresponding column isn't in mclock.
+ */
+int
+mclock_get_col(struct mclock *mclock, uint32_t id, struct vclock *vclock);
+
+/** Check the mclock integrity. */
+bool
+mclock_check(struct mclock *mclock);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* INCLUDES_TARANTOOL_MCLOCK_H */
diff --git a/src/box/wal.c b/src/box/wal.c
index 1668c9348..ecbe0919e 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -1429,7 +1429,6 @@ wal_notify_watchers(struct wal_writer *writer, unsigned events)
wal_watcher_notify(watcher, events);
}
-
/**
* After fork, the WAL writer thread disappears.
* Make sure that atexit() handlers in the child do
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 4ac08de8d..dd3e5f0c3 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -65,6 +65,8 @@ add_executable(bloom.test bloom.cc)
target_link_libraries(bloom.test salad)
add_executable(vclock.test vclock.cc)
target_link_libraries(vclock.test vclock unit)
+add_executable(mclock.test mclock.test.c)
+target_link_libraries(mclock.test mclock vclock bit unit)
add_executable(xrow.test xrow.cc)
target_link_libraries(xrow.test xrow unit)
add_executable(decimal.test decimal.c)
diff --git a/test/unit/mclock.result b/test/unit/mclock.result
new file mode 100644
index 000000000..eb3aa649d
--- /dev/null
+++ b/test/unit/mclock.result
@@ -0,0 +1,18 @@
+1..2
+ 1..1
+ *** test_random_stress ***
+ ok 1 - random stress
+ *** test_random_stress: done ***
+ok 1 - subtests
+ 1..8
+ *** test_func ***
+ ok 1 - consistency 1
+ ok 2 - first vclock 1
+ ok 3 - last vclock 1
+ ok 4 - second vclock
+ ok 5 - consistency 2
+ ok 6 - consistency 3
+ ok 7 - first vclock - 2
+ ok 8 - last vclock - 2
+ *** test_func: done ***
+ok 2 - subtests
diff --git a/test/unit/mclock.test.c b/test/unit/mclock.test.c
new file mode 100644
index 000000000..675cbeee3
--- /dev/null
+++ b/test/unit/mclock.test.c
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "unit.h"
+#include <stdarg.h>
+
+#include "box/mclock.h"
+
+static void
+test_random_stress(void)
+{
+ srand(time(NULL));
+ plan(1);
+ header();
+ struct mclock mclock;
+ mclock_create(&mclock);
+ bool ok = true;
+ for (int i = 0; i < 50000; ++i) {
+ struct vclock vclock;
+ vclock_create(&vclock);
+ uint32_t id = rand() % 31 + 1;
+ /* Count of non-zero items. */
+ int tm = rand() % 31;
+ for (int t = 0; t < tm;) {
+ uint32_t j = rand() % 31 + 1;
+ if (vclock_get(&vclock, j) > 0)
+ continue;
+ vclock_follow(&vclock, j, rand() + 1);
+ ++t;
+ }
+ mclock_update(&mclock, id, &vclock);
+ if (!(ok = mclock_check(&mclock)))
+ break;
+ }
+ struct vclock vclock;
+ vclock_create(&vclock);
+ for (int i = 1; i < 32; ++i)
+ mclock_update(&mclock, i, &vclock);
+ mclock_destroy(&mclock);
+ is(ok, true, "random stress");
+ footer();
+ check_plan();
+}
+
+static void
+test_func()
+{
+ plan(8);
+ header();
+ struct mclock mclock;
+ mclock_create(&mclock);
+ struct vclock v1, v2, v3;
+ vclock_create(&v1);
+ vclock_follow(&v1, 1, 11);
+ vclock_follow(&v1, 2, 21);
+ vclock_follow(&v1, 3, 31);
+ vclock_create(&v2);
+ vclock_follow(&v2, 1, 22);
+ vclock_follow(&v2, 2, 12);
+ vclock_follow(&v2, 3, 30);
+ vclock_create(&v3);
+ vclock_follow(&v3, 2, 32);
+ vclock_follow(&v3, 3, 2);
+ vclock_follow(&v3, 4, 5);
+ mclock_update(&mclock, 1, &v1);
+ mclock_update(&mclock, 2, &v2);
+ mclock_update(&mclock, 3, &v3);
+ is(mclock_check(&mclock), true, "consistency 1");
+
+ struct vclock v, t;
+ vclock_create(&t);
+ vclock_follow(&t, 1, 22);
+ vclock_follow(&t, 2, 32);
+ vclock_follow(&t, 3, 31);
+ vclock_follow(&t, 4, 5);
+
+ mclock_get(&mclock, 0, &v);
+ is(vclock_compare(&v, &t), 0, "first vclock 1");
+
+ vclock_create(&t);
+ vclock_follow(&t, 2, 12);
+ vclock_follow(&t, 3, 2);
+
+ mclock_get(&mclock, -1, &v);
+ is(vclock_compare(&v, &t), 0, "last vclock 1");
+
+ vclock_create(&t);
+ vclock_follow(&t, 1, 11);
+ vclock_follow(&t, 2, 21);
+ vclock_follow(&t, 3, 30);
+
+ mclock_get(&mclock, 1, &v);
+ is(vclock_compare(&v, &t), 0, "second vclock");
+
+ vclock_follow(&v1, 1, 40);
+ vclock_follow(&v1, 4, 10);
+ mclock_update(&mclock, 1, &v1);
+ is(mclock_check(&mclock), true, "consistency 2");
+ vclock_follow(&v2, 2, 35);
+ vclock_follow(&v2, 4, 3);
+ mclock_update(&mclock, 2, &v2);
+ is(mclock_check(&mclock), true, "consistency 3");
+
+ vclock_create(&t);
+ vclock_follow(&t, 1, 40);
+ vclock_follow(&t, 2, 35);
+ vclock_follow(&t, 3, 31);
+ vclock_follow(&t, 4, 10);
+
+ mclock_get(&mclock, 0, &v);
+ is(vclock_compare(&v, &t), 0, "first vclock - 2");
+
+ vclock_create(&t);
+ vclock_follow(&t, 2, 21);
+ vclock_follow(&t, 3, 2);
+ vclock_follow(&t, 4, 3);
+
+ mclock_get(&mclock, -1, &v);
+ is(vclock_compare(&v, &t), 0, "last vclock - 2");
+
+ footer();
+ check_plan();
+
+}
+
+int main(void)
+{
+ plan(2);
+ test_random_stress();
+ test_func();
+ check_plan();
+}
+
--
2.21.1 (Apple Git-122.3)
^ permalink raw reply [flat|nested] 17+ messages in thread
* Re: [Tarantool-patches] [PATCH v2 1/5] box: introduce matrix clock
2020-03-18 19:47 ` [Tarantool-patches] [PATCH v2 1/5] box: introduce matrix clock Serge Petrenko
@ 2020-03-18 20:08 ` Konstantin Osipov
2020-03-19 8:11 ` Timur Safin
0 siblings, 1 reply; 17+ messages in thread
From: Konstantin Osipov @ 2020-03-18 20:08 UTC (permalink / raw)
To: Serge Petrenko; +Cc: v.shpilevoy, tarantool-patches
* Serge Petrenko <sergepetrenko@tarantool.org> [20/03/18 22:52]:
> A matrix clock which allows to maintain a set of vclocks and
> their components order. The main target is to be able to
> build a vclock which contains lsns each one is less or equal
> that n corresponding lsn from a matrix clock.
>
> The purpose of the matrix clock is to evaluate a vclock
> which is already processed by wal consumers like relays
> or to obtain a majority vclock to commit journal entries
> in case of synchronous replication.
>
> @sergepetrenko: refactoring & rewrite comments to doxygen style.
I think we have discussed that matrix clock should not be pushed.
It's a huge overcomplication.
Why are you committing this?
--
Konstantin Osipov, Moscow, Russia
* Re: [Tarantool-patches] [PATCH v2 1/5] box: introduce matrix clock
2020-03-18 20:08 ` Konstantin Osipov
@ 2020-03-19 8:11 ` Timur Safin
2020-03-19 8:41 ` 'Konstantin Osipov'
0 siblings, 1 reply; 17+ messages in thread
From: Timur Safin @ 2020-03-19 8:11 UTC (permalink / raw)
To: 'Konstantin Osipov', 'Serge Petrenko'
Cc: tarantool-patches, v.shpilevoy
: * Serge Petrenko <sergepetrenko@tarantool.org> [20/03/18 22:52]:
:
: > A matrix clock which allows to maintain a set of vclocks and
: > their components order. The main target is to be able to
: > build a vclock which contains lsns each one is less or equal
: > that n corresponding lsn from a matrix clock.
: >
: > The purpose of the matrix clock is to evaluate a vclock
: > which is already processed by wal consumers like relays
: > or to obtain a majority vclock to commit journal entries
: > in case of synchronous replication.
: >
: > @sergepetrenko: refactoring & rewrite comments to doxygen style.
:
: I think we have discussed that matrix clock should not be pushed.
:
: It's a huge over complication.
:
: Why are you committing this?
:
: --
: Konstantin Osipov, Moscow, Russia
That's a very good question. There is a smell of some
miscommunication between the parties involved; hopefully we will
resolve it soon.
Last time we gathered to discuss sync replication, the consensus
was that we do not want a matrix clock, as it overcomplicates the
conflict resolution process (so, at least, it was not looking like
a prerequisite to the sync replication mechanism).
Serge, if I miss some important details here, I'd love to get
corrected. I do feel there are some other reasons, which I am
probably simply not aware of.
Thanks,
Timur
* Re: [Tarantool-patches] [PATCH v2 1/5] box: introduce matrix clock
2020-03-19 8:11 ` Timur Safin
@ 2020-03-19 8:41 ` 'Konstantin Osipov'
2020-03-19 9:17 ` Sergey Ostanevich
0 siblings, 1 reply; 17+ messages in thread
From: 'Konstantin Osipov' @ 2020-03-19 8:41 UTC (permalink / raw)
To: Timur Safin; +Cc: tarantool-patches, v.shpilevoy
* Timur Safin <tsafin@tarantool.org> [20/03/19 11:11]:
> : > A matrix clock which allows to maintain a set of vclocks and
> : > their components order. The main target is to be able to
> : > build a vclock which contains lsns each one is less or equal
> : > that n corresponding lsn from a matrix clock.
> : >
> : > The purpose of the matrix clock is to evaluate a vclock
> : > which is already processed by wal consumers like relays
> : > or to obtain a majority vclock to commit journal entries
> : > in case of synchronous replication.
> : >
> : > @sergepetrenko: refactoring & rewrite comments to doxygen style.
> :
> : I think we have discussed that matrix clock should not be pushed.
> :
> : It's a huge over complication.
> :
> : Why are you committing this?
> :
> : --
> : Konstantin Osipov, Moscow, Russia
>
> That's the very good question. There is smell of some miscommunication
> between parties involved, hopefully we will resolve it soon.
> Last time we gathered to discuss sync replications, the consensus was
> That we do not want matrix clock as they overcomplicate conflict resolution process (so, at least, it was not looking like prerequisite to sync
> replications mechanism).
George should clarify, but AFAIU his original design introduced
the matrix clock to GC and to sync replication. This series only
touches the GC. George reported there was an issue with the
current GC tracker: basically, it becomes non-functional when
sync replication is in place - I don't know what the issue is.
I'd love to discuss the problem first, and then see alternatives.
The thing is, I'd like our vclock to become sparse one day and be
able to contain thousands of entries. We could use a dynamic data
structure which changes the underlying representation depending
on the actual member count.
To get there and stay efficient, we need to make sure we never
copy entire vclock by value, and instead begin passing objects
representing a "vclock diff" around. Maintaining a sparse matrix would be
hell in this case.
> Serge, if I miss some important detains here, I'd love to get corrected
> here. I do feel there are some other reasons needed, which I probably
> simply not aware of.
--
Konstantin Osipov, Moscow, Russia
* Re: [Tarantool-patches] [PATCH v2 1/5] box: introduce matrix clock
2020-03-19 8:41 ` 'Konstantin Osipov'
@ 2020-03-19 9:17 ` Sergey Ostanevich
2020-03-19 11:28 ` Serge Petrenko
0 siblings, 1 reply; 17+ messages in thread
From: Sergey Ostanevich @ 2020-03-19 9:17 UTC (permalink / raw)
To: 'Konstantin Osipov',
Timur Safin, 'Serge Petrenko',
v.shpilevoy, tarantool-patches, Kirill Yukhin
On 19 Mar 11:41, 'Konstantin Osipov' wrote:
> * Timur Safin <tsafin@tarantool.org> [20/03/19 11:11]:
> > : > A matrix clock which allows to maintain a set of vclocks and
> > : > their components order. The main target is to be able to
> > : > build a vclock which contains lsns each one is less or equal
> > : > that n corresponding lsn from a matrix clock.
> > : >
> > : > The purpose of the matrix clock is to evaluate a vclock
> > : > which is already processed by wal consumers like relays
> > : > or to obtain a majority vclock to commit journal entries
> > : > in case of synchronous replication.
> > : >
> > : > @sergepetrenko: refactoring & rewrite comments to doxygen style.
> > :
> > : I think we have discussed that matrix clock should not be pushed.
> > :
> > : It's a huge over complication.
> > :
> > : Why are you committing this?
> > :
> > : --
> > : Konstantin Osipov, Moscow, Russia
> >
> > That's a very good question. There is a smell of miscommunication
> > between the parties involved; hopefully we will resolve it soon.
> > Last time we gathered to discuss sync replication, the consensus was
> > that we do not want matrix clocks, as they overcomplicate the conflict resolution process (so, at least, it did not look like a prerequisite for the sync
> > replication mechanism).
>
> George should clarify, but AFAIU his original design introduced
> matrix clock to GC and to sync replication. These series only
> touch the GC. George reported there was an issue with the current
> GC tracker: basically, it becomes non-functional when sync
> replication is in place; I don't know what the issue is.
>
> I'd love to discuss the problem first, and then see alternatives.
>
> The thing is, I'd like our vclock to become sparse one day and be
> able to contain thousands of entries. We could use a dynamic data structure
> which changes the underlying structure depending on the actual
> member count.
> To get there and stay efficient, we need to make sure we never
> copy entire vclock by value, and instead begin passing objects
> representing a "vclock diff" around. Maintaining a sparse matrix would be
> hell in this case.
>
> > Serge, if I miss some important details here, I'd love to get corrected
> > here. I do feel there are some other reasons, which I am probably
> > simply not aware of.
The discussion was with MRG users of Tarantool, and their point was: they
are facing problems with consistent state restore and root-cause
analysis in case of a Tarantool failure.
It's a huge burden for them to debug and fix their applications even with
a vector clock, and introducing a matrix one would make this task
impossible.
>
> --
> Konstantin Osipov, Moscow, Russia
* Re: [Tarantool-patches] [PATCH v2 1/5] box: introduce matrix clock
2020-03-19 9:17 ` Sergey Ostanevich
@ 2020-03-19 11:28 ` Serge Petrenko
2020-03-19 11:56 ` Konstantin Osipov
0 siblings, 1 reply; 17+ messages in thread
From: Serge Petrenko @ 2020-03-19 11:28 UTC (permalink / raw)
To: Sergey Ostanevich, Timur Safin, Konstantin Osipov
Cc: kirichenkoga, tarantool-patches, Vladislav Shpilevoy
[-- Attachment #1: Type: text/plain, Size: 7565 bytes --]
> On 19 March 2020, at 12:17, Sergey Ostanevich <sergos@tarantool.org> wrote:
>
> On 19 Mar 11:41, 'Konstantin Osipov' wrote:
>> * Timur Safin <tsafin@tarantool.org> [20/03/19 11:11]:
>>> : > A matrix clock which allows to maintain a set of vclocks and
>>> : > their components order. The main target is to be able to
>>> : > build a vclock which contains lsns each one is less or equal
>>> : > that n corresponding lsn from a matrix clock.
>>> : >
>>> : > The purpose of the matrix clock is to evaluate a vclock
>>> : > which is already processed by wal consumers like relays
>>> : > or to obtain a majority vclock to commit journal entries
>>> : > in case of synchronous replication.
>>> : >
>>> : > @sergepetrenko: refactoring & rewrite comments to doxygen style.
>>> :
>>> : I think we have discussed that matrix clock should not be pushed.
>>> :
>>> : It's a huge over complication.
>>> :
>>> : Why are you committing this?
>>> :
>>> : --
>>> : Konstantin Osipov, Moscow, Russia
>>>
>>> That's a very good question. There is a smell of miscommunication
>>> between the parties involved; hopefully we will resolve it soon.
>>> Last time we gathered to discuss sync replication, the consensus was
>>> that we do not want matrix clocks, as they overcomplicate the conflict resolution process (so, at least, it did not look like a prerequisite for the sync
>>> replication mechanism).
Hi, Timur. It's not about conflict resolution or anything user-visible. I guess the structure naming
is misleading here. Please see a more detailed answer below.
>>
>> George should clarify, but AFAIU his original design introduced
>> matrix clock to GC and to sync replication. These series only
>> touch the GC. George reported there was an issue with the current
>> GC tracker: basically, it becomes non-functional when sync
>> replication is in place; I don't know what the issue is.
>>
>> I'd love to discuss the problem first, and then see alternatives.
>>
>> The thing is, I'd like our vclock to become sparse one day and be
>> able to contain thousands of entries. We could use a dynamic data structure
>> which changes the underlying structure depending on the actual
>> member count.
>> To get there and stay efficient, we need to make sure we never
>> copy entire vclock by value, and instead begin passing objects
>> representing a "vclock diff" around. Maintaining a sparse matrix would be
>> hell in this case.
>>
>>> Serge, if I miss some important details here, I'd love to get corrected
>>> here. I do feel there are some other reasons, which I am probably
>>> simply not aware of.
>
> The discussion was with MRG users of Tarantool, and their point was: they
> are facing problems with consistent state restore and root-cause
> analysis in case of a Tarantool failure.
> It's a huge burden for them to debug and fix their applications even with
> a vector clock, and introducing a matrix one would make this task
> impossible.
The patches are not making it harder for the customers. The ‘matrix clock’ is just an
in-memory structure, which cannot be accessed by the database user.
It is used to build the minimal possible vclock that is still needed by the replicas.
We may rename it to whatever you wish, just like Georgy said. Let it be a vclock sorted set
(although we already have a vclock set), or something else, like a vclock sorter, whatever.
This structure is not persisted anywhere and no one sees it.
We do need to rework our GC, and Georgy’s patches regarding matrix clock perform exactly
this task.
First of all, let me describe the issue with the current WAL GC:
it tracks consumers (i.e. remote replicas) by their vclock signature,
which is the sum of all vclock components.
It has always been wrong (since the introduction of vclocks, at least):
Say, you have 2 masters, A and B with ids 1 and 2 respectively, and a replica C with id 3.
The example will be a little synthetic, but it illustrates the problem:
Say C replicates from both A and B, and there is no replication between A and B (say, the
instances were reconfigured to not replicate from each other).
Now, say replica C has followed A and B to vclock {1:5, 2:13}. At the same time, A has lsn 10
and B has lsn 15. A and B do not know about each other’s changes, so A’s vclock is {1:10} and
B’s vclock is {2:15}. Now imagine A does a snapshot and creates a new xlog with signature 10.
A’s directory will look like: 00…000.xlog, 00…010.snap, 00…010.xlog
Replica C reports its vclock {1:5, 2:13} to A, and A uses the vclock to update the corresponding GC
consumer. Since signatures are used, the GC consumer is assigned a signature = 13 + 5 = 18.
This is greater than the signature of the last xlog on A (10), so the previous xlog (00…000.xlog) can be
deleted (at least A assumes it can be). Actually, the replica still needs 00…000.xlog, because it contains
rows corresponding to vclocks {1:6} - {1:10}, which haven't been replicated yet.
If, instead of vclock signatures, gc consumers used vclocks, such a problem wouldn't arise.
Replica would report its vclock {1:5, 2:13}. The vclock is NOT strictly greater than A's most recent
xlog vclock ({1:10}), so the previous log is kept until the replica reports a vclock {1:10, 2:something}
(or {1:11, …}, and so on).
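The difference between the signature check and a componentwise check can be sketched in plain C. This is a simplified model (a fixed-size array, not Tarantool's actual struct vclock), using the numbers from the example above:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum { MODEL_VCLOCK_MAX = 4 };

/*
 * Simplified vclock model: lsn[id] is the LSN of the replica with
 * that id. Illustrative only, not Tarantool's struct vclock.
 */
struct model_vclock {
	int64_t lsn[MODEL_VCLOCK_MAX];
};

/* The signature: the sum of all components, as the old GC tracker uses. */
static int64_t
model_vclock_sum(const struct model_vclock *v)
{
	int64_t sum = 0;
	for (int i = 0; i < MODEL_VCLOCK_MAX; i++)
		sum += v->lsn[i];
	return sum;
}

/* True when every component of a is >= the matching component of b. */
static bool
model_vclock_covers(const struct model_vclock *a,
		    const struct model_vclock *b)
{
	for (int i = 0; i < MODEL_VCLOCK_MAX; i++)
		if (a->lsn[i] < b->lsn[i])
			return false;
	return true;
}
```

With replica vclock {1:5, 2:13} (signature 18) and xlog vclock {1:10} (signature 10), the signature comparison says the xlog is obsolete, while the componentwise check correctly reports that the replica has not covered it yet.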
Local space rework performed in this patchset in the scope of issue #4114 makes the problem even
more obvious: I want to use the 0-th vclock component to track local space changes.
Now the master has its own 0-th vclock component, and the replica has its own one.
If I used the replica clock "as is" with the current GC consumer, it wouldn't work at all: see example 1 below.
If I substituted the replica clock[0] with the master's clock[0], it also wouldn't work: see example 2 below.
Example 1:
master vclock {1:10}, replica vclock {0:5, 1:5}. The replica still needs rows in range {1:6} - {1:10}, but its vclock
signature is the same as the master's, so the master deletes logs needed by the replica.
Example 2:
master vclock {0:100500, 1:10}, replica vclock {0:1, 1:5}.
The master substitutes the replica's 0-th vclock component, so it becomes {0:100500, 1:5}. Now the replica's vclock
signature is 100505, which is much greater than the signature 10 of the xlog containing rows in range
{1:1} - {1:10}, and especially the range still needed by the replica: {1:6} - {1:10}.
It might seem that ignoring the 0-th vclock component is an option, but it's not: we may ignore 0-th vclock components
for GC consumers corresponding to remote replicas, but local instance checkpoints (snapshots) are also tracked
together with GC consumers, and the 0-th vclock component cannot be ignored for them.
OK, so now it's clear that we need to use vclocks to track GC consumers. How do we find the minimal vclock
that is still needed? We need the biggest vclock that is not greater than the vclock of any GC consumer. This vclock
can be built using the matrix clock Georgy introduced. That's it. The matrix clock works as follows:
say you have consumer clocks {1:5, 2:10, 3:15}, {1:10, 2:6, 3:10} and {1:10, 2:15, 3:9}. The matrix clock will
build the vclock {1:5, 2:6, 3:9}. This vclock holds, per replica id, the minimum LSN that is still needed.
Now all the logs with vclocks strictly less than the built one may be deleted.
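The vclock the matrix clock builds is just the componentwise minimum over the consumer vclocks. A minimal sketch, again using a simplified fixed-size model rather than the actual mclock API:

```c
#include <assert.h>
#include <stdint.h>

enum { MODEL_VCLOCK_MAX = 4 };

/* Simplified vclock model, not Tarantool's actual struct vclock. */
struct model_vclock {
	int64_t lsn[MODEL_VCLOCK_MAX];
};

/*
 * Build the componentwise minimum over a set of consumer vclocks:
 * the biggest vclock that is not greater than any consumer's vclock.
 * Logs with vclocks strictly below the result are needed by no one.
 */
static void
model_vclock_min(struct model_vclock *out, const struct model_vclock *in,
		 int count)
{
	for (int i = 0; i < MODEL_VCLOCK_MAX; i++) {
		int64_t min = in[0].lsn[i];
		for (int j = 1; j < count; j++)
			if (in[j].lsn[i] < min)
				min = in[j].lsn[i];
		out->lsn[i] = min;
	}
}
```

Feeding it the three consumer clocks from the example, {1:5, 2:10, 3:15}, {1:10, 2:6, 3:10} and {1:10, 2:15, 3:9}, yields {1:5, 2:6, 3:9}.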
I hope my explanation answers most of the questions. Feel free to ask.
--
Serge Petrenko
sergepetrenko@tarantool.org
>
>>
>> --
>> Konstantin Osipov, Moscow, Russia
* Re: [Tarantool-patches] [PATCH v2 1/5] box: introduce matrix clock
2020-03-19 11:28 ` Serge Petrenko
@ 2020-03-19 11:56 ` Konstantin Osipov
2020-03-19 11:59 ` Serge Petrenko
0 siblings, 1 reply; 17+ messages in thread
From: Konstantin Osipov @ 2020-03-19 11:56 UTC (permalink / raw)
To: Serge Petrenko; +Cc: tarantool-patches, Vladislav Shpilevoy, kirichenkoga
* Serge Petrenko <sergepetrenko@tarantool.org> [20/03/19 14:29]:
> First of all, let me describe the issue with the current WAL GC:
> it tracks consumers (i.e. remote replicas) by their vclock signature,
> which is the sum of all vclock components.
> It has always been wrong (since the introduction of vclocks, at least):
> Say, you have 2 masters, A and B with ids 1 and 2 respectively, and a replica C with id 3.
> The example will be a little synthetic, but it illustrates the problem:
> Say C replicates from both A and B, and there is no replication between A and B (say, the
> instances were reconfigured to not replicate from each other).
> Now, say replica C has followed A and B to vclock {1:5, 2:13}. At the same time, A has lsn 10
> and B has lsn 15. A and B do not know about each other’s changes, so A’s vclock is {1:10} and
> B’s vclock is {2:15}. Now imagine A does a snapshot and creates a new xlog with signature 10.
> A’s directory will look like: 00…000.xlog, 00…010.snap, 00…010.xlog
> Replica C reports its vclock {1:5, 2:13} to A, A uses the vclock to update the corresponding GC
> consumer. Since signatures are used, GC consumer is assigned a signature = 13 + 5 = 18.
> This is greater than the signature of the last xlog on A (10), so the previous xlog (00…00.xlog) can be
> deleted (at least A assumes it can be). Actually, replica still needs 00…00.xlog, because it contains
> rows corresponding to vclocks {1:6} - {1:10}, which haven’t been replicated yet.
>
> If instead of using vclock signatures, gc consumers used vclocks, such a problem wouldn’t arise.
> Replica would report its vclock {1:5, 2:13}. The vclock is NOT strictly greater than A's most recent
> xlog vclock ({1:10}), so the previous log is kept until the replica reports a vclock {1:10, 2:something}
> (or {1:11, …}, and so on).
This explanation belongs to the commit comment.
--
Konstantin Osipov, Moscow, Russia
* Re: [Tarantool-patches] [PATCH v2 1/5] box: introduce matrix clock
2020-03-19 11:56 ` Konstantin Osipov
@ 2020-03-19 11:59 ` Serge Petrenko
0 siblings, 0 replies; 17+ messages in thread
From: Serge Petrenko @ 2020-03-19 11:59 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches, Vladislav Shpilevoy, kirichenkoga
> On 19 March 2020, at 14:56, Konstantin Osipov <kostja.osipov@gmail.com> wrote:
>
> * Serge Petrenko <sergepetrenko@tarantool.org> [20/03/19 14:29]:
>> First of all, let me describe the issue with the current WAL GC:
>> it tracks consumers (i.e. remote replicas) by their vclock signature,
>> which is the sum of all vclock components.
>> It has always been wrong (since the introduction of vclocks, at least):
>> Say, you have 2 masters, A and B with ids 1 and 2 respectively, and a replica C with id 3.
>> The example will be a little synthetic, but it illustrates the problem:
>> Say C replicates from both A and B, and there is no replication between A and B (say, the
>> instances were reconfigured to not replicate from each other).
>> Now, say replica C has followed A and B to vclock {1:5, 2:13}. At the same time, A has lsn 10
>> and B has lsn 15. A and B do not know about each other’s changes, so A’s vclock is {1:10} and
>> B’s vclock is {2:15}. Now imagine A does a snapshot and creates a new xlog with signature 10.
>> A’s directory will look like: 00…000.xlog, 00…010.snap, 00…010.xlog
>> Replica C reports its vclock {1:5, 2:13} to A, A uses the vclock to update the corresponding GC
>> consumer. Since signatures are used, GC consumer is assigned a signature = 13 + 5 = 18.
>> This is greater than the signature of the last xlog on A (10), so the previous xlog (00…00.xlog) can be
>> deleted (at least A assumes it can be). Actually, replica still needs 00…00.xlog, because it contains
>> rows corresponding to vclocks {1:6} - {1:10}, which haven’t been replicated yet.
>>
>> If instead of using vclock signatures, gc consumers used vclocks, such a problem wouldn’t arise.
>> Replica would report its vclock {1:5, 2:13}. The vclock is NOT strictly greater than A's most recent
>> xlog vclock ({1:10}), so the previous log is kept until the replica reports a vclock {1:10, 2:something}
>> (or {1:11, …}, and so on).
>
> This explanation belongs to the commit comment.
>
Ok, will amend.
>
>
> --
> Konstantin Osipov, Moscow, Russia
--
Serge Petrenko
sergepetrenko@tarantool.org
* [Tarantool-patches] [PATCH v2 2/5] wal: track consumer vclock and collect logs in wal thread
2020-03-18 19:47 [Tarantool-patches] [PATCH v2 0/5] replication: fix local space tracking Serge Petrenko
2020-03-18 19:47 ` [Tarantool-patches] [PATCH v2 1/5] box: introduce matrix clock Serge Petrenko
@ 2020-03-18 19:47 ` Serge Petrenko
2020-03-18 19:47 ` [Tarantool-patches] [PATCH v2 3/5] vclock: add an ability to set individual clock components Serge Petrenko
` (4 subsequent siblings)
6 siblings, 0 replies; 17+ messages in thread
From: Serge Petrenko @ 2020-03-18 19:47 UTC (permalink / raw)
To: v.shpilevoy, kostja.osipov, georgy; +Cc: tarantool-patches
From: Georgy Kirichenko <georgy@tarantool.org>
WAL uses a matrix clock (mclock) in order to track the vclocks reported
by relays. This allows WAL to build the minimal boundary vclock, which
is used to collect unneeded files. Box protects logs
from being collected via the wal_set_gc_first_vclock() call.
In order to preserve logs while joining a replica, gc tracks all
join readview vclocks as checkpoints with a special mark,
is_join_readview, set to true.
Also, there is no longer a gc consumer in the tx thread; gc consumer info
was removed from box.info output, and the corresponding lines were
commented out in the tests.
@sergepetrenko: reword some comments and do a bit of refactoring.
Part of #3794, #980
Prerequisite #4114
---
src/box/box.cc | 38 ++--
src/box/gc.c | 216 ++++++--------------
src/box/gc.h | 95 ++-------
src/box/lua/info.c | 29 +--
src/box/relay.cc | 133 +------------
src/box/replication.cc | 47 +----
src/box/replication.h | 3 -
src/box/wal.c | 271 +++++++++++++++++++++++---
src/box/wal.h | 17 +-
test/replication/gc_no_space.result | 30 +--
test/replication/gc_no_space.test.lua | 12 +-
11 files changed, 369 insertions(+), 522 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index a5052dba4..6e101a0be 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1585,11 +1585,12 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
"wal_mode = 'none'");
}
- struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock,
- "replica %s", tt_uuid_str(&instance_uuid));
- if (gc == NULL)
- diag_raise();
- auto gc_guard = make_scoped_guard([&] { gc_consumer_unregister(gc); });
+ struct vclock register_vclock;
+ vclock_copy(®ister_vclock, &replicaset.vclock);
+ gc_add_join_readview(®ister_vclock);
+ auto gc_guard = make_scoped_guard([&] {
+ gc_del_join_readview(®ister_vclock);
+ });
say_info("registering replica %s at %s",
tt_uuid_str(&instance_uuid), sio_socketname(io->fd));
@@ -1628,12 +1629,8 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
* registration was complete and assign it to the
* replica.
*/
- gc_consumer_advance(gc, &stop_vclock);
replica = replica_by_uuid(&instance_uuid);
- if (replica->gc != NULL)
- gc_consumer_unregister(replica->gc);
- replica->gc = gc;
- gc_guard.is_active = false;
+ wal_relay_status_update(replica->id, &stop_vclock);
}
void
@@ -1727,11 +1724,12 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
* Register the replica as a WAL consumer so that
* it can resume FINAL JOIN where INITIAL JOIN ends.
*/
- struct gc_consumer *gc = gc_consumer_register(&replicaset.vclock,
- "replica %s", tt_uuid_str(&instance_uuid));
- if (gc == NULL)
- diag_raise();
- auto gc_guard = make_scoped_guard([&] { gc_consumer_unregister(gc); });
+ struct vclock join_vclock;
+ vclock_copy(&join_vclock, &replicaset.vclock);
+ gc_add_join_readview(&join_vclock);
+ auto gc_guard = make_scoped_guard([&] {
+ gc_del_join_readview(&join_vclock);
+ });
say_info("joining replica %s at %s",
tt_uuid_str(&instance_uuid), sio_socketname(io->fd));
@@ -1775,16 +1773,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
row.sync = header->sync;
coio_write_xrow(io, &row);
- /*
- * Advance the WAL consumer state to the position where
- * FINAL JOIN ended and assign it to the replica.
- */
- gc_consumer_advance(gc, &stop_vclock);
replica = replica_by_uuid(&instance_uuid);
- if (replica->gc != NULL)
- gc_consumer_unregister(replica->gc);
- replica->gc = gc;
- gc_guard.is_active = false;
+ wal_relay_status_update(replica->id, &stop_vclock);
}
void
diff --git a/src/box/gc.c b/src/box/gc.c
index f5c387f9d..48ec2f529 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -65,35 +65,6 @@ gc_cleanup_fiber_f(va_list);
static int
gc_checkpoint_fiber_f(va_list);
-/**
- * Comparator used for ordering gc_consumer objects by signature
- * in a binary tree.
- */
-static inline int
-gc_consumer_cmp(const struct gc_consumer *a, const struct gc_consumer *b)
-{
- if (vclock_sum(&a->vclock) < vclock_sum(&b->vclock))
- return -1;
- if (vclock_sum(&a->vclock) > vclock_sum(&b->vclock))
- return 1;
- if ((intptr_t)a < (intptr_t)b)
- return -1;
- if ((intptr_t)a > (intptr_t)b)
- return 1;
- return 0;
-}
-
-rb_gen(MAYBE_UNUSED static inline, gc_tree_, gc_tree_t,
- struct gc_consumer, node, gc_consumer_cmp);
-
-/** Free a consumer object. */
-static void
-gc_consumer_delete(struct gc_consumer *consumer)
-{
- TRASH(consumer);
- free(consumer);
-}
-
/** Free a checkpoint object. */
static void
gc_checkpoint_delete(struct gc_checkpoint *checkpoint)
@@ -110,7 +81,6 @@ gc_init(void)
vclock_create(&gc.vclock);
rlist_create(&gc.checkpoints);
- gc_tree_new(&gc.consumers);
fiber_cond_create(&gc.cleanup_cond);
checkpoint_schedule_cfg(&gc.checkpoint_schedule, 0, 0);
engine_collect_garbage(&gc.vclock);
@@ -142,15 +112,6 @@ gc_free(void)
next_checkpoint) {
gc_checkpoint_delete(checkpoint);
}
- /* Free all registered consumers. */
- struct gc_consumer *consumer = gc_tree_first(&gc.consumers);
- while (consumer != NULL) {
- struct gc_consumer *next = gc_tree_next(&gc.consumers,
- consumer);
- gc_tree_remove(&gc.consumers, consumer);
- gc_consumer_delete(consumer);
- consumer = next;
- }
}
/**
@@ -161,7 +122,6 @@ gc_free(void)
static void
gc_run_cleanup(void)
{
- bool run_wal_gc = false;
bool run_engine_gc = false;
/*
@@ -170,10 +130,11 @@ gc_run_cleanup(void)
* checkpoints, plus we can't remove checkpoints that
* are still in use.
*/
- struct gc_checkpoint *checkpoint = NULL;
- while (true) {
- checkpoint = rlist_first_entry(&gc.checkpoints,
- struct gc_checkpoint, in_checkpoints);
+ struct gc_checkpoint *checkpoint = NULL, *tmp;
+ rlist_foreach_entry_safe(checkpoint, &gc.checkpoints,
+ in_checkpoints, tmp) {
+ if (checkpoint->is_join_readview)
+ continue;
if (gc.checkpoint_count <= gc.min_checkpoint_count)
break;
if (!rlist_empty(&checkpoint->refs))
@@ -187,23 +148,7 @@ gc_run_cleanup(void)
/* At least one checkpoint must always be available. */
assert(checkpoint != NULL);
- /*
- * Find the vclock of the oldest WAL row to keep.
- * Note, we must keep all WALs created after the
- * oldest checkpoint, even if no consumer needs them.
- */
- const struct vclock *vclock = (gc_tree_empty(&gc.consumers) ? NULL :
- &gc_tree_first(&gc.consumers)->vclock);
- if (vclock == NULL ||
- vclock_sum(vclock) > vclock_sum(&checkpoint->vclock))
- vclock = &checkpoint->vclock;
-
- if (vclock_sum(vclock) > vclock_sum(&gc.vclock)) {
- vclock_copy(&gc.vclock, vclock);
- run_wal_gc = true;
- }
-
- if (!run_engine_gc && !run_wal_gc)
+ if (!run_engine_gc)
return; /* nothing to do */
/*
@@ -219,10 +164,10 @@ gc_run_cleanup(void)
* we never remove the last checkpoint and the following
* WALs so this may only affect backup checkpoints.
*/
- if (run_engine_gc)
- engine_collect_garbage(&checkpoint->vclock);
- if (run_wal_gc)
- wal_collect_garbage(vclock);
+ engine_collect_garbage(&checkpoint->vclock);
+ checkpoint = rlist_first_entry(&gc.checkpoints,
+ struct gc_checkpoint, in_checkpoints);
+ wal_set_gc_first_vclock(&checkpoint->vclock);
}
static int
@@ -278,28 +223,10 @@ void
gc_advance(const struct vclock *vclock)
{
/*
- * In case of emergency ENOSPC, the WAL thread may delete
- * WAL files needed to restore from backup checkpoints,
- * which would be kept by the garbage collector otherwise.
- * Bring the garbage collector vclock up to date.
+ * Bring the garbage collector up to date with the oldest
+ * wal xlog file.
*/
vclock_copy(&gc.vclock, vclock);
-
- struct gc_consumer *consumer = gc_tree_first(&gc.consumers);
- while (consumer != NULL &&
- vclock_sum(&consumer->vclock) < vclock_sum(vclock)) {
- struct gc_consumer *next = gc_tree_next(&gc.consumers,
- consumer);
- assert(!consumer->is_inactive);
- consumer->is_inactive = true;
- gc_tree_remove(&gc.consumers, consumer);
-
- say_crit("deactivated WAL consumer %s at %s", consumer->name,
- vclock_to_string(&consumer->vclock));
-
- consumer = next;
- }
- gc_schedule_cleanup();
}
void
@@ -329,6 +256,10 @@ void
gc_add_checkpoint(const struct vclock *vclock)
{
struct gc_checkpoint *last_checkpoint = gc_last_checkpoint();
+ while (last_checkpoint != NULL && last_checkpoint->is_join_readview) {
+ last_checkpoint = rlist_prev_entry(last_checkpoint,
+ in_checkpoints);
+ }
if (last_checkpoint != NULL &&
vclock_sum(&last_checkpoint->vclock) == vclock_sum(vclock)) {
/*
@@ -351,6 +282,8 @@ gc_add_checkpoint(const struct vclock *vclock)
if (checkpoint == NULL)
panic("out of memory");
+ if (rlist_empty(&gc.checkpoints))
+ wal_set_gc_first_vclock(vclock);
rlist_create(&checkpoint->refs);
vclock_copy(&checkpoint->vclock, vclock);
rlist_add_tail_entry(&gc.checkpoints, checkpoint, in_checkpoints);
@@ -359,6 +292,47 @@ gc_add_checkpoint(const struct vclock *vclock)
gc_schedule_cleanup();
}
+void
+gc_add_join_readview(const struct vclock *vclock)
+{
+ struct gc_checkpoint *checkpoint = calloc(1, sizeof(*checkpoint));
+ /*
+ * It is not a fatal error if we couldn't register a
+ * readview.
+ */
+ if (checkpoint == NULL) {
+ say_error("GC: couldn't add a join readview reference");
+ return;
+ }
+ if (rlist_empty(&gc.checkpoints))
+ wal_set_gc_first_vclock(vclock);
+ checkpoint->is_join_readview = true;
+ rlist_create(&checkpoint->refs);
+ vclock_copy(&checkpoint->vclock, vclock);
+ rlist_add_tail_entry(&gc.checkpoints, checkpoint, in_checkpoints);
+}
+
+void
+gc_del_join_readview(const struct vclock *vclock)
+{
+ struct gc_checkpoint *checkpoint;
+ rlist_foreach_entry(checkpoint, &gc.checkpoints, in_checkpoints) {
+ if (!checkpoint->is_join_readview ||
+ vclock_compare(&checkpoint->vclock, vclock) != 0)
+ continue;
+ rlist_del(&checkpoint->in_checkpoints);
+ free(checkpoint);
+ checkpoint = rlist_first_entry(&gc.checkpoints,
+ struct gc_checkpoint,
+ in_checkpoints);
+ wal_set_gc_first_vclock(&checkpoint->vclock);
+ return;
+ }
+ /* A join readview was not found. */
+ say_error("GC: couldn't delete a join readview reference");
+}
+
+
static int
gc_do_checkpoint(void)
{
@@ -513,75 +487,3 @@ gc_unref_checkpoint(struct gc_checkpoint_ref *ref)
rlist_del_entry(ref, in_refs);
gc_schedule_cleanup();
}
-
-struct gc_consumer *
-gc_consumer_register(const struct vclock *vclock, const char *format, ...)
-{
- struct gc_consumer *consumer = calloc(1, sizeof(*consumer));
- if (consumer == NULL) {
- diag_set(OutOfMemory, sizeof(*consumer),
- "malloc", "struct gc_consumer");
- return NULL;
- }
-
- va_list ap;
- va_start(ap, format);
- vsnprintf(consumer->name, GC_NAME_MAX, format, ap);
- va_end(ap);
-
- vclock_copy(&consumer->vclock, vclock);
- gc_tree_insert(&gc.consumers, consumer);
- return consumer;
-}
-
-void
-gc_consumer_unregister(struct gc_consumer *consumer)
-{
- if (!consumer->is_inactive) {
- gc_tree_remove(&gc.consumers, consumer);
- gc_schedule_cleanup();
- }
- gc_consumer_delete(consumer);
-}
-
-void
-gc_consumer_advance(struct gc_consumer *consumer, const struct vclock *vclock)
-{
- if (consumer->is_inactive)
- return;
-
- int64_t signature = vclock_sum(vclock);
- int64_t prev_signature = vclock_sum(&consumer->vclock);
-
- assert(signature >= prev_signature);
- if (signature == prev_signature)
- return; /* nothing to do */
-
- /*
- * Do not update the tree unless the tree invariant
- * is violated.
- */
- struct gc_consumer *next = gc_tree_next(&gc.consumers, consumer);
- bool update_tree = (next != NULL &&
- signature >= vclock_sum(&next->vclock));
-
- if (update_tree)
- gc_tree_remove(&gc.consumers, consumer);
-
- vclock_copy(&consumer->vclock, vclock);
-
- if (update_tree)
- gc_tree_insert(&gc.consumers, consumer);
-
- gc_schedule_cleanup();
-}
-
-struct gc_consumer *
-gc_consumer_iterator_next(struct gc_consumer_iterator *it)
-{
- if (it->curr != NULL)
- it->curr = gc_tree_next(&gc.consumers, it->curr);
- else
- it->curr = gc_tree_first(&gc.consumers);
- return it->curr;
-}
diff --git a/src/box/gc.h b/src/box/gc.h
index 827a5db8e..b34fee0fe 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -45,12 +45,9 @@ extern "C" {
#endif /* defined(__cplusplus) */
struct fiber;
-struct gc_consumer;
enum { GC_NAME_MAX = 64 };
-typedef rb_node(struct gc_consumer) gc_node_t;
-
/**
* Garbage collector keeps track of all preserved checkpoints.
* The following structure represents a checkpoint.
@@ -60,6 +57,8 @@ struct gc_checkpoint {
struct rlist in_checkpoints;
/** VClock of the checkpoint. */
struct vclock vclock;
+ /** True when it is a join readview. */
+ bool is_join_readview;
/**
* List of checkpoint references, linked by
* gc_checkpoint_ref::in_refs.
@@ -81,26 +80,6 @@ struct gc_checkpoint_ref {
char name[GC_NAME_MAX];
};
-/**
- * The object of this type is used to prevent garbage
- * collection from removing WALs that are still in use.
- */
-struct gc_consumer {
- /** Link in gc_state::consumers. */
- gc_node_t node;
- /** Human-readable name. */
- char name[GC_NAME_MAX];
- /** The vclock tracked by this consumer. */
- struct vclock vclock;
- /**
- * This flag is set if a WAL needed by this consumer was
- * deleted by the WAL thread on ENOSPC.
- */
- bool is_inactive;
-};
-
-typedef rb_tree(struct gc_consumer) gc_tree_t;
-
/** Garbage collection state. */
struct gc_state {
/** VClock of the oldest WAL row available on the instance. */
@@ -121,8 +100,6 @@ struct gc_state {
* to the tail. Linked by gc_checkpoint::in_checkpoints.
*/
struct rlist checkpoints;
- /** Registered consumers, linked by gc_consumer::node. */
- gc_tree_t consumers;
/** Fiber responsible for periodic checkpointing. */
struct fiber *checkpoint_fiber;
/** Schedule of periodic checkpoints. */
@@ -208,7 +185,6 @@ gc_free(void);
/**
* Advance the garbage collector vclock to the given position.
- * Deactivate WAL consumers that need older data.
*/
void
gc_advance(const struct vclock *vclock);
@@ -219,7 +195,7 @@ gc_advance(const struct vclock *vclock);
*
* Note, this function doesn't run garbage collector so
* changes will take effect only after a new checkpoint
- * is created or a consumer is unregistered.
+ * is created.
*/
void
gc_set_min_checkpoint_count(int min_checkpoint_count);
@@ -239,6 +215,19 @@ gc_set_checkpoint_interval(double interval);
void
gc_add_checkpoint(const struct vclock *vclock);
+/**
+ * Register a join readview in the garbage collector state in order
+ * to prevent subsequent logs deletion.
+ */
+void
+gc_add_join_readview(const struct vclock *vclock);
+
+/**
+ * Unregister a join readview from the garbage collector state.
+ */
+void
+gc_del_join_readview(const struct vclock *vclock);
+
/**
* Make a *manual* checkpoint.
* This is entry point for box.snapshot() and SIGUSR1 signal
@@ -283,58 +272,6 @@ gc_ref_checkpoint(struct gc_checkpoint *checkpoint,
void
gc_unref_checkpoint(struct gc_checkpoint_ref *ref);
-/**
- * Register a consumer.
- *
- * This will stop garbage collection of WAL files newer than
- * @vclock until the consumer is unregistered or advanced.
- * @format... specifies a human-readable name of the consumer,
- * it will be used for listing the consumer in box.info.gc().
- *
- * Returns a pointer to the new consumer object or NULL on
- * memory allocation failure.
- */
-CFORMAT(printf, 2, 3)
-struct gc_consumer *
-gc_consumer_register(const struct vclock *vclock, const char *format, ...);
-
-/**
- * Unregister a consumer and invoke garbage collection
- * if needed.
- */
-void
-gc_consumer_unregister(struct gc_consumer *consumer);
-
-/**
- * Advance the vclock tracked by a consumer and
- * invoke garbage collection if needed.
- */
-void
-gc_consumer_advance(struct gc_consumer *consumer, const struct vclock *vclock);
-
-/**
- * Iterator over registered consumers. The iterator is valid
- * as long as the caller doesn't yield.
- */
-struct gc_consumer_iterator {
- struct gc_consumer *curr;
-};
-
-/** Init an iterator over consumers. */
-static inline void
-gc_consumer_iterator_init(struct gc_consumer_iterator *it)
-{
- it->curr = NULL;
-}
-
-/**
- * Iterate to the next registered consumer. Return a pointer
- * to the next consumer object or NULL if there is no more
- * consumers.
- */
-struct gc_consumer *
-gc_consumer_iterator_next(struct gc_consumer_iterator *it);
-
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index c004fad27..aba9a4b7c 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -399,6 +399,8 @@ lbox_info_gc_call(struct lua_State *L)
count = 0;
struct gc_checkpoint *checkpoint;
gc_foreach_checkpoint(checkpoint) {
+ if (checkpoint->is_join_readview)
+ continue;
lua_createtable(L, 0, 2);
lua_pushstring(L, "vclock");
@@ -423,33 +425,6 @@ lbox_info_gc_call(struct lua_State *L)
}
lua_settable(L, -3);
- lua_pushstring(L, "consumers");
- lua_newtable(L);
-
- struct gc_consumer_iterator consumers;
- gc_consumer_iterator_init(&consumers);
-
- count = 0;
- struct gc_consumer *consumer;
- while ((consumer = gc_consumer_iterator_next(&consumers)) != NULL) {
- lua_createtable(L, 0, 3);
-
- lua_pushstring(L, "name");
- lua_pushstring(L, consumer->name);
- lua_settable(L, -3);
-
- lua_pushstring(L, "vclock");
- lbox_pushvclock(L, &consumer->vclock);
- lua_settable(L, -3);
-
- lua_pushstring(L, "signature");
- luaL_pushint64(L, vclock_sum(&consumer->vclock));
- lua_settable(L, -3);
-
- lua_rawseti(L, -2, ++count);
- }
- lua_settable(L, -3);
-
return 1;
}
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 95245a3cf..d21129d73 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -66,23 +66,6 @@ struct relay_status_msg {
struct vclock vclock;
};
-/**
- * Cbus message to update replica gc state in tx thread.
- */
-struct relay_gc_msg {
- /** Parent */
- struct cmsg msg;
- /**
- * Link in the list of pending gc messages,
- * see relay::pending_gc.
- */
- struct stailq_entry in_pending;
- /** Relay instance */
- struct relay *relay;
- /** Vclock to advance to */
- struct vclock vclock;
-};
-
/** State of a replication relay. */
struct relay {
/** The thread in which we relay data to the replica. */
@@ -130,11 +113,6 @@ struct relay {
struct cpipe relay_pipe;
/** Status message */
struct relay_status_msg status_msg;
- /**
- * List of garbage collection messages awaiting
- * confirmation from the replica.
- */
- struct stailq pending_gc;
/** Time when last row was sent to peer. */
double last_row_time;
/** Relay sync state. */
@@ -192,7 +170,6 @@ relay_new(struct replica *replica)
relay->last_row_time = ev_monotonic_now(loop());
fiber_cond_create(&relay->reader_cond);
diag_create(&relay->diag);
- stailq_create(&relay->pending_gc);
relay->state = RELAY_OFF;
return relay;
}
@@ -248,12 +225,6 @@ relay_exit(struct relay *relay)
static void
relay_stop(struct relay *relay)
{
- struct relay_gc_msg *gc_msg, *next_gc_msg;
- stailq_foreach_entry_safe(gc_msg, next_gc_msg,
- &relay->pending_gc, in_pending) {
- free(gc_msg);
- }
- stailq_create(&relay->pending_gc);
if (relay->r != NULL)
recovery_delete(relay->r);
relay->r = NULL;
@@ -388,7 +359,9 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
static void
relay_status_update(struct cmsg *msg)
{
+ struct relay_status_msg *status = (struct relay_status_msg *)msg;
msg->route = NULL;
+ fiber_cond_signal(&status->relay->reader_cond);
}
/**
@@ -398,6 +371,8 @@ static void
tx_status_update(struct cmsg *msg)
{
struct relay_status_msg *status = (struct relay_status_msg *)msg;
+ if (!status->relay->replica->anon)
+ wal_relay_status_update(status->relay->replica->id, &status->vclock);
vclock_copy(&status->relay->tx.vclock, &status->vclock);
static const struct cmsg_hop route[] = {
{relay_status_update, NULL}
@@ -406,74 +381,6 @@ tx_status_update(struct cmsg *msg)
cpipe_push(&status->relay->relay_pipe, msg);
}
-/**
- * Update replica gc state in tx thread.
- */
-static void
-tx_gc_advance(struct cmsg *msg)
-{
- struct relay_gc_msg *m = (struct relay_gc_msg *)msg;
- gc_consumer_advance(m->relay->replica->gc, &m->vclock);
- free(m);
-}
-
-static int
-relay_on_close_log_f(struct trigger *trigger, void * /* event */)
-{
- static const struct cmsg_hop route[] = {
- {tx_gc_advance, NULL}
- };
- struct relay *relay = (struct relay *)trigger->data;
- struct relay_gc_msg *m = (struct relay_gc_msg *)malloc(sizeof(*m));
- if (m == NULL) {
- say_warn("failed to allocate relay gc message");
- return 0;
- }
- cmsg_init(&m->msg, route);
- m->relay = relay;
- vclock_copy(&m->vclock, &relay->r->vclock);
- /*
- * Do not invoke garbage collection until the replica
- * confirms that it has received data stored in the
- * sent xlog.
- */
- stailq_add_tail_entry(&relay->pending_gc, m, in_pending);
- return 0;
-}
-
-/**
- * Invoke pending garbage collection requests.
- *
- * This function schedules the most recent gc message whose
- * vclock is less than or equal to the given one. Older
- * messages are discarded as their job will be done by the
- * scheduled message anyway.
- */
-static inline void
-relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
-{
- struct relay_gc_msg *curr, *next, *gc_msg = NULL;
- stailq_foreach_entry_safe(curr, next, &relay->pending_gc, in_pending) {
- /*
- * We may delete a WAL file only if its vclock is
- * less than or equal to the vclock acknowledged by
- * the replica. Even if the replica's signature is
- * is greater, but the vclocks are incomparable, we
- * must not delete the WAL, because there may still
- * be rows not applied by the replica in it while
- * the greater signatures is due to changes pulled
- * from other members of the cluster.
- */
- if (vclock_compare(&curr->vclock, vclock) > 0)
- break;
- stailq_shift(&relay->pending_gc);
- free(gc_msg);
- gc_msg = curr;
- }
- if (gc_msg != NULL)
- cpipe_push(&relay->tx_pipe, &gc_msg->msg);
-}
-
static void
relay_set_error(struct relay *relay, struct error *e)
{
@@ -575,17 +482,6 @@ relay_subscribe_f(va_list ap)
cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
&relay->relay_pipe, NULL, NULL, cbus_process);
- /*
- * Setup garbage collection trigger.
- * Not needed for anonymous replicas, since they
- * aren't registered with gc at all.
- */
- struct trigger on_close_log = {
- RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
- };
- if (!relay->replica->anon)
- trigger_add(&r->on_close_log, &on_close_log);
-
/* Setup WAL watcher for sending new rows to the replica. */
wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
relay_process_wal_event, cbus_process);
@@ -649,8 +545,6 @@ relay_subscribe_f(va_list ap)
vclock_copy(&relay->status_msg.vclock, send_vclock);
relay->status_msg.relay = relay;
cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
- /* Collect xlog files received by the replica. */
- relay_schedule_pending_gc(relay, send_vclock);
}
/*
@@ -663,14 +557,16 @@ relay_subscribe_f(va_list ap)
say_crit("exiting the relay loop");
/* Clear garbage collector trigger and WAL watcher. */
- if (!relay->replica->anon)
- trigger_clear(&on_close_log);
wal_clear_watcher(&relay->wal_watcher, cbus_process);
/* Join ack reader fiber. */
fiber_cancel(reader);
fiber_join(reader);
+ /* Wait until the last status message is processed. */
+ while (relay->status_msg.msg.route != NULL)
+ fiber_cond_wait(&relay->reader_cond);
+
/* Destroy cpipe to tx. */
cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
NULL, NULL, cbus_process);
@@ -689,17 +585,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
assert(replica->anon || replica->id != REPLICA_ID_NIL);
struct relay *relay = replica->relay;
assert(relay->state != RELAY_FOLLOW);
- /*
- * Register the replica with the garbage collector
- * unless it has already been registered by initial
- * join.
- */
- if (replica->gc == NULL && !replica->anon) {
- replica->gc = gc_consumer_register(replica_clock, "replica %s",
- tt_uuid_str(&replica->uuid));
- if (replica->gc == NULL)
- diag_raise();
- }
+ if (!replica->anon)
+ wal_relay_status_update(replica->id, replica_clock);
relay_start(relay, fd, sync, relay_send_row);
auto relay_guard = make_scoped_guard([=] {
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 1345f189b..fd656bbce 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -37,6 +37,7 @@
#include <small/mempool.h>
#include "box.h"
+#include "wal.h"
#include "gc.h"
#include "error.h"
#include "relay.h"
@@ -176,7 +177,6 @@ replica_new(void)
replica->anon = false;
replica->uuid = uuid_nil;
replica->applier = NULL;
- replica->gc = NULL;
rlist_create(&replica->in_anon);
trigger_create(&replica->on_applier_state,
replica_on_applier_state_f, NULL, NULL);
@@ -191,8 +191,6 @@ replica_delete(struct replica *replica)
assert(replica_is_orphan(replica));
if (replica->relay != NULL)
relay_delete(replica->relay);
- if (replica->gc != NULL)
- gc_consumer_unregister(replica->gc);
TRASH(replica);
free(replica);
}
@@ -235,15 +233,6 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
/* Assign local replica id */
assert(instance_id == REPLICA_ID_NIL);
instance_id = replica_id;
- } else if (replica->anon) {
- /*
- * Set replica gc on its transition from
- * anonymous to a normal one.
- */
- assert(replica->gc == NULL);
- replica->gc = gc_consumer_register(&replicaset.vclock,
- "replica %s",
- tt_uuid_str(&replica->uuid));
}
replicaset.replica_by_id[replica_id] = replica;
@@ -271,22 +260,11 @@ replica_clear_id(struct replica *replica)
assert(replicaset.is_joining);
instance_id = REPLICA_ID_NIL;
}
+
+ wal_relay_delete(replica->id);
replica->id = REPLICA_ID_NIL;
say_info("removed replica %s", tt_uuid_str(&replica->uuid));
- /*
- * The replica will never resubscribe so we don't need to keep
- * WALs for it anymore. Unregister it with the garbage collector
- * if the relay thread is stopped. In case the relay thread is
- * still running, it may need to access replica->gc so leave the
- * job to replica_on_relay_stop, which will be called as soon as
- * the relay thread exits.
- */
- if (replica->gc != NULL &&
- relay_get_state(replica->relay) != RELAY_FOLLOW) {
- gc_consumer_unregister(replica->gc);
- replica->gc = NULL;
- }
if (replica_is_orphan(replica)) {
replica_hash_remove(&replicaset.hash, replica);
replica_delete(replica);
@@ -894,25 +872,6 @@ replicaset_check_quorum(void)
void
replica_on_relay_stop(struct replica *replica)
{
- /*
- * If the replica was evicted from the cluster, or was not
- * even added there (anon replica), we don't need to keep
- * WALs for it anymore. Unregister it with the garbage
- * collector then. See also replica_clear_id.
- */
- if (replica->id == REPLICA_ID_NIL) {
- if (!replica->anon) {
- gc_consumer_unregister(replica->gc);
- replica->gc = NULL;
- } else {
- assert(replica->gc == NULL);
- /*
- * We do not replicate from anonymous
- * replicas.
- */
- assert(replica->applier == NULL);
- }
- }
if (replica_is_orphan(replica)) {
replica_hash_remove(&replicaset.hash, replica);
replica_delete(replica);
diff --git a/src/box/replication.h b/src/box/replication.h
index 2ef1255b3..e3a4eddda 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -94,7 +94,6 @@
extern "C" {
#endif /* defined(__cplusplus) */
-struct gc_consumer;
static const int REPLICATION_CONNECT_QUORUM_ALL = INT_MAX;
@@ -281,8 +280,6 @@ struct replica {
struct applier *applier;
/** Relay thread. */
struct relay *relay;
- /** Garbage collection state associated with the replica. */
- struct gc_consumer *gc;
/** Link in the anon_replicas list. */
struct rlist in_anon;
/**
diff --git a/src/box/wal.c b/src/box/wal.c
index ecbe0919e..411850a05 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -43,6 +43,7 @@
#include "cbus.h"
#include "coio_task.h"
#include "replication.h"
+#include "mclock.h"
enum {
/**
@@ -154,6 +155,24 @@ struct wal_writer
* Used for replication relays.
*/
struct rlist watchers;
+ /**
+ * Matrix clock with all wal consumer vclocks.
+ */
+ struct mclock mclock;
+ /**
+ * Fiber condition signalled on matrix clock update.
+ */
+ struct fiber_cond wal_gc_cond;
+ /**
+	 * Minimal known xlog vclock, used to decide when
+	 * WAL gc should be invoked. It is a cached value of
+	 * the second vclock in the WAL vclockset.
+ */
+ const struct vclock *gc_wal_vclock;
+ /**
+	 * Vclock starting from which logs are retained.
+	 * Ignored in case of ENOSPC.
+ */
+ struct vclock gc_first_vclock;
};
struct wal_msg {
@@ -335,6 +354,45 @@ tx_notify_checkpoint(struct cmsg *msg)
free(msg);
}
+/**
+ * A shortcut function which returns the vclock of the second
+ * oldest xlog in the WAL directory. If the gc vclock is greater
+ * than or equal to the one of the second oldest log, there's at
+ * least one log file which can be collected.
+ */
+static inline const struct vclock *
+second_vclock(struct wal_writer *writer)
+{
+ struct vclock *first_vclock = vclockset_first(&writer->wal_dir.index);
+ struct vclock *second_vclock = NULL;
+ if (first_vclock != NULL)
+ second_vclock = vclockset_next(&writer->wal_dir.index,
+ first_vclock);
+ if (first_vclock != NULL && second_vclock == NULL &&
+ first_vclock->signature != writer->vclock.signature) {
+ /* New xlog could be not created yet. */
+ second_vclock = &writer->vclock;
+ }
+ return second_vclock;
+}
+
+/**
+ * A shortcut function which compares three vclocks and
+ * returns true if the first one is not greater than the
+ * second one whereas the third one is greater than or equal
+ * to it. Used to decide when WAL gc should be signalled.
+ */
+static inline bool
+vclock_order_changed(const struct vclock *old, const struct vclock *target,
+ const struct vclock *new)
+{
+ int rc = vclock_compare(old, target);
+ if (rc > 0 && rc != VCLOCK_ORDER_UNDEFINED)
+ return false;
+ rc = vclock_compare(new, target);
+ return rc >= 0 && rc != VCLOCK_ORDER_UNDEFINED;
+}
+
/**
* Initialize WAL writer context. Even though it's a singleton,
* encapsulate the details just in case we may use
@@ -375,6 +433,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
mempool_create(&writer->msg_pool, &cord()->slabc,
sizeof(struct wal_msg));
+
+ mclock_create(&writer->mclock);
+
+ fiber_cond_create(&writer->wal_gc_cond);
+ writer->gc_wal_vclock = NULL;
+ vclock_create(&writer->gc_first_vclock);
}
/** Destroy a WAL writer structure. */
@@ -494,6 +558,7 @@ wal_enable(void)
*/
if (xdir_scan(&writer->wal_dir))
return -1;
+ writer->gc_wal_vclock = second_vclock(writer);
/* Open the most recent WAL file. */
if (wal_open(writer) != 0)
@@ -592,6 +657,8 @@ wal_begin_checkpoint_f(struct cbus_call_msg *data)
/*
* The next WAL will be created on the first write.
*/
+ if (writer->gc_wal_vclock == NULL)
+ writer->gc_wal_vclock = second_vclock(writer);
}
vclock_copy(&msg->vclock, &writer->vclock);
msg->wal_size = writer->checkpoint_wal_size;
@@ -695,20 +762,36 @@ wal_set_checkpoint_threshold(int64_t threshold)
fiber_set_cancellable(cancellable);
}
-struct wal_gc_msg
+static void
+wal_gc_advance(struct wal_writer *writer)
{
- struct cbus_call_msg base;
- const struct vclock *vclock;
-};
+ static struct cmsg_hop route[] = {
+ { tx_notify_gc, NULL },
+ };
+ struct tx_notify_gc_msg *msg = malloc(sizeof(*msg));
+ if (msg != NULL) {
+ if (xdir_first_vclock(&writer->wal_dir, &msg->vclock) < 0)
+ vclock_copy(&msg->vclock, &writer->vclock);
+ cmsg_init(&msg->base, route);
+ cpipe_push(&writer->tx_prio_pipe, &msg->base);
+ } else
+ say_warn("failed to allocate gc notification message");
+}
static int
-wal_collect_garbage_f(struct cbus_call_msg *data)
+wal_collect_garbage(struct wal_writer *writer)
{
- struct wal_writer *writer = &wal_writer_singleton;
- const struct vclock *vclock = ((struct wal_gc_msg *)data)->vclock;
+ struct vclock *collect_vclock = &writer->gc_first_vclock;
+ struct vclock relay_min_vclock;
+ if (mclock_get(&writer->mclock, -1, &relay_min_vclock) == 0) {
+ int rc = vclock_compare(collect_vclock, &relay_min_vclock);
+ if (rc > 0 || rc == VCLOCK_ORDER_UNDEFINED)
+ collect_vclock = &relay_min_vclock;
+ }
+ int rc = vclock_compare(collect_vclock, &writer->vclock);
if (!xlog_is_open(&writer->current_wal) &&
- vclock_sum(vclock) >= vclock_sum(&writer->vclock)) {
+ rc >= 0 && rc != VCLOCK_ORDER_UNDEFINED) {
/*
* The last available WAL file has been sealed and
* all registered consumers have done reading it.
@@ -720,27 +803,54 @@ wal_collect_garbage_f(struct cbus_call_msg *data)
* required by registered consumers and delete all
* older WAL files.
*/
- vclock = vclockset_psearch(&writer->wal_dir.index, vclock);
+ collect_vclock = vclockset_match(&writer->wal_dir.index,
+ collect_vclock);
+ }
+ if (collect_vclock != NULL) {
+ xdir_collect_garbage(&writer->wal_dir,
+ vclock_sum(collect_vclock), XDIR_GC_ASYNC);
+ writer->gc_wal_vclock = second_vclock(writer);
+ wal_gc_advance(writer);
}
- if (vclock != NULL)
- xdir_collect_garbage(&writer->wal_dir, vclock_sum(vclock),
- XDIR_GC_ASYNC);
return 0;
}
+struct wal_set_gc_first_vclock_msg {
+ struct cbus_call_msg base;
+ const struct vclock *vclock;
+};
+
+int
+wal_set_gc_first_vclock_f(struct cbus_call_msg *base)
+{
+ struct wal_writer *writer = &wal_writer_singleton;
+ struct wal_set_gc_first_vclock_msg *msg =
+ container_of(base, struct wal_set_gc_first_vclock_msg, base);
+ if (writer->gc_wal_vclock != NULL &&
+ vclock_order_changed(&writer->gc_first_vclock,
+ writer->gc_wal_vclock, msg->vclock))
+ fiber_cond_signal(&writer->wal_gc_cond);
+ vclock_copy(&writer->gc_first_vclock, msg->vclock);
+ return 0;
+}
+
void
-wal_collect_garbage(const struct vclock *vclock)
+wal_set_gc_first_vclock(const struct vclock *vclock)
{
struct wal_writer *writer = &wal_writer_singleton;
- if (writer->wal_mode == WAL_NONE)
+ if (writer->wal_mode == WAL_NONE) {
+ vclock_copy(&writer->gc_first_vclock, vclock);
return;
- struct wal_gc_msg msg;
+ }
+ struct wal_set_gc_first_vclock_msg msg;
msg.vclock = vclock;
bool cancellable = fiber_set_cancellable(false);
- cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg.base,
- wal_collect_garbage_f, NULL, TIMEOUT_INFINITY);
+ cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
+ &msg.base, wal_set_gc_first_vclock_f, NULL,
+ TIMEOUT_INFINITY);
fiber_set_cancellable(cancellable);
+
}
static void
@@ -790,7 +900,8 @@ wal_opt_rotate(struct wal_writer *writer)
* collection, see wal_collect_garbage().
*/
xdir_add_vclock(&writer->wal_dir, &writer->vclock);
-
+ if (writer->gc_wal_vclock == NULL)
+ writer->gc_wal_vclock = second_vclock(writer);
wal_notify_watchers(writer, WAL_EVENT_ROTATE);
return 0;
}
@@ -845,6 +956,10 @@ retry:
}
xdir_collect_garbage(&writer->wal_dir, gc_lsn, XDIR_GC_REMOVE_ONE);
+ writer->gc_wal_vclock = second_vclock(writer);
+ if (vclock_compare(&writer->gc_first_vclock,
+ writer->gc_wal_vclock) < 0)
+ vclock_copy(&writer->gc_first_vclock, writer->gc_wal_vclock);
notify_gc = true;
goto retry;
error:
@@ -861,20 +976,8 @@ out:
* event and a failure to send this message isn't really
* critical.
*/
- if (notify_gc) {
- static struct cmsg_hop route[] = {
- { tx_notify_gc, NULL },
- };
- struct tx_notify_gc_msg *msg = malloc(sizeof(*msg));
- if (msg != NULL) {
- if (xdir_first_vclock(&writer->wal_dir,
- &msg->vclock) < 0)
- vclock_copy(&msg->vclock, &writer->vclock);
- cmsg_init(&msg->base, route);
- cpipe_push(&writer->tx_prio_pipe, &msg->base);
- } else
- say_warn("failed to allocate gc notification message");
- }
+ if (notify_gc)
+ wal_gc_advance(writer);
return rc;
}
@@ -1117,6 +1220,26 @@ done:
ERROR_INJECT_SLEEP(ERRINJ_RELAY_FASTER_THAN_TX);
}
+
+/*
+ * WAL garbage collection fiber.
+ * The fiber waits until the writer's mclock is updated
+ * and then compares the mclock lower bound against
+ * the oldest WAL file.
+ */
+static int
+wal_gc_f(va_list ap)
+{
+ struct wal_writer *writer = va_arg(ap, struct wal_writer *);
+
+ while (!fiber_is_cancelled()) {
+ fiber_cond_wait(&writer->wal_gc_cond);
+ wal_collect_garbage(writer);
+ }
+
+ return 0;
+}
+
/** WAL writer main loop. */
static int
wal_writer_f(va_list ap)
@@ -1136,8 +1259,15 @@ wal_writer_f(va_list ap)
*/
cpipe_create(&writer->tx_prio_pipe, "tx_prio");
+ struct fiber *wal_gc_fiber = fiber_new("wal_gc", wal_gc_f);
+ fiber_set_joinable(wal_gc_fiber, true);
+ fiber_start(wal_gc_fiber, writer);
+
cbus_loop(&endpoint);
+ fiber_cancel(wal_gc_fiber);
+ fiber_join(wal_gc_fiber);
+
/*
* Create a new empty WAL on shutdown so that we don't
* have to rescan the last WAL to find the instance vclock.
@@ -1429,6 +1559,83 @@ wal_notify_watchers(struct wal_writer *writer, unsigned events)
wal_watcher_notify(watcher, events);
}
+struct wal_relay_status_update_msg {
+ struct cbus_call_msg base;
+ uint32_t replica_id;
+ struct vclock vclock;
+};
+
+static int
+wal_relay_status_update_f(struct cbus_call_msg *base)
+{
+ struct wal_writer *writer = &wal_writer_singleton;
+ struct wal_relay_status_update_msg *msg =
+ container_of(base, struct wal_relay_status_update_msg, base);
+ struct vclock old_vclock;
+ mclock_get_row(&writer->mclock, msg->replica_id, &old_vclock);
+ if (writer->gc_wal_vclock != NULL &&
+ vclock_order_changed(&old_vclock, writer->gc_wal_vclock,
+ &msg->vclock))
+ fiber_cond_signal(&writer->wal_gc_cond);
+ mclock_update(&writer->mclock, msg->replica_id, &msg->vclock);
+ return 0;
+}
+
+void
+wal_relay_status_update(uint32_t replica_id, const struct vclock *vclock)
+{
+ struct wal_writer *writer = &wal_writer_singleton;
+ struct wal_relay_status_update_msg msg;
+ /*
+	 * We do not take anonymous replicas into account. There
+	 * is no way to distinguish them, and an anonymous replica
+	 * could be rebootstrapped at any time.
+ */
+ if (replica_id == 0)
+ return;
+ msg.replica_id = replica_id;
+ vclock_copy(&msg.vclock, vclock);
+ bool cancellable = fiber_set_cancellable(false);
+ cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
+ &msg.base, wal_relay_status_update_f, NULL,
+ TIMEOUT_INFINITY);
+ fiber_set_cancellable(cancellable);
+}
+
+struct wal_relay_delete_msg {
+ struct cmsg base;
+ uint32_t replica_id;
+};
+
+void
+wal_relay_delete_f(struct cmsg *base)
+{
+ struct wal_writer *writer = &wal_writer_singleton;
+ struct wal_relay_delete_msg *msg =
+ container_of(base, struct wal_relay_delete_msg, base);
+ struct vclock vclock;
+ vclock_create(&vclock);
+ mclock_update(&writer->mclock, msg->replica_id, &vclock);
+ fiber_cond_signal(&writer->wal_gc_cond);
+ free(msg);
+}
+
+void
+wal_relay_delete(uint32_t replica_id)
+{
+ struct wal_writer *writer = &wal_writer_singleton;
+ struct wal_relay_delete_msg *msg =
+ (struct wal_relay_delete_msg *)malloc(sizeof(*msg));
+ if (msg == NULL) {
+		say_error("could not allocate relay delete message");
+ return;
+ }
+ static struct cmsg_hop route[] = {{wal_relay_delete_f, NULL}};
+ cmsg_init(&msg->base, route);
+ msg->replica_id = replica_id;
+ cpipe_push(&writer->wal_pipe, &msg->base);
+}
+
/**
* After fork, the WAL writer thread disappears.
* Make sure that atexit() handlers in the child do
diff --git a/src/box/wal.h b/src/box/wal.h
index 76b44941a..86887656d 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -223,6 +223,12 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint);
void
wal_commit_checkpoint(struct wal_checkpoint *checkpoint);
+/**
+ * Prevent the WAL from collecting logs newer than the given vclock.
+ */
+void
+wal_set_gc_first_vclock(const struct vclock *vclock);
+
/**
* Set the WAL size threshold exceeding which will trigger
* checkpointing in TX.
@@ -231,11 +237,16 @@ void
wal_set_checkpoint_threshold(int64_t threshold);
/**
- * Remove WAL files that are not needed by consumers reading
- * rows at @vclock or newer.
+ * Update a WAL consumer's vclock position.
+ */
+void
+wal_relay_status_update(uint32_t replica_id, const struct vclock *vclock);
+
+/**
+ * Unregister a wal consumer.
*/
void
-wal_collect_garbage(const struct vclock *vclock);
+wal_relay_delete(uint32_t replica_id);
void
wal_init_vy_log();
diff --git a/test/replication/gc_no_space.result b/test/replication/gc_no_space.result
index e860ab00f..f295cb16b 100644
--- a/test/replication/gc_no_space.result
+++ b/test/replication/gc_no_space.result
@@ -162,18 +162,12 @@ check_snap_count(2)
gc = box.info.gc()
---
...
-#gc.consumers -- 3
----
-- 3
-...
+--#gc.consumers -- 3
#gc.checkpoints -- 2
---
- 2
...
-gc.signature == gc.consumers[1].signature
----
-- true
-...
+--gc.signature == gc.consumers[1].signature
--
-- Inject a ENOSPC error and check that the WAL thread deletes
-- old WAL files to prevent the user from seeing the error.
@@ -201,18 +195,12 @@ check_snap_count(2)
gc = box.info.gc()
---
...
-#gc.consumers -- 1
----
-- 1
-...
+--#gc.consumers -- 1
#gc.checkpoints -- 2
---
- 2
...
-gc.signature == gc.consumers[1].signature
----
-- true
-...
+--gc.signature == gc.consumers[1].signature
--
-- Check that the WAL thread never deletes WAL files that are
-- needed for recovery from the last checkpoint, but may delete
@@ -242,10 +230,7 @@ check_snap_count(2)
gc = box.info.gc()
---
...
-#gc.consumers -- 0
----
-- 0
-...
+--#gc.consumers -- 0
#gc.checkpoints -- 2
---
- 2
@@ -272,7 +257,4 @@ gc = box.info.gc()
---
- 2
...
-gc.signature == gc.checkpoints[2].signature
----
-- true
-...
+--gc.signature == gc.checkpoints[2].signature
diff --git a/test/replication/gc_no_space.test.lua b/test/replication/gc_no_space.test.lua
index 98ccd401b..c28bc0710 100644
--- a/test/replication/gc_no_space.test.lua
+++ b/test/replication/gc_no_space.test.lua
@@ -72,9 +72,9 @@ s:auto_increment{}
check_wal_count(5)
check_snap_count(2)
gc = box.info.gc()
-#gc.consumers -- 3
+--#gc.consumers -- 3
#gc.checkpoints -- 2
-gc.signature == gc.consumers[1].signature
+--gc.signature == gc.consumers[1].signature
--
-- Inject a ENOSPC error and check that the WAL thread deletes
@@ -87,9 +87,9 @@ errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
check_wal_count(3)
check_snap_count(2)
gc = box.info.gc()
-#gc.consumers -- 1
+--#gc.consumers -- 1
#gc.checkpoints -- 2
-gc.signature == gc.consumers[1].signature
+--gc.signature == gc.consumers[1].signature
--
-- Check that the WAL thread never deletes WAL files that are
@@ -104,7 +104,7 @@ errinj.info()['ERRINJ_WAL_FALLOCATE'].state -- 0
check_wal_count(1)
check_snap_count(2)
gc = box.info.gc()
-#gc.consumers -- 0
+--#gc.consumers -- 0
#gc.checkpoints -- 2
gc.signature == gc.checkpoints[2].signature
@@ -116,4 +116,4 @@ test_run:cleanup_cluster()
test_run:cmd("restart server default")
gc = box.info.gc()
#gc.checkpoints -- 2
-gc.signature == gc.checkpoints[2].signature
+--gc.signature == gc.checkpoints[2].signature
--
2.21.1 (Apple Git-122.3)
^ permalink raw reply [flat|nested] 17+ messages in thread
* [Tarantool-patches] [PATCH v2 3/5] vclock: add an ability to set individual clock components
2020-03-18 19:47 [Tarantool-patches] [PATCH v2 0/5] replication: fix local space tracking Serge Petrenko
2020-03-18 19:47 ` [Tarantool-patches] [PATCH v2 1/5] box: introduce matrix clock Serge Petrenko
2020-03-18 19:47 ` [Tarantool-patches] [PATCH v2 2/5] wal: track consumer vclock and collect logs in wal thread Serge Petrenko
@ 2020-03-18 19:47 ` Serge Petrenko
2020-03-18 20:10 ` Konstantin Osipov
2020-03-18 19:47 ` [Tarantool-patches] [PATCH v2 4/5] replication: hide 0-th vclock components in replication responses Serge Petrenko
` (3 subsequent siblings)
6 siblings, 1 reply; 17+ messages in thread
From: Serge Petrenko @ 2020-03-18 19:47 UTC (permalink / raw)
To: v.shpilevoy, kostja.osipov, georgy; +Cc: tarantool-patches
This is needed to 'hide' the 0-th vclock component in replication responses.
Follow-up #3186
Prerequisite #4114
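
For illustration, here is a minimal, self-contained sketch of the semantics
the new setter adds. It uses a simplified model of struct vclock (reduced
VCLOCK_MAX, plain bitmap and LSN array, no packed layout) — not the real
Tarantool definition — but follows the patch's field names and logic:

```c
#include <assert.h>
#include <stdint.h>

/* Simplified stand-in for struct vclock: a bitmap of active
 * components, per-replica LSNs, and a cached sum (signature). */
enum { VCLOCK_MAX = 32 };

struct vclock {
	uint32_t map;
	int64_t lsn[VCLOCK_MAX];
	int64_t signature;
};

static int64_t
vclock_get(const struct vclock *vclock, uint32_t replica_id)
{
	return (vclock->map & (1U << replica_id)) ?
	       vclock->lsn[replica_id] : 0;
}

/* Mirrors the patch: setting a component to 0 drops it from the
 * map, so the component is effectively hidden; the signature is
 * kept consistent by subtracting the old LSN and adding the new. */
static void
vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
{
	assert(lsn >= 0);
	assert(replica_id < VCLOCK_MAX);
	vclock->signature -= vclock_get(vclock, replica_id);
	if (lsn == 0) {
		vclock->map &= ~(1U << replica_id);
		return;
	}
	vclock->lsn[replica_id] = lsn;
	vclock->map |= 1U << replica_id;
	vclock->signature += lsn;
}
```

Note how `vclock_set(v, id, 0)` both zeroes the component and removes it
from the map, which is exactly what the next patch relies on to hide the
0-th component.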
---
src/box/vclock.c | 15 +++++++++++++++
src/box/vclock.h | 11 +++++++++++
2 files changed, 26 insertions(+)
diff --git a/src/box/vclock.c b/src/box/vclock.c
index 90ae27591..302c0438d 100644
--- a/src/box/vclock.c
+++ b/src/box/vclock.c
@@ -37,6 +37,21 @@
#include "diag.h"
#include "tt_static.h"
+void
+vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
+{
+ assert(lsn >= 0);
+ assert(replica_id < VCLOCK_MAX);
+ vclock->signature -= vclock_get(vclock, replica_id);
+ if (lsn == 0) {
+ vclock->map &= ~(1 << replica_id);
+ return;
+ }
+ vclock->lsn[replica_id] = lsn;
+ vclock->map |= 1 << replica_id;
+ vclock->signature += lsn;
+}
+
int64_t
vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
{
diff --git a/src/box/vclock.h b/src/box/vclock.h
index eb0fb5d8b..e54711bc1 100644
--- a/src/box/vclock.h
+++ b/src/box/vclock.h
@@ -209,6 +209,17 @@ vclock_calc_sum(const struct vclock *vclock)
return sum;
}
+/**
+ * Set vclock component represented by replica id to the desired
+ * value.
+ *
+ * @param vclock Vector clock.
+ * @param replica_id Replica identifier.
+ * @param lsn Lsn to set.
+ */
+void
+vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
+
static inline int64_t
vclock_sum(const struct vclock *vclock)
{
--
2.21.1 (Apple Git-122.3)
* Re: [Tarantool-patches] [PATCH v2 3/5] vclock: add an ability to set individual clock components
2020-03-18 19:47 ` [Tarantool-patches] [PATCH v2 3/5] vclock: add an ability to set individual clock components Serge Petrenko
@ 2020-03-18 20:10 ` Konstantin Osipov
2020-03-19 11:31 ` Serge Petrenko
0 siblings, 1 reply; 17+ messages in thread
From: Konstantin Osipov @ 2020-03-18 20:10 UTC (permalink / raw)
To: Serge Petrenko; +Cc: v.shpilevoy, tarantool-patches
* Serge Petrenko <sergepetrenko@tarantool.org> [20/03/18 22:52]:
> This is needed to 'hide' 0-th vclock component in replication responses.
How are these patches connected? If they are not connected, why
are in the same series as matrix clock?
--
Konstantin Osipov, Moscow, Russia
https://scylladb.com
* Re: [Tarantool-patches] [PATCH v2 3/5] vclock: add an ability to set individual clock components
2020-03-18 20:10 ` Konstantin Osipov
@ 2020-03-19 11:31 ` Serge Petrenko
0 siblings, 0 replies; 17+ messages in thread
From: Serge Petrenko @ 2020-03-19 11:31 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: Vladislav Shpilevoy, tarantool-patches
> On 18 March 2020, at 23:10, Konstantin Osipov <kostja.osipov@gmail.com> wrote:
>
> * Serge Petrenko <sergepetrenko@tarantool.org> [20/03/18 22:52]:
>> This is needed to 'hide' 0-th vclock component in replication responses.
>
> How are these patches connected? If they are not connected, why
> are in the same series as matrix clock?
Patches 1-2 are needed for patch 5, which fixes the issue.
Patch 3 is needed for patch 4 which is needed for patch 5.
Matrix clock is connected to the issue, as described in my other letter.
--
Serge Petrenko
sergepetrenko@tarantool.org
>
>
> --
> Konstantin Osipov, Moscow, Russia
> https://scylladb.com
* [Tarantool-patches] [PATCH v2 4/5] replication: hide 0-th vclock components in replication responses
2020-03-18 19:47 [Tarantool-patches] [PATCH v2 0/5] replication: fix local space tracking Serge Petrenko
` (2 preceding siblings ...)
2020-03-18 19:47 ` [Tarantool-patches] [PATCH v2 3/5] vclock: add an ability to set individual clock components Serge Petrenko
@ 2020-03-18 19:47 ` Serge Petrenko
2020-03-18 19:47 ` [Tarantool-patches] [PATCH v2 5/5] box: start counting local space requests separately Serge Petrenko
` (2 subsequent siblings)
6 siblings, 0 replies; 17+ messages in thread
From: Serge Petrenko @ 2020-03-18 19:47 UTC (permalink / raw)
To: v.shpilevoy, kostja.osipov, georgy; +Cc: tarantool-patches
If an anonymous replica is promoted to a normal one and later becomes a
replication master, its vclock contains a non-empty zero component
tracking the local changes made while it was still anonymous. There is
no need to pollute the joining instance's vclock with our non-empty
0-th component.
When an anonymous replica reports its status to a remote instance it
should also hide its 0-th vclock component.
Follow-up #3186
Prerequisite #4114
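
The pattern repeated across applier.cc, box.cc, and relay.cc below can be
sketched as follows. This is an illustration only: the struct is a toy
stand-in for struct vclock (no component bitmap), and the helper name
`vclock_copy_hide_local` is hypothetical — the patch open-codes
vclock_copy() followed by vclock_set(vclock, 0, 0):

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Toy vclock: per-replica LSNs plus a cached sum (signature). */
enum { VCLOCK_MAX = 32 };

struct vclock {
	int64_t lsn[VCLOCK_MAX];
	int64_t signature;
};

/* Before encoding a vclock for a remote peer, copy it and zero out
 * the 0-th (local changes) component so the peer never sees LSNs of
 * local spaces. The original vclock is left untouched. */
static void
vclock_copy_hide_local(struct vclock *dst, const struct vclock *src)
{
	memcpy(dst, src, sizeof(*dst));
	dst->signature -= dst->lsn[0];
	dst->lsn[0] = 0;
}
```

The design point is that the hiding happens on a copy right at the
encoding boundary, so the instance's own replicaset.vclock keeps tracking
local changes internally.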
---
src/box/applier.cc | 5 ++++-
src/box/box.cc | 12 ++++++++++--
src/box/relay.cc | 6 +++++-
test/replication/anon.result | 5 +++++
test/replication/anon.test.lua | 2 ++
5 files changed, 26 insertions(+), 4 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index ff40e03c6..6d2fe73aa 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -173,7 +173,10 @@ applier_writer_f(va_list ap)
continue;
try {
struct xrow_header xrow;
- xrow_encode_vclock(&xrow, &replicaset.vclock);
+ struct vclock vclock;
+ vclock_copy(&vclock, &replicaset.vclock);
+ vclock_set(&vclock, 0, 0);
+ xrow_encode_vclock(&xrow, &vclock);
coio_write_xrow(&io, &xrow);
} catch (SocketError *e) {
/*
diff --git a/src/box/box.cc b/src/box/box.cc
index 6e101a0be..21c7a3324 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1538,6 +1538,7 @@ box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header)
/* Remember master's vclock after the last request */
struct vclock stop_vclock;
vclock_copy(&stop_vclock, &replicaset.vclock);
+ vclock_set(&stop_vclock, 0, 0);
/* Send end of snapshot data marker */
struct xrow_header row;
@@ -1620,7 +1621,9 @@ box_process_register(struct ev_io *io, struct xrow_header *header)
struct xrow_header row;
/* Send end of WAL stream marker */
- xrow_encode_vclock_xc(&row, &replicaset.vclock);
+ vclock_copy(&vclock, &replicaset.vclock);
+ vclock_set(&vclock, 0, 0);
+ xrow_encode_vclock_xc(&row, &vclock);
row.sync = header->sync;
coio_write_xrow(io, &row);
@@ -1769,7 +1772,11 @@ box_process_join(struct ev_io *io, struct xrow_header *header)
say_info("final data sent.");
/* Send end of WAL stream marker */
- xrow_encode_vclock_xc(&row, &replicaset.vclock);
+ struct vclock vclock;
+ vclock_copy(&vclock, &replicaset.vclock);
+ vclock_set(&vclock, 0, 0);
+ xrow_encode_vclock_xc(&row, &vclock);
+
row.sync = header->sync;
coio_write_xrow(io, &row);
@@ -1893,6 +1900,7 @@ box_process_vote(struct ballot *ballot)
ballot->is_loading = is_ro;
vclock_copy(&ballot->vclock, &replicaset.vclock);
vclock_copy(&ballot->gc_vclock, &gc.vclock);
+ vclock_set(&ballot->gc_vclock, 0, 0);
}
/** Insert a new cluster into _schema */
diff --git a/src/box/relay.cc b/src/box/relay.cc
index d21129d73..3191ad99f 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -293,7 +293,11 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
/* Respond to the JOIN request with the current vclock. */
struct xrow_header row;
- xrow_encode_vclock_xc(&row, vclock);
+ struct vclock subst_vclock;
+ vclock_copy(&subst_vclock, vclock);
+ /* Zero out the 0-th vclock component. */
+ vclock_set(&subst_vclock, 0, 0);
+ xrow_encode_vclock_xc(&row, &subst_vclock);
row.sync = sync;
coio_write_xrow(&relay->io, &row);
diff --git a/test/replication/anon.result b/test/replication/anon.result
index 88061569f..cbbeeef09 100644
--- a/test/replication/anon.result
+++ b/test/replication/anon.result
@@ -187,6 +187,11 @@ a > 0
| ---
| - true
| ...
+-- 0-th vclock component isn't propagated across the cluster.
+box.info.vclock[0]
+ | ---
+ | - null
+ | ...
test_run:cmd('switch default')
| ---
| - true
diff --git a/test/replication/anon.test.lua b/test/replication/anon.test.lua
index 8a8d15c18..627dc5c8e 100644
--- a/test/replication/anon.test.lua
+++ b/test/replication/anon.test.lua
@@ -66,6 +66,8 @@ test_run:cmd('switch replica_anon2')
a = box.info.vclock[1]
-- The instance did fetch a snapshot.
a > 0
+-- 0-th vclock component isn't propagated across the cluster.
+box.info.vclock[0]
test_run:cmd('switch default')
box.space.test:insert{2}
test_run:cmd("switch replica_anon2")
--
2.21.1 (Apple Git-122.3)
^ permalink raw reply [flat|nested] 17+ messages in thread
* [Tarantool-patches] [PATCH v2 5/5] box: start counting local space requests separately
2020-03-18 19:47 [Tarantool-patches] [PATCH v2 0/5] replication: fix local space tracking Serge Petrenko
` (3 preceding siblings ...)
2020-03-18 19:47 ` [Tarantool-patches] [PATCH v2 4/5] replication: hide 0-th vclock components in replication responses Serge Petrenko
@ 2020-03-18 19:47 ` Serge Petrenko
2020-03-18 21:12 ` [Tarantool-patches] [PATCH v2 0/5] replication: fix local space tracking Vladislav Shpilevoy
2020-03-19 8:17 ` Konstantin Osipov
6 siblings, 0 replies; 17+ messages in thread
From: Serge Petrenko @ 2020-03-18 19:47 UTC (permalink / raw)
To: v.shpilevoy, kostja.osipov, georgy; +Cc: tarantool-patches
Sign local space requests with a zero instance id. This makes it
possible to separate local changes from the changes that must be
visible to the whole cluster, and to stop sending NOPs to replicas
just to promote the local vclock.
Closes #4114
---
src/box/box.cc | 20 +++
src/box/relay.cc | 20 +--
src/box/wal.c | 16 ++-
test/replication/autobootstrap.result | 6 +
test/replication/autobootstrap.test.lua | 2 +
test/replication/before_replace.result | 8 +-
test/replication/before_replace.test.lua | 4 +-
.../gh-4114-local-space-replication.result | 125 ++++++++++++++++++
.../gh-4114-local-space-replication.test.lua | 48 +++++++
test/replication/local_spaces.result | 4 +
test/replication/local_spaces.test.lua | 3 +
test/replication/misc.result | 6 +
test/replication/misc.test.lua | 2 +
test/replication/quorum.result | 6 +
test/replication/quorum.test.lua | 2 +
test/replication/replica_rejoin.result | 9 ++
test/replication/replica_rejoin.test.lua | 3 +
test/replication/skip_conflict_row.result | 3 +
test/replication/skip_conflict_row.test.lua | 1 +
test/replication/suite.cfg | 1 +
test/vinyl/errinj.result | 5 +
test/vinyl/errinj.test.lua | 4 +
22 files changed, 282 insertions(+), 16 deletions(-)
create mode 100644 test/replication/gh-4114-local-space-replication.result
create mode 100644 test/replication/gh-4114-local-space-replication.test.lua
diff --git a/src/box/box.cc b/src/box/box.cc
index 21c7a3324..f4f7fd276 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1870,6 +1870,26 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
say_info("remote vclock %s local vclock %s",
vclock_to_string(&replica_clock), vclock_to_string(&vclock));
+ /*
+ * Replica clock is used in gc state and recovery
+ * initialization, so we need to replace the remote 0-th
+ * component with our own one. This makes recovery work
+ * correctly: we're trying to recover from an xlog whose
+ * vclock is smaller than the remote replica clock in each
+ * component, excluding the 0-th component. This leads to
+ * finding the correct WAL, if it exists, since we do not
+ * need to recover local rows (the ones that contribute
+ * to the 0-th vclock component). It would be bad to set
+ * the 0-th vclock component to a smaller value, since it
+ * would unnecessarily require additional WALs, which may
+ * have already been deleted.
+ * The same holds for gc. Remote instances do not need this
+ * instance's local rows, and after gc was reworked to
+ * track individual vclock components instead of
+ * signatures it's safe to set the local component to the
+ * most recent value.
+ */
+ vclock_set(&replica_clock, 0, vclock_get(&replicaset.vclock, 0));
/*
* Process SUBSCRIBE request via replication relay
* Send current recovery vector clock as a marker
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 3191ad99f..454cb5811 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -375,8 +375,11 @@ static void
tx_status_update(struct cmsg *msg)
{
struct relay_status_msg *status = (struct relay_status_msg *)msg;
+ struct vclock subst_vclock;
+ vclock_copy(&subst_vclock, &status->vclock);
+ vclock_set(&subst_vclock, 0, vclock_get(&replicaset.vclock, 0));
if (!status->relay->replica->anon)
- wal_relay_status_update(status->relay->replica->id, &status->vclock);
+ wal_relay_status_update(status->relay->replica->id, &subst_vclock);
vclock_copy(&status->relay->tx.vclock, &status->vclock);
static const struct cmsg_hop route[] = {
{relay_status_update, NULL}
@@ -647,16 +650,15 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
{
struct relay *relay = container_of(stream, struct relay, stream);
assert(iproto_type_is_dml(packet->type));
- /*
- * Transform replica local requests to IPROTO_NOP so as to
- * promote vclock on the replica without actually modifying
- * any data.
- */
if (packet->group_id == GROUP_LOCAL) {
/*
- * Replica-local requests generated while replica
- * was anonymous have a zero instance id. Just
- * skip all these rows.
+ * We do not relay replica-local rows to other
+ * instances, since we started signing them with
+ * a zero instance id. However, if replica-local
+ * rows, signed with a non-zero id are present in
+ * our WAL, we still need to relay them as NOPs in
+ * order to correctly promote the vclock on the
+ * replica.
*/
if (packet->replica_id == REPLICA_ID_NIL)
return;
diff --git a/src/box/wal.c b/src/box/wal.c
index 411850a05..197508214 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -1042,13 +1042,19 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
/** Assign LSN to all local rows. */
for ( ; row < end; row++) {
if ((*row)->replica_id == 0) {
- (*row)->lsn = vclock_inc(vclock_diff, instance_id) +
- vclock_get(base, instance_id);
/*
- * Note, an anonymous replica signs local
- * rows whith a zero instance id.
+ * All rows representing local space data
+ * manipulations are signed with a zero
+ * instance id. This is also true for
+ * anonymous replicas, since they are
+ * only capable of writing to local and
+ * temporary spaces.
*/
- (*row)->replica_id = instance_id;
+ if ((*row)->group_id != GROUP_LOCAL)
+ (*row)->replica_id = instance_id;
+
+ (*row)->lsn = vclock_inc(vclock_diff, (*row)->replica_id) +
+ vclock_get(base, (*row)->replica_id);
/* Use lsn of the first local row as transaction id. */
tsn = tsn == 0 ? (*row)->lsn : tsn;
(*row)->tsn = tsn;
diff --git a/test/replication/autobootstrap.result b/test/replication/autobootstrap.result
index 743982d47..6918e23ea 100644
--- a/test/replication/autobootstrap.result
+++ b/test/replication/autobootstrap.result
@@ -162,6 +162,9 @@ box.schema.user.revoke('test_u', 'create', 'space')
vclock = test_run:get_vclock('autobootstrap1')
---
...
+vclock[0] = nil
+---
+...
_ = test_run:wait_vclock("autobootstrap2", vclock)
---
...
@@ -206,6 +209,9 @@ test_run:wait_fullmesh(SERVERS)
vclock = test_run:get_vclock("autobootstrap1")
---
...
+vclock[0] = nil
+---
+...
_ = test_run:wait_vclock("autobootstrap2", vclock)
---
...
diff --git a/test/replication/autobootstrap.test.lua b/test/replication/autobootstrap.test.lua
index 055ea4277..f8bb1c74a 100644
--- a/test/replication/autobootstrap.test.lua
+++ b/test/replication/autobootstrap.test.lua
@@ -74,6 +74,7 @@ box.schema.user.revoke('test_u', 'create', 'space')
-- Synchronize
vclock = test_run:get_vclock('autobootstrap1')
+vclock[0] = nil
_ = test_run:wait_vclock("autobootstrap2", vclock)
_ = test_run:wait_vclock("autobootstrap3", vclock)
@@ -95,6 +96,7 @@ _ = test_run:cmd("switch default")
test_run:wait_fullmesh(SERVERS)
vclock = test_run:get_vclock("autobootstrap1")
+vclock[0] = nil
_ = test_run:wait_vclock("autobootstrap2", vclock)
_ = test_run:wait_vclock("autobootstrap3", vclock)
diff --git a/test/replication/before_replace.result b/test/replication/before_replace.result
index ced40547e..61a552e84 100644
--- a/test/replication/before_replace.result
+++ b/test/replication/before_replace.result
@@ -266,7 +266,13 @@ box.space.test:replace{1, 1}
---
- [1, 1]
...
-_ = test_run:wait_vclock('replica', test_run:get_vclock('default'))
+vclock = test_run:get_vclock('default')
+---
+...
+vclock[0] = nil
+---
+...
+_ = test_run:wait_vclock('replica', vclock)
---
...
-- Check that replace{1, 2} coming from the master was suppressed
diff --git a/test/replication/before_replace.test.lua b/test/replication/before_replace.test.lua
index bcc6dc00d..ecd8ff044 100644
--- a/test/replication/before_replace.test.lua
+++ b/test/replication/before_replace.test.lua
@@ -101,7 +101,9 @@ _ = box.space.test:before_replace(function(old, new) return new:update{{'+', 2,
test_run:cmd("switch default")
box.space.test:replace{1, 1}
-_ = test_run:wait_vclock('replica', test_run:get_vclock('default'))
+vclock = test_run:get_vclock('default')
+vclock[0] = nil
+_ = test_run:wait_vclock('replica', vclock)
-- Check that replace{1, 2} coming from the master was suppressed
-- by the before_replace trigger on the replica.
diff --git a/test/replication/gh-4114-local-space-replication.result b/test/replication/gh-4114-local-space-replication.result
new file mode 100644
index 000000000..e524c9a1b
--- /dev/null
+++ b/test/replication/gh-4114-local-space-replication.result
@@ -0,0 +1,125 @@
+-- test-run result file version 2
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+
+--
+-- gh-4114. Account local space changes in a separate vclock
+-- component. Do not replicate local space changes, even as NOPs.
+--
+
+box.schema.user.grant('guest', 'replication')
+ | ---
+ | ...
+_ = box.schema.space.create('test', {is_local=true})
+ | ---
+ | ...
+_ = box.space.test:create_index("pk")
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default, script "replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica with wait=True, wait_load=True')
+ | ---
+ | - true
+ | ...
+
+a = box.info.vclock[0] or 0
+ | ---
+ | ...
+for i = 1,10 do box.space.test:insert{i} end
+ | ---
+ | ...
+box.info.vclock[0] == a + 10 or box.info.vclock[0] - a
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('switch replica')
+ | ---
+ | - true
+ | ...
+box.info.vclock[0]
+ | ---
+ | - null
+ | ...
+box.cfg{checkpoint_count=1}
+ | ---
+ | ...
+box.space.test:select{}
+ | ---
+ | - []
+ | ...
+box.space.test:insert{1}
+ | ---
+ | - [1]
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+box.space.test:insert{2}
+ | ---
+ | - [2]
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+box.space.test:insert{3}
+ | ---
+ | - [3]
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+
+box.info.vclock[0]
+ | ---
+ | - 3
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+
+test_run:cmd('set variable repl_source to "replica.listen"')
+ | ---
+ | - true
+ | ...
+
+box.cfg{replication=repl_source}
+ | ---
+ | ...
+test_run:wait_cond(function()\
+ return box.info.replication[2].upstream and\
+ box.info.replication[2].upstream.status == 'follow'\
+ end,\
+ 10)
+ | ---
+ | - true
+ | ...
+
+-- Cleanup.
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+box.space.test:drop()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'replication')
+ | ---
+ | ...
diff --git a/test/replication/gh-4114-local-space-replication.test.lua b/test/replication/gh-4114-local-space-replication.test.lua
new file mode 100644
index 000000000..26dccee68
--- /dev/null
+++ b/test/replication/gh-4114-local-space-replication.test.lua
@@ -0,0 +1,48 @@
+env = require('test_run')
+test_run = env.new()
+
+--
+-- gh-4114. Account local space changes in a separate vclock
+-- component. Do not replicate local space changes, even as NOPs.
+--
+
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('test', {is_local=true})
+_ = box.space.test:create_index("pk")
+
+test_run:cmd('create server replica with rpl_master=default, script "replication/replica.lua"')
+test_run:cmd('start server replica with wait=True, wait_load=True')
+
+a = box.info.vclock[0] or 0
+for i = 1,10 do box.space.test:insert{i} end
+box.info.vclock[0] == a + 10 or box.info.vclock[0] - a
+
+test_run:cmd('switch replica')
+box.info.vclock[0]
+box.cfg{checkpoint_count=1}
+box.space.test:select{}
+box.space.test:insert{1}
+box.snapshot()
+box.space.test:insert{2}
+box.snapshot()
+box.space.test:insert{3}
+box.snapshot()
+
+box.info.vclock[0]
+
+test_run:cmd('switch default')
+
+test_run:cmd('set variable repl_source to "replica.listen"')
+
+box.cfg{replication=repl_source}
+test_run:wait_cond(function()\
+ return box.info.replication[2].upstream and\
+ box.info.replication[2].upstream.status == 'follow'\
+ end,\
+ 10)
+
+-- Cleanup.
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+box.space.test:drop()
+box.schema.user.revoke('guest', 'replication')
diff --git a/test/replication/local_spaces.result b/test/replication/local_spaces.result
index cf2c52010..4855d8a88 100644
--- a/test/replication/local_spaces.result
+++ b/test/replication/local_spaces.result
@@ -288,6 +288,10 @@ _ = s3:insert{3}
vclock = test_run:get_vclock('default')
---
...
+-- Ignore the 0-th component when waiting: it differs between instances.
+vclock[0] = nil
+---
+...
_ = test_run:wait_vclock('replica', vclock)
---
...
diff --git a/test/replication/local_spaces.test.lua b/test/replication/local_spaces.test.lua
index 373e2cd20..c5e224030 100644
--- a/test/replication/local_spaces.test.lua
+++ b/test/replication/local_spaces.test.lua
@@ -112,6 +112,9 @@ _ = s1:insert{3}
_ = s2:insert{3}
_ = s3:insert{3}
vclock = test_run:get_vclock('default')
+
+-- Ignore the 0-th component when waiting: it differs between instances.
+vclock[0] = nil
_ = test_run:wait_vclock('replica', vclock)
test_run:cmd("switch replica")
diff --git a/test/replication/misc.result b/test/replication/misc.result
index b63d72846..e5d1f560e 100644
--- a/test/replication/misc.result
+++ b/test/replication/misc.result
@@ -214,6 +214,9 @@ box.space.space1:select{}
vclock = test_run:get_vclock("autobootstrap1")
---
...
+vclock[0] = nil
+---
+...
_ = test_run:wait_vclock("autobootstrap2", vclock)
---
...
@@ -414,6 +417,9 @@ while box.info.replication[2] == nil do fiber.sleep(0.01) end
vclock = test_run:get_vclock('default')
---
...
+vclock[0] = nil
+---
+...
_ = test_run:wait_vclock('replica_auth', vclock)
---
...
diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua
index c454a0992..d285b014a 100644
--- a/test/replication/misc.test.lua
+++ b/test/replication/misc.test.lua
@@ -88,6 +88,7 @@ c.space.space1:insert{box.NULL, "data"} -- fails, but bumps sequence value
c.space.space1:insert{box.NULL, 1, "data"}
box.space.space1:select{}
vclock = test_run:get_vclock("autobootstrap1")
+vclock[0] = nil
_ = test_run:wait_vclock("autobootstrap2", vclock)
test_run:cmd("switch autobootstrap2")
box.space.space1:select{}
@@ -172,6 +173,7 @@ box.schema.user.grant('cluster', 'replication')
while box.info.replication[2] == nil do fiber.sleep(0.01) end
vclock = test_run:get_vclock('default')
+vclock[0] = nil
_ = test_run:wait_vclock('replica_auth', vclock)
test_run:cmd("stop server replica_auth")
diff --git a/test/replication/quorum.result b/test/replication/quorum.result
index 07abe7f2a..5ef66bf0a 100644
--- a/test/replication/quorum.result
+++ b/test/replication/quorum.result
@@ -325,6 +325,9 @@ space:insert{2}
vclock = test_run:get_vclock("default")
---
...
+vclock[0] = nil
+---
+...
_ = test_run:wait_vclock("replica", vclock)
---
...
@@ -390,6 +393,9 @@ box.cfg{replication = repl}
vclock = test_run:get_vclock("master_quorum1")
---
...
+vclock[0] = nil
+---
+...
_ = test_run:wait_vclock("master_quorum2", vclock)
---
...
diff --git a/test/replication/quorum.test.lua b/test/replication/quorum.test.lua
index 5f2872675..da24f34a0 100644
--- a/test/replication/quorum.test.lua
+++ b/test/replication/quorum.test.lua
@@ -125,6 +125,7 @@ test_run:cmd("switch default")
box.cfg{listen = listen}
space:insert{2}
vclock = test_run:get_vclock("default")
+vclock[0] = nil
_ = test_run:wait_vclock("replica", vclock)
test_run:cmd("switch replica")
box.info.status -- running
@@ -145,6 +146,7 @@ box.cfg{replication = ""}
box.space.test:insert{1}
box.cfg{replication = repl}
vclock = test_run:get_vclock("master_quorum1")
+vclock[0] = nil
_ = test_run:wait_vclock("master_quorum2", vclock)
test_run:cmd("switch master_quorum2")
box.space.test:select()
diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result
index b8ed79f14..dd04ae297 100644
--- a/test/replication/replica_rejoin.result
+++ b/test/replication/replica_rejoin.result
@@ -144,6 +144,9 @@ for i = 10, 30, 10 do box.space.test:update(i, {{'!', 1, i}}) end
vclock = test_run:get_vclock('default')
---
...
+vclock[0] = nil
+---
+...
_ = test_run:wait_vclock('replica', vclock)
---
...
@@ -295,6 +298,9 @@ for i = 1, 10 do box.space.test:replace{2} end
vclock = test_run:get_vclock('replica')
---
...
+vclock[0] = nil
+---
+...
_ = test_run:wait_vclock('default', vclock)
---
...
@@ -345,6 +351,9 @@ for i = 1, 10 do box.space.test:replace{2} end
vclock = test_run:get_vclock('replica')
---
...
+vclock[0] = nil
+---
+...
_ = test_run:wait_vclock('default', vclock)
---
...
diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua
index 25c0849ec..410ef44d7 100644
--- a/test/replication/replica_rejoin.test.lua
+++ b/test/replication/replica_rejoin.test.lua
@@ -55,6 +55,7 @@ test_run:cmd("switch default")
-- Make sure the replica follows new changes.
for i = 10, 30, 10 do box.space.test:update(i, {{'!', 1, i}}) end
vclock = test_run:get_vclock('default')
+vclock[0] = nil
_ = test_run:wait_vclock('replica', vclock)
test_run:cmd("switch replica")
box.space.test:select()
@@ -109,6 +110,7 @@ box.space.test:replace{1}
test_run:cmd("switch replica")
for i = 1, 10 do box.space.test:replace{2} end
vclock = test_run:get_vclock('replica')
+vclock[0] = nil
_ = test_run:wait_vclock('default', vclock)
-- Restart the master and force garbage collection.
test_run:cmd("switch default")
@@ -126,6 +128,7 @@ test_run:wait_cond(function() return #fio.glob(fio.pathjoin(box.cfg.wal_dir, '*.
test_run:cmd("switch replica")
for i = 1, 10 do box.space.test:replace{2} end
vclock = test_run:get_vclock('replica')
+vclock[0] = nil
_ = test_run:wait_vclock('default', vclock)
-- Restart the replica. It should successfully rebootstrap.
test_run:cmd("restart server replica with args='true'")
diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result
index 9b2777872..d70ac8e2a 100644
--- a/test/replication/skip_conflict_row.result
+++ b/test/replication/skip_conflict_row.result
@@ -54,6 +54,9 @@ box.info.status
vclock = test_run:get_vclock('default')
---
...
+vclock[0] = nil
+---
+...
_ = test_run:wait_vclock("replica", vclock)
---
...
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
index 2982c730a..04fd08136 100644
--- a/test/replication/skip_conflict_row.test.lua
+++ b/test/replication/skip_conflict_row.test.lua
@@ -19,6 +19,7 @@ space:insert{2}
box.info.status
vclock = test_run:get_vclock('default')
+vclock[0] = nil
_ = test_run:wait_vclock("replica", vclock)
test_run:cmd("switch replica")
box.info.replication[1].upstream.message
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 90fd53ca6..0907ac17f 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -12,6 +12,7 @@
"on_schema_init.test.lua": {},
"long_row_timeout.test.lua": {},
"join_without_snap.test.lua": {},
+ "gh-4114-local-space-replication.test.lua": {},
"gh-4402-info-errno.test.lua": {},
"gh-4605-empty-password.test.lua": {},
"gh-4606-admin-creds.test.lua": {},
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index 2635da265..2bd701f70 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -1241,6 +1241,11 @@ test_run:cmd("switch default")
vclock = test_run:get_vclock("default")
---
...
+-- Ignore 0-th vclock component. They don't match between
+-- replicas.
+vclock[0] = nil
+---
+...
_ = test_run:wait_vclock("replica", vclock)
---
...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 4230cfae3..750d3bfe8 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -455,6 +455,10 @@ box.cfg{read_only = true}
test_run:cmd("switch default")
vclock = test_run:get_vclock("default")
+
+-- Ignore 0-th vclock component. They don't match between
+-- replicas.
+vclock[0] = nil
_ = test_run:wait_vclock("replica", vclock)
test_run:cmd("stop server replica")
--
2.21.1 (Apple Git-122.3)
* Re: [Tarantool-patches] [PATCH v2 0/5] replication: fix local space tracking
2020-03-18 19:47 [Tarantool-patches] [PATCH v2 0/5] replication: fix local space tracking Serge Petrenko
` (4 preceding siblings ...)
2020-03-18 19:47 ` [Tarantool-patches] [PATCH v2 5/5] box: start counting local space requests separately Serge Petrenko
@ 2020-03-18 21:12 ` Vladislav Shpilevoy
2020-03-19 8:17 ` Konstantin Osipov
6 siblings, 0 replies; 17+ messages in thread
From: Vladislav Shpilevoy @ 2020-03-18 21:12 UTC (permalink / raw)
To: Serge Petrenko, kostja.osipov, georgy; +Cc: tarantool-patches
Hi! Thanks for the patchset!
I will take a look after Kostja is OK with the changes, so as not to
do double work.
* Re: [Tarantool-patches] [PATCH v2 0/5] replication: fix local space tracking
2020-03-18 19:47 [Tarantool-patches] [PATCH v2 0/5] replication: fix local space tracking Serge Petrenko
` (5 preceding siblings ...)
2020-03-18 21:12 ` [Tarantool-patches] [PATCH v2 0/5] replication: fix local space tracking Vladislav Shpilevoy
@ 2020-03-19 8:17 ` Konstantin Osipov
6 siblings, 0 replies; 17+ messages in thread
From: Konstantin Osipov @ 2020-03-19 8:17 UTC (permalink / raw)
To: Serge Petrenko; +Cc: v.shpilevoy, tarantool-patches
* Serge Petrenko <sergepetrenko@tarantool.org> [20/03/18 22:52]:
> https://github.com/tarantool/tarantool/issues/4114
> https://github.com/tarantool/tarantool/tree/sp/gh-4114-local-space-replication
>
> The patchset contains 2 of Georgy's patches performing WAL gc rework.
> The first patch introduces a matrix clock structure, which contains the latest
> known vclocks of all the cluster members and can be used to build a vclock,
> which is less than or equal to all the member vclocks.
> Such a vclock is used then in the second patch, where gc is rewritten to stop
> tracking needed WAL files by vclock signature. Now replica vclocks are used to
> define which files are still needed.
One more follow-up. While I don't like the matrix clock approach, I
accept that there may be an issue with the current way the GC
subsystem tracks replicas.
Could you add an issue description to the changeset comment? What's
the problem with using the clock signature to keep track of relays?
And if there is an issue with the gc signature, let's switch to
vclocks instead, but why build a matrix?
--
Konstantin Osipov, Moscow, Russia