From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Date: Wed, 21 Aug 2019 16:52:02 +0300 From: Vladimir Davydov Subject: Re: [tarantool-patches] [PATCH 5/7] Replication: in memory replication Message-ID: <20190821135202.GC13834@esperanza> References: MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: To: Georgy Kirichenko Cc: tarantool-patches@freelists.org List-ID: On Tue, Aug 13, 2019 at 09:27:43AM +0300, Georgy Kirichenko wrote: > diff --git a/src/box/gc.h b/src/box/gc.h > index 9b38a0c06..f28b716b5 100644 > --- a/src/box/gc.h > +++ b/src/box/gc.h > @@ -125,6 +125,8 @@ struct gc_state { > /** True if log is opened. */ > bool log_opened; > /** Registered consumers, linked by gc_consumer::node. */ > + /** xdir to track wal files. */ > + struct xdir xdir; AFAICS this xdir is only created and destroyed - it isn't used anywhere. > gc_tree_t consumers; > /** Fiber responsible for periodic checkpointing. */ > struct fiber *checkpoint_fiber; > diff --git a/src/box/relay.cc b/src/box/relay.cc > index a1b841291..05fc0f691 100644 > --- a/src/box/relay.cc > +++ b/src/box/relay.cc > @@ -116,6 +114,9 @@ struct relay { > /** Relay sync state. */ > enum relay_state state; > > + struct vclock relay_vclock; > + char *wal_dir; > + Comments, please. It would also be nice to update a comment to relay->r to clarify when it can be NULL. 
> struct { > /* Align to prevent false-sharing with tx thread */ > alignas(CACHELINE_SIZE) > @@ -434,6 +426,27 @@ relay_send_heartbeat(struct relay *relay) > } > } > > +static int > +relay_send_cb(struct xrow_header *row, void *data) > +{ > + try { > + struct relay *relay = (struct relay *)data; > + relay_send_row(&relay->stream, row); > + return 0; > + } catch (Exception *e) { > + return -1; > + } > +} > + > +static void > +relay_endpoint_cb(struct ev_loop *loop, ev_watcher *watcher, int events) > +{ > + (void) loop; > + (void) events; > + struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data; > + cbus_process(endpoint); > +} I guess we could define this helper somewhere in cbus internals, just like fiber_schedule_cb. > diff --git a/src/box/wal.c b/src/box/wal.c > index 6cdb0db15..0457f3d46 100644 > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -1163,7 +1166,9 @@ wal_cord_f(va_list ap) > { > (void) ap; > struct wal_writer *writer = &wal_writer_singleton; > + fiber_cond_create(&writer->wal_mem_cond); > wal_mem_create(&writer->wal_mem); > + wal_mem_svp(&writer->wal_mem, &writer->vclock); Looks like this belongs to the previous patch. > > /** Initialize eio in this thread */ > coio_enable(); > @@ -1494,3 +1500,149 @@ wal_atfork() > if (xlog_is_open(&vy_log_writer.xlog)) > xlog_atfork(&vy_log_writer.xlog); > } > + > +struct wal_relay_msg { > + struct cmsg base; > + struct cpipe wal_pipe; > + struct cpipe relay_pipe; > + > + struct vclock *vclock; > + wal_relay_cb on_wal_relay; > + void *cb_data; > + struct fiber *fiber; > + struct cmsg cancel_msg; > + struct fiber_cond done_cond; > + bool done; > + int rc; > + struct diag diag; > +}; Comments, comments... 
> + > +static void > +wal_relay_done(struct cmsg *base) > +{ > + struct wal_relay_msg *msg = > + container_of(base, struct wal_relay_msg, base); > + msg->done = true; > + fiber_cond_signal(&msg->done_cond); > +} > + > +static int > +wal_relay_f(va_list ap) > +{ > + struct wal_writer *writer = &wal_writer_singleton; > + struct wal_relay_msg *msg = va_arg(ap, struct wal_relay_msg *); > + struct vclock *vclock = msg->vclock; > + wal_relay_cb on_wal_relay = msg->on_wal_relay; > + void *cb_data = msg->cb_data; > + > + double last_row_time = ev_monotonic_now(loop()); > + > + struct wal_mem_cursor cursor; > + if (wal_mem_cursor_create(&writer->wal_mem, &cursor, vclock) != 0) > + goto done; > + while (!fiber_is_cancelled()) { > + struct xrow_header *row; > + void *data; > + size_t size; > + int rc = wal_mem_cursor_next(&writer->wal_mem, &cursor, > + &row, &data, &size); > + if (rc < 0) { > + /* Outdated cursor. */ > + break; > + } > + if (rc == 0 && vclock_get(vclock, row->replica_id) >= row->lsn) > + continue; > + if (rc > 0) { > + double timeout = replication_timeout; > + struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL, > + ERRINJ_DOUBLE); > + if (inj != NULL && inj->dparam != 0) > + timeout = inj->dparam; > + > + /* > + * Nothing to send so wait for the next row > + * and send a hearth beat if timeout exceeded. 
> + */ > + fiber_cond_wait_deadline(&writer->wal_mem_cond, > + last_row_time + timeout); > + if (fiber_is_cancelled()) > + break; > + if (ev_monotonic_now(loop()) - last_row_time > > + timeout) { > + struct xrow_header hearth_beat; > + xrow_encode_timestamp(&hearth_beat, instance_id, s/hearth/heart > + ev_now(loop())); > + row = &hearth_beat; > + } else > + continue; > + } > + last_row_time = ev_monotonic_now(loop()); > + if (on_wal_relay(row, cb_data) != 0) { > + diag_move(&fiber()->diag, &msg->diag); > + break; > + } > + } > + static struct cmsg_hop done_route[] = { > + {wal_relay_done, NULL} > + }; > +done: > + cmsg_init(&msg->base, done_route); > + cpipe_push(&msg->relay_pipe, &msg->base); > + msg->fiber = NULL; > + return 0; > +} > diff --git a/src/box/wal.h b/src/box/wal.h > index 1a7156d97..bd298cebe 100644 > --- a/src/box/wal.h > +++ b/src/box/wal.h > @@ -235,6 +235,12 @@ wal_write_vy_log(struct journal_entry *req); > void > wal_rotate_vy_log(); > > +typedef int (*wal_relay_cb)(struct xrow_header *header, void *data); > + > +int > +wal_relay(struct vclock *vclock, wal_relay_cb on_wal_relay, void *cb_data, > + const char *endpoint_name); Comments, comments... 
> + > #if defined(__cplusplus) > } /* extern "C" */ > #endif /* defined(__cplusplus) */ > diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c > index fdfc6f93d..b01604d55 100644 > --- a/src/box/wal_mem.c > +++ b/src/box/wal_mem.c > @@ -247,14 +247,13 @@ wal_mem_cursor_next(struct wal_mem *wal_mem, > } > > struct wal_mem_buf *mem_buf; > - size_t last_row_index; > > next_buffer: > mem_buf = wal_mem->buf + > - wal_mem->last_buf_index % WAL_MEM_BUF_COUNT; > - last_row_index = ibuf_used(&mem_buf->rows) / > - sizeof(struct wal_mem_buf_row); > - if (last_row_index == wal_mem_cursor->row_index) { > + wal_mem_cursor->buf_index % WAL_MEM_BUF_COUNT; > + size_t buf_row_count = ibuf_used(&mem_buf->rows) / > + sizeof(struct wal_mem_buf_row); > + if (buf_row_count == wal_mem_cursor->row_index) { > /* No more rows in the current buffer. */ > if (wal_mem->last_buf_index == wal_mem_cursor->buf_index) > /* No more rows in the memory. */ > @@ -269,5 +268,6 @@ next_buffer: > *row = &buf_row->xrow; > *data = buf_row->data; > *size = buf_row->size; > + ++wal_mem_cursor->row_index; > return 0; > } Again, this seems to belong to the previous patch.