[tarantool-patches] [PATCH 5/7] Replication: in memory replication
Vladimir Davydov
vdavydov.dev at gmail.com
Wed Aug 21 16:52:02 MSK 2019
On Tue, Aug 13, 2019 at 09:27:43AM +0300, Georgy Kirichenko wrote:
> diff --git a/src/box/gc.h b/src/box/gc.h
> index 9b38a0c06..f28b716b5 100644
> --- a/src/box/gc.h
> +++ b/src/box/gc.h
> @@ -125,6 +125,8 @@ struct gc_state {
> /** True if log is opened. */
> bool log_opened;
> /** Registered consumers, linked by gc_consumer::node. */
> + /** xdir to track wal files. */
> + struct xdir xdir;
AFAICS this xdir is only created and destroyed - it isn't used anywhere.
> gc_tree_t consumers;
> /** Fiber responsible for periodic checkpointing. */
> struct fiber *checkpoint_fiber;
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index a1b841291..05fc0f691 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -116,6 +114,9 @@ struct relay {
> /** Relay sync state. */
> enum relay_state state;
>
> + struct vclock relay_vclock;
> + char *wal_dir;
> +
Comments, please. It would also be nice to update the comment on relay->r
to clarify when it can be NULL.
> struct {
> /* Align to prevent false-sharing with tx thread */
> alignas(CACHELINE_SIZE)
> @@ -434,6 +426,27 @@ relay_send_heartbeat(struct relay *relay)
> }
> }
>
> +static int
> +relay_send_cb(struct xrow_header *row, void *data)
> +{
> + try {
> + struct relay *relay = (struct relay *)data;
> + relay_send_row(&relay->stream, row);
> + return 0;
> + } catch (Exception *e) {
> + return -1;
> + }
> +}
> +
> +static void
> +relay_endpoint_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
> +{
> + (void) loop;
> + (void) events;
> + struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data;
> + cbus_process(endpoint);
> +}
I guess we could define this helper somewhere in cbus internals, just
like fiber_schedule_cb.
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 6cdb0db15..0457f3d46 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -1163,7 +1166,9 @@ wal_cord_f(va_list ap)
> {
> (void) ap;
> struct wal_writer *writer = &wal_writer_singleton;
> + fiber_cond_create(&writer->wal_mem_cond);
> wal_mem_create(&writer->wal_mem);
> + wal_mem_svp(&writer->wal_mem, &writer->vclock);
Looks like this belongs to the previous patch.
>
> /** Initialize eio in this thread */
> coio_enable();
> @@ -1494,3 +1500,149 @@ wal_atfork()
> if (xlog_is_open(&vy_log_writer.xlog))
> xlog_atfork(&vy_log_writer.xlog);
> }
> +
> +struct wal_relay_msg {
> + struct cmsg base;
> + struct cpipe wal_pipe;
> + struct cpipe relay_pipe;
> +
> + struct vclock *vclock;
> + wal_relay_cb on_wal_relay;
> + void *cb_data;
> + struct fiber *fiber;
> + struct cmsg cancel_msg;
> + struct fiber_cond done_cond;
> + bool done;
> + int rc;
> + struct diag diag;
> +};
Comments, comments...
> +
> +static void
> +wal_relay_done(struct cmsg *base)
> +{
> + struct wal_relay_msg *msg =
> + container_of(base, struct wal_relay_msg, base);
> + msg->done = true;
> + fiber_cond_signal(&msg->done_cond);
> +}
> +
> +static int
> +wal_relay_f(va_list ap)
> +{
> + struct wal_writer *writer = &wal_writer_singleton;
> + struct wal_relay_msg *msg = va_arg(ap, struct wal_relay_msg *);
> + struct vclock *vclock = msg->vclock;
> + wal_relay_cb on_wal_relay = msg->on_wal_relay;
> + void *cb_data = msg->cb_data;
> +
> + double last_row_time = ev_monotonic_now(loop());
> +
> + struct wal_mem_cursor cursor;
> + if (wal_mem_cursor_create(&writer->wal_mem, &cursor, vclock) != 0)
> + goto done;
> + while (!fiber_is_cancelled()) {
> + struct xrow_header *row;
> + void *data;
> + size_t size;
> + int rc = wal_mem_cursor_next(&writer->wal_mem, &cursor,
> + &row, &data, &size);
> + if (rc < 0) {
> + /* Outdated cursor. */
> + break;
> + }
> + if (rc == 0 && vclock_get(vclock, row->replica_id) >= row->lsn)
> + continue;
> + if (rc > 0) {
> + double timeout = replication_timeout;
> + struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
> + ERRINJ_DOUBLE);
> + if (inj != NULL && inj->dparam != 0)
> + timeout = inj->dparam;
> +
> + /*
> + * Nothing to send so wait for the next row
> + * and send a hearth beat if timeout exceeded.
> + */
> + fiber_cond_wait_deadline(&writer->wal_mem_cond,
> + last_row_time + timeout);
> + if (fiber_is_cancelled())
> + break;
> + if (ev_monotonic_now(loop()) - last_row_time >
> + timeout) {
> + struct xrow_header hearth_beat;
> + xrow_encode_timestamp(&hearth_beat, instance_id,
s/hearth/heart
> + ev_now(loop()));
> + row = &hearth_beat;
> + } else
> + continue;
> + }
> + last_row_time = ev_monotonic_now(loop());
> + if (on_wal_relay(row, cb_data) != 0) {
> + diag_move(&fiber()->diag, &msg->diag);
> + break;
> + }
> + }
> + static struct cmsg_hop done_route[] = {
> + {wal_relay_done, NULL}
> + };
> +done:
> + cmsg_init(&msg->base, done_route);
> + cpipe_push(&msg->relay_pipe, &msg->base);
> + msg->fiber = NULL;
> + return 0;
> +}
> diff --git a/src/box/wal.h b/src/box/wal.h
> index 1a7156d97..bd298cebe 100644
> --- a/src/box/wal.h
> +++ b/src/box/wal.h
> @@ -235,6 +235,12 @@ wal_write_vy_log(struct journal_entry *req);
> void
> wal_rotate_vy_log();
>
> +typedef int (*wal_relay_cb)(struct xrow_header *header, void *data);
> +
> +int
> +wal_relay(struct vclock *vclock, wal_relay_cb on_wal_relay, void *cb_data,
> + const char *endpoint_name);
Comments, comments...
> +
> #if defined(__cplusplus)
> } /* extern "C" */
> #endif /* defined(__cplusplus) */
> diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c
> index fdfc6f93d..b01604d55 100644
> --- a/src/box/wal_mem.c
> +++ b/src/box/wal_mem.c
> @@ -247,14 +247,13 @@ wal_mem_cursor_next(struct wal_mem *wal_mem,
> }
>
> struct wal_mem_buf *mem_buf;
> - size_t last_row_index;
>
> next_buffer:
> mem_buf = wal_mem->buf +
> - wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
> - last_row_index = ibuf_used(&mem_buf->rows) /
> - sizeof(struct wal_mem_buf_row);
> - if (last_row_index == wal_mem_cursor->row_index) {
> + wal_mem_cursor->buf_index % WAL_MEM_BUF_COUNT;
> + size_t buf_row_count = ibuf_used(&mem_buf->rows) /
> + sizeof(struct wal_mem_buf_row);
> + if (buf_row_count == wal_mem_cursor->row_index) {
> /* No more rows in the current buffer. */
> if (wal_mem->last_buf_index == wal_mem_cursor->buf_index)
> /* No more rows in the memory. */
> @@ -269,5 +268,6 @@ next_buffer:
> *row = &buf_row->xrow;
> *data = buf_row->data;
> *size = buf_row->size;
> + ++wal_mem_cursor->row_index;
> return 0;
> }
Again, this seems to belong to the previous patch.
More information about the Tarantool-patches
mailing list