[tarantool-patches] [PATCH v3 1/4] relay: adjust gc state on relay status update
Georgy Kirichenko
georgy at tarantool.org
Wed Oct 9 19:45:43 MSK 2019
Don't use the on_close_log trigger to track xlog file boundaries. As we
intend to implement in-memory replication, a relay may no longer perform
xlog file operations and so can no longer rely on the trigger invocations
used before. Now the consumer state is advanced together with the relay
vclock. After the parallel applier implementation a relay no longer
receives an ACK packet for each transaction (because an applier groups
them), so it should not be too expensive to advance gc on each relay
vclock update.
Note: this changes cluster gc behavior - an instance's gc will hold
only its locally generated transactions. Also, this is only a
temporary solution until relay processing is merged with
the wal writer context, at which point wal will process relay ACK
requests as well as log writing and redundancy evaluation.
Part of #3794
---
src/box/gc.c | 7 +--
src/box/relay.cc | 108 +----------------------------------------------
2 files changed, 6 insertions(+), 109 deletions(-)
diff --git a/src/box/gc.c b/src/box/gc.c
index e0df92473..649932801 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -57,6 +57,7 @@
#include "engine.h" /* engine_collect_garbage() */
#include "wal.h" /* wal_collect_garbage() */
#include "checkpoint_schedule.h"
+#include "replication.h"
struct gc_state gc;
@@ -72,9 +73,9 @@ gc_checkpoint_fiber_f(va_list);
static inline int
gc_consumer_cmp(const struct gc_consumer *a, const struct gc_consumer *b)
{
- if (vclock_sum(&a->vclock) < vclock_sum(&b->vclock))
+ if (vclock_get(&a->vclock, instance_id) < vclock_get(&b->vclock, instance_id))
return -1;
- if (vclock_sum(&a->vclock) > vclock_sum(&b->vclock))
+ if (vclock_get(&a->vclock, instance_id) > vclock_get(&b->vclock, instance_id))
return 1;
if ((intptr_t)a < (intptr_t)b)
return -1;
@@ -562,7 +563,7 @@ gc_consumer_advance(struct gc_consumer *consumer, const struct vclock *vclock)
*/
struct gc_consumer *next = gc_tree_next(&gc.consumers, consumer);
bool update_tree = (next != NULL &&
- signature >= vclock_sum(&next->vclock));
+ vclock_get(vclock, instance_id) >= vclock_get(&next->vclock, instance_id));
if (update_tree)
gc_tree_remove(&gc.consumers, consumer);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a19abf6a9..21674119d 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -66,23 +66,6 @@ struct relay_status_msg {
struct vclock vclock;
};
-/**
- * Cbus message to update replica gc state in tx thread.
- */
-struct relay_gc_msg {
- /** Parent */
- struct cmsg msg;
- /**
- * Link in the list of pending gc messages,
- * see relay::pending_gc.
- */
- struct stailq_entry in_pending;
- /** Relay instance */
- struct relay *relay;
- /** Vclock to advance to */
- struct vclock vclock;
-};
-
/** State of a replication relay. */
struct relay {
/** The thread in which we relay data to the replica. */
@@ -123,11 +106,6 @@ struct relay {
struct cpipe relay_pipe;
/** Status message */
struct relay_status_msg status_msg;
- /**
- * List of garbage collection messages awaiting
- * confirmation from the replica.
- */
- struct stailq pending_gc;
/** Time when last row was sent to peer. */
double last_row_time;
/** Relay sync state. */
@@ -185,7 +163,6 @@ relay_new(struct replica *replica)
relay->last_row_time = ev_monotonic_now(loop());
fiber_cond_create(&relay->reader_cond);
diag_create(&relay->diag);
- stailq_create(&relay->pending_gc);
relay->state = RELAY_OFF;
return relay;
}
@@ -241,12 +218,6 @@ relay_exit(struct relay *relay)
static void
relay_stop(struct relay *relay)
{
- struct relay_gc_msg *gc_msg, *next_gc_msg;
- stailq_foreach_entry_safe(gc_msg, next_gc_msg,
- &relay->pending_gc, in_pending) {
- free(gc_msg);
- }
- stailq_create(&relay->pending_gc);
if (relay->r != NULL)
recovery_delete(relay->r);
relay->r = NULL;
@@ -370,6 +341,7 @@ tx_status_update(struct cmsg *msg)
{
struct relay_status_msg *status = (struct relay_status_msg *)msg;
vclock_copy(&status->relay->tx.vclock, &status->vclock);
+ gc_consumer_advance(status->relay->replica->gc, &status->vclock);
static const struct cmsg_hop route[] = {
{relay_status_update, NULL}
};
@@ -377,73 +349,6 @@ tx_status_update(struct cmsg *msg)
cpipe_push(&status->relay->relay_pipe, msg);
}
-/**
- * Update replica gc state in tx thread.
- */
-static void
-tx_gc_advance(struct cmsg *msg)
-{
- struct relay_gc_msg *m = (struct relay_gc_msg *)msg;
- gc_consumer_advance(m->relay->replica->gc, &m->vclock);
- free(m);
-}
-
-static void
-relay_on_close_log_f(struct trigger *trigger, void * /* event */)
-{
- static const struct cmsg_hop route[] = {
- {tx_gc_advance, NULL}
- };
- struct relay *relay = (struct relay *)trigger->data;
- struct relay_gc_msg *m = (struct relay_gc_msg *)malloc(sizeof(*m));
- if (m == NULL) {
- say_warn("failed to allocate relay gc message");
- return;
- }
- cmsg_init(&m->msg, route);
- m->relay = relay;
- vclock_copy(&m->vclock, &relay->r->vclock);
- /*
- * Do not invoke garbage collection until the replica
- * confirms that it has received data stored in the
- * sent xlog.
- */
- stailq_add_tail_entry(&relay->pending_gc, m, in_pending);
-}
-
-/**
- * Invoke pending garbage collection requests.
- *
- * This function schedules the most recent gc message whose
- * vclock is less than or equal to the given one. Older
- * messages are discarded as their job will be done by the
- * scheduled message anyway.
- */
-static inline void
-relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
-{
- struct relay_gc_msg *curr, *next, *gc_msg = NULL;
- stailq_foreach_entry_safe(curr, next, &relay->pending_gc, in_pending) {
- /*
- * We may delete a WAL file only if its vclock is
- * less than or equal to the vclock acknowledged by
- * the replica. Even if the replica's signature is
- * is greater, but the vclocks are incomparable, we
- * must not delete the WAL, because there may still
- * be rows not applied by the replica in it while
- * the greater signatures is due to changes pulled
- * from other members of the cluster.
- */
- if (vclock_compare(&curr->vclock, vclock) > 0)
- break;
- stailq_shift(&relay->pending_gc);
- free(gc_msg);
- gc_msg = curr;
- }
- if (gc_msg != NULL)
- cpipe_push(&relay->tx_pipe, &gc_msg->msg);
-}
-
static void
relay_set_error(struct relay *relay, struct error *e)
{
@@ -545,12 +450,6 @@ relay_subscribe_f(va_list ap)
cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
&relay->relay_pipe, NULL, NULL, cbus_process);
- /* Setup garbage collection trigger. */
- struct trigger on_close_log = {
- RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
- };
- trigger_add(&r->on_close_log, &on_close_log);
-
/* Setup WAL watcher for sending new rows to the replica. */
wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
relay_process_wal_event, cbus_process);
@@ -614,8 +513,6 @@ relay_subscribe_f(va_list ap)
vclock_copy(&relay->status_msg.vclock, send_vclock);
relay->status_msg.relay = relay;
cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
- /* Collect xlog files received by the replica. */
- relay_schedule_pending_gc(relay, send_vclock);
}
/*
@@ -627,8 +524,7 @@ relay_subscribe_f(va_list ap)
diag_log();
say_crit("exiting the relay loop");
- /* Clear garbage collector trigger and WAL watcher. */
- trigger_clear(&on_close_log);
+ /* Clear WAL watcher. */
wal_clear_watcher(&relay->wal_watcher, cbus_process);
/* Join ack reader fiber. */
--
2.23.0
More information about the Tarantool-patches
mailing list