[tarantool-patches] [PATCH 5/7] Replication: in memory replication
Georgy Kirichenko
georgy at tarantool.org
Tue Aug 13 09:27:43 MSK 2019
Relay uses both xlog files and WAL memory to feed a replica with
transaction data. The new workflow is to read xlog files up to the last
one and then switch to WAL memory. If the relay runs out of in-memory WAL
data, it returns to file mode. A heartbeat is sent from the WAL thread, and
only in in-memory mode, because in file mode there is always data to send.
Closes #3794
---
src/box/gc.c | 3 +
src/box/gc.h | 2 +
src/box/relay.cc | 161 +++++++++++++++++++++++-----------------------
src/box/wal.c | 154 +++++++++++++++++++++++++++++++++++++++++++-
src/box/wal.h | 6 ++
src/box/wal_mem.c | 10 +--
6 files changed, 249 insertions(+), 87 deletions(-)
diff --git a/src/box/gc.c b/src/box/gc.c
index 9771e407a..c36c2ace4 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -126,6 +126,8 @@ gc_init(const char *wal_dir_name)
xdir_scan(&gc.wal_dir);
gc.log_opened = false;
gc_tree_new(&gc.consumers);
+ xdir_create(&gc.xdir, wal_dir_name, XLOG, &INSTANCE_UUID,
+ &xlog_opts_default);
fiber_cond_create(&gc.cleanup_cond);
checkpoint_schedule_cfg(&gc.checkpoint_schedule, 0, 0);
@@ -166,6 +168,7 @@ gc_free(void)
gc_consumer_delete(consumer);
consumer = next;
}
+ xdir_destroy(&gc.xdir);
}
/**
diff --git a/src/box/gc.h b/src/box/gc.h
index 9b38a0c06..f28b716b5 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -125,6 +125,8 @@ struct gc_state {
/** True if log is opened. */
bool log_opened;
/** Registered consumers, linked by gc_consumer::node. */
+ /** xdir to track wal files. */
+ struct xdir xdir;
gc_tree_t consumers;
/** Fiber responsible for periodic checkpointing. */
struct fiber *checkpoint_fiber;
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a1b841291..05fc0f691 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -84,8 +84,6 @@ struct relay {
struct replica *replica;
/** WAL event watcher. */
struct wal_watcher wal_watcher;
- /** Relay reader cond. */
- struct fiber_cond reader_cond;
/** Relay diagnostics. */
struct diag diag;
/** Vclock recieved from replica. */
@@ -116,6 +114,9 @@ struct relay {
/** Relay sync state. */
enum relay_state state;
+ struct vclock relay_vclock;
+ char *wal_dir;
+
struct {
/* Align to prevent false-sharing with tx thread */
alignas(CACHELINE_SIZE)
@@ -165,7 +166,6 @@ relay_new(struct replica *replica)
return NULL;
}
relay->replica = replica;
- fiber_cond_create(&relay->reader_cond);
diag_create(&relay->diag);
stailq_create(&relay->pending_gc);
relay->state = RELAY_OFF;
@@ -215,7 +215,8 @@ relay_exit(struct relay *relay)
* cursor, which must be closed in the same thread
* that opened it (it uses cord's slab allocator).
*/
- recovery_delete(relay->r);
+ if (relay->r != NULL)
+ recovery_delete(relay->r);
relay->r = NULL;
}
@@ -239,7 +240,6 @@ relay_delete(struct relay *relay)
{
if (relay->state == RELAY_FOLLOW)
relay_stop(relay);
- fiber_cond_destroy(&relay->reader_cond);
diag_destroy(&relay->diag);
TRASH(relay);
free(relay);
@@ -361,31 +361,6 @@ relay_set_error(struct relay *relay, struct error *e)
diag_add_error(&relay->diag, e);
}
-static void
-relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
-{
- struct relay *relay = container_of(watcher, struct relay, wal_watcher);
- if (fiber_is_cancelled()) {
- /*
- * The relay is exiting. Rescanning the WAL at this
- * point would be pointless and even dangerous,
- * because the relay could have written a packet
- * fragment to the socket before being cancelled
- * so that writing another row to the socket would
- * lead to corrupted replication stream and, as
- * a result, permanent replication breakdown.
- */
- return;
- }
- try {
- recover_remaining_wals(relay->r, &relay->stream, NULL,
- (events & WAL_EVENT_ROTATE) != 0);
- } catch (Exception *e) {
- relay_set_error(relay, e);
- fiber_cancel(fiber());
- }
-}
-
/*
* Relay reader fiber function.
* Read xrow encoded vclocks sent by the replica.
@@ -408,7 +383,24 @@ relay_reader_f(va_list ap)
/* vclock is followed while decoding, zeroing it. */
vclock_create(&relay->recv_vclock);
xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
- fiber_cond_signal(&relay->reader_cond);
+ if (relay->status_msg.msg.route != NULL)
+ continue;
+ struct vclock *send_vclock;
+ if (relay->version_id < version_id(1, 7, 4))
+ send_vclock = &relay->r->vclock;
+ else
+ send_vclock = &relay->recv_vclock;
+ if (vclock_sum(&relay->status_msg.vclock) ==
+ vclock_sum(send_vclock))
+ continue;
+ static const struct cmsg_hop route[] = {
+ {tx_status_update, NULL}
+ };
+ cmsg_init(&relay->status_msg.msg, route);
+ vclock_copy(&relay->status_msg.vclock,
+ send_vclock);
+ relay->status_msg.relay = relay;
+ cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
}
} catch (Exception *e) {
relay_set_error(relay, e);
@@ -434,6 +426,27 @@ relay_send_heartbeat(struct relay *relay)
}
}
+/**
+ * Row callback passed to wal_relay(): forward one row to the replica
+ * through the relay stream.
+ *
+ * relay_send_row() reports failures by throwing, so the exception is
+ * caught here and converted into a plain status code.
+ *
+ * @param row  row to send.
+ * @param data the struct relay whose stream the row is written to.
+ * @retval 0  relay_send_row() returned without throwing.
+ * @retval -1 relay_send_row() threw an Exception.
+ */
+static int
+relay_send_cb(struct xrow_header *row, void *data)
+{
+	struct relay *relay = (struct relay *)data;
+	try {
+		relay_send_row(&relay->stream, row);
+	} catch (Exception *e) {
+		return -1;
+	}
+	return 0;
+}
+
+/**
+ * Event callback installed on the relay cbus endpoint: runs
+ * cbus_process() on the endpoint stored in the watcher's data pointer.
+ * The loop and events arguments are not used.
+ */
+static void
+relay_endpoint_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
+{
+	(void) events;
+	(void) loop;
+	cbus_process((struct cbus_endpoint *)watcher->data);
+}
+
/**
* A libev callback invoked when a relay client socket is ready
* for read. This currently only happens when the client closes
@@ -443,21 +456,16 @@ static int
relay_subscribe_f(va_list ap)
{
struct relay *relay = va_arg(ap, struct relay *);
- struct recovery *r = relay->r;
coio_enable();
relay_set_cord_name(relay->io.fd);
/* Create cpipe to tx for propagating vclock. */
cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay),
- fiber_schedule_cb, fiber());
+ relay_endpoint_cb, &relay->endpoint);
cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
&relay->relay_pipe, NULL, NULL, cbus_process);
- /* Setup WAL watcher for sending new rows to the replica. */
- wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
- relay_process_wal_event, cbus_process);
-
/* Start fiber for receiving replica acks. */
char name[FIBER_NAME_MAX];
snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
@@ -473,50 +481,39 @@ relay_subscribe_f(va_list ap)
*/
relay_send_heartbeat(relay);
- /*
- * Run the event loop until the connection is broken
- * or an error occurs.
- */
while (!fiber_is_cancelled()) {
- double timeout = replication_timeout;
- struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
- ERRINJ_DOUBLE);
- if (inj != NULL && inj->dparam != 0)
- timeout = inj->dparam;
-
- fiber_cond_wait_deadline(&relay->reader_cond,
- relay->last_row_time + timeout);
-
/*
- * The fiber can be woken by IO cancel, by a timeout of
- * status messaging or by an acknowledge to status message.
- * Handle cbus messages first.
+ * Start relaying data. First we recover all existing wal
+ * files then try to continue data streaming from wal memory.
+ *
+ * NOTE: it looks more efficient if we try to relay from wal
+ * first but it breaks tests behavior working around collected
+ * logs. As it has only little impact we could fix it in
+ * future.
*/
- cbus_process(&relay->endpoint);
- /* Check for a heartbeat timeout. */
- if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
- relay_send_heartbeat(relay);
+ try {
+ relay->r = recovery_new(relay->wal_dir, false,
+ &relay->relay_vclock);
+ auto relay_guard = make_scoped_guard([&] {
+ recovery_delete(relay->r);
+ relay->r = NULL;
+ });
+ recover_remaining_wals(relay->r, &relay->stream,
+ NULL, true);
+ } catch (Exception *e) {
+ e->log();
+ relay_set_error(relay, e);
+ break;
+ }
/*
- * Check that the vclock has been updated and the previous
- * status message is delivered
+ * All known 'file' data were sent it is high time to
+ * switch to wal memory relaying.
*/
- if (relay->status_msg.msg.route != NULL)
- continue;
- struct vclock *send_vclock;
- if (relay->version_id < version_id(1, 7, 4))
- send_vclock = &r->vclock;
- else
- send_vclock = &relay->recv_vclock;
- if (vclock_sum(&relay->status_msg.vclock) ==
- vclock_sum(send_vclock))
- continue;
- static const struct cmsg_hop route[] = {
- {tx_status_update, NULL}
+ if (wal_relay(&relay->relay_vclock, relay_send_cb, relay,
+ tt_sprintf("relay_%p", relay)) != 0) {
+ relay_set_error(relay, diag_last_error(&fiber()->diag));
+ break;
};
- cmsg_init(&relay->status_msg.msg, route);
- vclock_copy(&relay->status_msg.vclock, send_vclock);
- relay->status_msg.relay = relay;
- cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
}
/*
@@ -528,9 +525,6 @@ relay_subscribe_f(va_list ap)
diag_log();
say_crit("exiting the relay loop");
- /* Clear WAL watcher. */
- wal_clear_watcher(&relay->wal_watcher, cbus_process);
-
/* Join ack reader fiber. */
fiber_cancel(reader);
fiber_join(reader);
@@ -557,6 +551,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
* unless it has already been registered by initial
* join.
*/
+ vclock_copy(&relay->relay_vclock, replica_clock);
if (replica->gc == NULL) {
replica->gc = gc_consumer_register(replica_clock, "replica %s",
tt_uuid_str(&replica->uuid));
@@ -571,15 +566,16 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
});
vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
- relay->r = recovery_new(cfg_gets("wal_dir"), false,
- replica_clock);
+ relay->wal_dir = strdup(cfg_gets("wal_dir"));
vclock_copy(&relay->tx.vclock, replica_clock);
relay->version_id = replica_version_id;
+ relay->r = NULL;
int rc = cord_costart(&relay->cord, "subscribe",
relay_subscribe_f, relay);
if (rc == 0)
rc = cord_cojoin(&relay->cord);
+ free(relay->wal_dir);
if (rc != 0)
diag_raise();
}
@@ -616,7 +612,10 @@ static void
relay_send_row(struct xstream *stream, struct xrow_header *packet)
{
struct relay *relay = container_of(stream, struct relay, stream);
- assert(iproto_type_is_dml(packet->type));
+ if (packet->type != IPROTO_OK) {
+ assert(iproto_type_is_dml(packet->type));
+ vclock_follow_xrow(&relay->relay_vclock, packet);
+ }
/*
* Transform replica local requests to IPROTO_NOP so as to
* promote vclock on the replica without actually modifying
diff --git a/src/box/wal.c b/src/box/wal.c
index 6cdb0db15..0457f3d46 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -159,6 +159,8 @@ struct wal_writer
struct rlist watchers;
/** Wal memory buffer. */
struct wal_mem wal_mem;
+ /** Wal memory condition. */
+ struct fiber_cond wal_mem_cond;
};
enum wal_msg_type {
@@ -1065,6 +1067,7 @@ wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
writer->checkpoint_wal_size += rc;
last_committed = stailq_last(&wal_msg->commit);
vclock_merge(&writer->vclock, &vclock_diff);
+ fiber_cond_broadcast(&writer->wal_mem_cond);
/*
* Notify TX if the checkpoint threshold has been exceeded.
@@ -1163,7 +1166,9 @@ wal_cord_f(va_list ap)
{
(void) ap;
struct wal_writer *writer = &wal_writer_singleton;
+ fiber_cond_create(&writer->wal_mem_cond);
wal_mem_create(&writer->wal_mem);
+ wal_mem_svp(&writer->wal_mem, &writer->vclock);
/** Initialize eio in this thread */
coio_enable();
@@ -1457,7 +1462,8 @@ wal_set_watcher(struct wal_watcher *watcher, const char *name,
{ wal_watcher_notify_perform, &watcher->wal_pipe };
watcher->route[1] = (struct cmsg_hop)
{ wal_watcher_notify_complete, NULL };
- cbus_pair("wal", name, &watcher->wal_pipe, &watcher->watcher_pipe,
+
+ cbus_pair("wal", name, &watcher->wal_pipe, &watcher->watcher_pipe,
wal_watcher_attach, watcher, process_cb);
}
@@ -1494,3 +1500,149 @@ wal_atfork()
if (xlog_is_open(&vy_log_writer.xlog))
xlog_atfork(&vy_log_writer.xlog);
}
+
+/**
+ * State of a single wal_relay() call. The object is created on the
+ * stack of wal_relay() and handed to the fiber started by
+ * wal_relay_attach().
+ */
+struct wal_relay_msg {
+ /** Message pushed through relay_pipe when wal_relay_f() finishes. */
+ struct cmsg base;
+ /** Pipe used by wal_relay() to push cancel_msg to the "wal" endpoint. */
+ struct cpipe wal_pipe;
+ /** Pipe used by wal_relay_f() to push the done message back. */
+ struct cpipe relay_pipe;
+
+ /** Vclock the in-memory cursor is created from. */
+ struct vclock *vclock;
+ /** Callback invoked for every row (and heartbeat) to be sent. */
+ wal_relay_cb on_wal_relay;
+ /** Opaque argument passed to on_wal_relay. */
+ void *cb_data;
+ /** Fiber running wal_relay_f(); reset to NULL when it finishes. */
+ struct fiber *fiber;
+ /** Message asking to cancel the fiber; route == NULL until it is sent. */
+ struct cmsg cancel_msg;
+ /** Signalled by wal_relay_done() to wake up wal_relay(). */
+ struct fiber_cond done_cond;
+ /** Set to true by wal_relay_done(). */
+ bool done;
+ /** NOTE(review): not referenced by the code in this patch. */
+ int rc;
+ /** Error taken from the relay fiber when on_wal_relay fails. */
+ struct diag diag;
+};
+
+/**
+ * Handler of the "done" message pushed by wal_relay_f(): flag the
+ * request as finished and signal done_cond, which wal_relay() is
+ * waiting on.
+ */
+static void
+wal_relay_done(struct cmsg *base)
+{
+	struct wal_relay_msg *relay_msg;
+	relay_msg = container_of(base, struct wal_relay_msg, base);
+	relay_msg->done = true;
+	fiber_cond_signal(&relay_msg->done_cond);
+}
+
+/**
+ * Body of the fiber started by wal_relay_attach(): stream rows from
+ * WAL memory to the on_wal_relay callback.
+ *
+ * The fiber creates an in-memory cursor at msg->vclock and then loops:
+ *  - a row whose lsn is already covered by the vclock is skipped;
+ *  - a new row is passed to the callback;
+ *  - when there are no more rows it waits on wal_mem_cond and, once
+ *    the replication timeout has passed since the last sent row,
+ *    passes a heartbeat row to the callback instead.
+ *
+ * The loop ends when the fiber is cancelled, the cursor cannot be
+ * created or becomes outdated, or the callback fails. In the last
+ * case the fiber's diag is moved to msg->diag. In every case a "done"
+ * message is pushed through msg->relay_pipe.
+ *
+ * @param ap holds a single struct wal_relay_msg pointer.
+ * @retval 0 always.
+ */
+static int
+wal_relay_f(va_list ap)
+{
+	/*
+	 * Declared first so that the early "goto done" below does not
+	 * jump over the declaration.
+	 */
+	static struct cmsg_hop done_route[] = {
+		{wal_relay_done, NULL}
+	};
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct wal_relay_msg *msg = va_arg(ap, struct wal_relay_msg *);
+	struct vclock *vclock = msg->vclock;
+	wal_relay_cb on_wal_relay = msg->on_wal_relay;
+	void *cb_data = msg->cb_data;
+
+	double last_row_time = ev_monotonic_now(loop());
+
+	struct wal_mem_cursor cursor;
+	if (wal_mem_cursor_create(&writer->wal_mem, &cursor, vclock) != 0)
+		goto done;
+	while (!fiber_is_cancelled()) {
+		/*
+		 * The heartbeat row lives at loop scope: 'row' may point
+		 * to it when the callback is invoked at the bottom of the
+		 * iteration, so it must not be declared in a nested block
+		 * that has already been left by then.
+		 */
+		struct xrow_header heartbeat;
+		struct xrow_header *row;
+		void *data;
+		size_t size;
+		int rc = wal_mem_cursor_next(&writer->wal_mem, &cursor,
+					     &row, &data, &size);
+		if (rc < 0) {
+			/* Outdated cursor. */
+			break;
+		}
+		/* The row is already covered by the vclock: skip it. */
+		if (rc == 0 && vclock_get(vclock, row->replica_id) >= row->lsn)
+			continue;
+		if (rc > 0) {
+			double timeout = replication_timeout;
+			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
+						    ERRINJ_DOUBLE);
+			if (inj != NULL && inj->dparam != 0)
+				timeout = inj->dparam;
+
+			/*
+			 * Nothing to send, so wait for the next row and
+			 * send a heartbeat if the timeout is exceeded.
+			 */
+			fiber_cond_wait_deadline(&writer->wal_mem_cond,
+						 last_row_time + timeout);
+			if (fiber_is_cancelled())
+				break;
+			if (ev_monotonic_now(loop()) - last_row_time <=
+			    timeout)
+				continue;
+			xrow_encode_timestamp(&heartbeat, instance_id,
+					      ev_now(loop()));
+			row = &heartbeat;
+		}
+		last_row_time = ev_monotonic_now(loop());
+		if (on_wal_relay(row, cb_data) != 0) {
+			diag_move(&fiber()->diag, &msg->diag);
+			break;
+		}
+	}
+done:
+	/*
+	 * Clear the fiber pointer before handing the message back: msg
+	 * lives on the stack of wal_relay(), which returns once the done
+	 * message is processed, so msg must not be written afterwards.
+	 */
+	msg->fiber = NULL;
+	cmsg_init(&msg->base, done_route);
+	cpipe_push(&msg->relay_pipe, &msg->base);
+	return 0;
+}
+
+/**
+ * Pairing callback passed to cbus_pair() by wal_relay(): create and
+ * start the fiber that runs wal_relay_f() for the given request.
+ *
+ * @param data the struct wal_relay_msg describing the request.
+ */
+static void
+wal_relay_attach(void *data)
+{
+ struct wal_relay_msg *msg = (struct wal_relay_msg *)data;
+ /*
+  * msg->fiber must be assigned before fiber_start(): wal_relay_f()
+  * resets it to NULL when it finishes, which may already happen
+  * inside fiber_start().
+  * NOTE(review): the result of fiber_new() is not checked before
+  * it is passed to fiber_start() - confirm it cannot be NULL here.
+  */
+ msg->fiber = fiber_new("wal relay fiber", wal_relay_f);
+ fiber_start(msg->fiber, msg);
+}
+
+/**
+ * Handler of the cancel message pushed by wal_relay(): cancel the
+ * relay fiber unless it has already finished and cleared its pointer.
+ */
+static void
+wal_relay_cancel(struct cmsg *base)
+{
+	struct wal_relay_msg *relay_msg =
+		container_of(base, struct wal_relay_msg, cancel_msg);
+	if (relay_msg->fiber == NULL)
+		return;
+	fiber_cancel(relay_msg->fiber);
+}
+
+/**
+ * Relay rows from WAL memory, starting at the given vclock, to the
+ * on_wal_relay callback.
+ *
+ * Pairs the caller's cbus endpoint with the "wal" endpoint, which
+ * starts a fiber running wal_relay_f(), and blocks until that fiber
+ * reports completion. If the calling fiber is cancelled while
+ * waiting, a cancel message is sent to the relay fiber once and the
+ * wait continues until the fiber is done.
+ *
+ * @param vclock        position to start relaying from.
+ * @param on_wal_relay  callback invoked for each row to send.
+ * @param cb_data       opaque argument passed to the callback.
+ * @param endpoint_name name of the caller's cbus endpoint.
+ * @retval 0  relaying stopped without an error recorded in the
+ *            request's diag.
+ * @retval -1 the callback failed; the error is moved to the diag of
+ *            the calling fiber.
+ */
+int
+wal_relay(struct vclock *vclock, wal_relay_cb on_wal_relay, void *cb_data,
+	  const char *endpoint_name)
+{
+	static struct cmsg_hop cancel_route[] = {
+		{wal_relay_cancel, NULL}
+	};
+	struct wal_relay_msg wal_relay_msg;
+	wal_relay_msg.vclock = vclock;
+	wal_relay_msg.on_wal_relay = on_wal_relay;
+	wal_relay_msg.cb_data = cb_data;
+	/* Do not leave stack garbage in fields other code may read. */
+	wal_relay_msg.fiber = NULL;
+	wal_relay_msg.rc = 0;
+	diag_create(&wal_relay_msg.diag);
+	/* route == NULL means the cancel message has not been sent yet. */
+	wal_relay_msg.cancel_msg.route = NULL;
+
+	fiber_cond_create(&wal_relay_msg.done_cond);
+	wal_relay_msg.done = false;
+
+	cbus_pair("wal", endpoint_name, &wal_relay_msg.wal_pipe,
+		  &wal_relay_msg.relay_pipe,
+		  wal_relay_attach, &wal_relay_msg, cbus_process);
+
+	while (!wal_relay_msg.done) {
+		if (fiber_is_cancelled() &&
+		    wal_relay_msg.cancel_msg.route == NULL) {
+			/* Ask the relay fiber to stop, only once. */
+			cmsg_init(&wal_relay_msg.cancel_msg, cancel_route);
+			cpipe_push(&wal_relay_msg.wal_pipe,
+				   &wal_relay_msg.cancel_msg);
+		}
+		fiber_cond_wait(&wal_relay_msg.done_cond);
+	}
+
+	cbus_unpair(&wal_relay_msg.wal_pipe, &wal_relay_msg.relay_pipe,
+		    NULL, NULL, cbus_process);
+	/* Pair the fiber_cond_create() above. */
+	fiber_cond_destroy(&wal_relay_msg.done_cond);
+	if (!diag_is_empty(&wal_relay_msg.diag)) {
+		diag_move(&wal_relay_msg.diag, &fiber()->diag);
+		return -1;
+	}
+	return 0;
+}
diff --git a/src/box/wal.h b/src/box/wal.h
index 1a7156d97..bd298cebe 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -235,6 +235,12 @@ wal_write_vy_log(struct journal_entry *req);
void
wal_rotate_vy_log();
+/**
+ * Callback invoked by wal_relay() for every row to be sent, including
+ * heartbeat rows. It is called from the fiber started on the "wal"
+ * side of the cbus pairing.
+ *
+ * @param header row to send.
+ * @param data   the cb_data argument passed to wal_relay().
+ * @retval 0 to continue relaying; any other value stops it.
+ */
+typedef int (*wal_relay_cb)(struct xrow_header *header, void *data);
+
+/**
+ * Relay rows from WAL memory, starting at the given vclock, calling
+ * on_wal_relay for each row. Blocks until relaying stops.
+ *
+ * @param vclock        position to start relaying from.
+ * @param on_wal_relay  row callback.
+ * @param cb_data       opaque argument passed to the callback.
+ * @param endpoint_name name of the caller's cbus endpoint to pair
+ *                      with the "wal" endpoint.
+ * @retval 0  relaying stopped without a recorded error.
+ * @retval -1 the callback failed; the error is set in the diag of
+ *            the calling fiber.
+ */
+int
+wal_relay(struct vclock *vclock, wal_relay_cb on_wal_relay, void *cb_data,
+ const char *endpoint_name);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c
index fdfc6f93d..b01604d55 100644
--- a/src/box/wal_mem.c
+++ b/src/box/wal_mem.c
@@ -247,14 +247,13 @@ wal_mem_cursor_next(struct wal_mem *wal_mem,
}
struct wal_mem_buf *mem_buf;
- size_t last_row_index;
next_buffer:
mem_buf = wal_mem->buf +
- wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
- last_row_index = ibuf_used(&mem_buf->rows) /
- sizeof(struct wal_mem_buf_row);
- if (last_row_index == wal_mem_cursor->row_index) {
+ wal_mem_cursor->buf_index % WAL_MEM_BUF_COUNT;
+ size_t buf_row_count = ibuf_used(&mem_buf->rows) /
+ sizeof(struct wal_mem_buf_row);
+ if (buf_row_count == wal_mem_cursor->row_index) {
/* No more rows in the current buffer. */
if (wal_mem->last_buf_index == wal_mem_cursor->buf_index)
/* No more rows in the memory. */
@@ -269,5 +268,6 @@ next_buffer:
*row = &buf_row->xrow;
*data = buf_row->data;
*size = buf_row->size;
+ ++wal_mem_cursor->row_index;
return 0;
}
--
2.22.0
More information about the Tarantool-patches
mailing list