From: Georgy Kirichenko
Subject: [tarantool-patches] [PATCH 5/7] Replication: in memory replication
Date: Tue, 13 Aug 2019 09:27:43 +0300
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko

A relay now uses both xlog files and the in-memory WAL to feed a replica
with transaction data. The new workflow is to read xlog files up to the
last one and then switch to the in-memory WAL. If the relay runs out of
in-memory WAL data, it falls back to file mode. A heartbeat is sent from
the wal thread only while in in-memory mode, because in file mode there
is always data to send.

Closes #3794
---
 src/box/gc.c      |   3 +
 src/box/gc.h      |   2 +
 src/box/relay.cc  | 161 +++++++++++++++++++++++-----------------------
 src/box/wal.c     | 154 +++++++++++++++++++++++++++++++++++++++++++-
 src/box/wal.h     |   6 ++
 src/box/wal_mem.c |  10 +--
 6 files changed, 249 insertions(+), 87 deletions(-)

diff --git a/src/box/gc.c b/src/box/gc.c
index 9771e407a..c36c2ace4 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -126,6 +126,8 @@ gc_init(const char *wal_dir_name)
 	xdir_scan(&gc.wal_dir);
 	gc.log_opened = false;
 	gc_tree_new(&gc.consumers);
+	xdir_create(&gc.xdir, wal_dir_name, XLOG, &INSTANCE_UUID,
+		    &xlog_opts_default);
 	fiber_cond_create(&gc.cleanup_cond);
 	checkpoint_schedule_cfg(&gc.checkpoint_schedule, 0, 0);
 
@@ -166,6 +168,7 @@ gc_free(void)
 		gc_consumer_delete(consumer);
 		consumer = next;
 	}
+	xdir_destroy(&gc.xdir);
 }
 
 /**
diff --git a/src/box/gc.h b/src/box/gc.h
index 9b38a0c06..f28b716b5 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -125,6 +125,8 @@ struct gc_state {
 	/** True if log is opened. */
 	bool log_opened;
 	/** Registered consumers, linked by gc_consumer::node. */
+	/** xdir to track wal files. */
+	struct xdir xdir;
 	gc_tree_t consumers;
 	/** Fiber responsible for periodic checkpointing. */
 	struct fiber *checkpoint_fiber;
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a1b841291..05fc0f691 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -84,8 +84,6 @@ struct relay {
 	struct replica *replica;
 	/** WAL event watcher. */
 	struct wal_watcher wal_watcher;
-	/** Relay reader cond. */
-	struct fiber_cond reader_cond;
 	/** Relay diagnostics. */
 	struct diag diag;
 	/** Vclock recieved from replica. */
@@ -116,6 +114,9 @@ struct relay {
 	/** Relay sync state. */
 	enum relay_state state;
 
+	struct vclock relay_vclock;
+	char *wal_dir;
+
 	struct {
 		/* Align to prevent false-sharing with tx thread */
 		alignas(CACHELINE_SIZE)
@@ -165,7 +166,6 @@ relay_new(struct replica *replica)
 		return NULL;
 	}
 	relay->replica = replica;
-	fiber_cond_create(&relay->reader_cond);
 	diag_create(&relay->diag);
 	stailq_create(&relay->pending_gc);
 	relay->state = RELAY_OFF;
@@ -215,7 +215,8 @@ relay_exit(struct relay *relay)
 	 * cursor, which must be closed in the same thread
 	 * that opened it (it uses cord's slab allocator).
 	 */
-	recovery_delete(relay->r);
+	if (relay->r != NULL)
+		recovery_delete(relay->r);
 	relay->r = NULL;
 }
 
@@ -239,7 +240,6 @@ relay_delete(struct relay *relay)
 {
 	if (relay->state == RELAY_FOLLOW)
 		relay_stop(relay);
-	fiber_cond_destroy(&relay->reader_cond);
 	diag_destroy(&relay->diag);
 	TRASH(relay);
 	free(relay);
 }
@@ -361,31 +361,6 @@ relay_set_error(struct relay *relay, struct error *e)
 	diag_add_error(&relay->diag, e);
 }
 
-static void
-relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
-{
-	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
-	if (fiber_is_cancelled()) {
-		/*
-		 * The relay is exiting. Rescanning the WAL at this
-		 * point would be pointless and even dangerous,
-		 * because the relay could have written a packet
-		 * fragment to the socket before being cancelled
-		 * so that writing another row to the socket would
-		 * lead to corrupted replication stream and, as
-		 * a result, permanent replication breakdown.
-		 */
-		return;
-	}
-	try {
-		recover_remaining_wals(relay->r, &relay->stream, NULL,
-				       (events & WAL_EVENT_ROTATE) != 0);
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
-		fiber_cancel(fiber());
-	}
-}
-
 /*
  * Relay reader fiber function.
  * Read xrow encoded vclocks sent by the replica.
@@ -408,7 +383,24 @@ relay_reader_f(va_list ap)
 			/* vclock is followed while decoding, zeroing it. */
 			vclock_create(&relay->recv_vclock);
 			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
-			fiber_cond_signal(&relay->reader_cond);
+			if (relay->status_msg.msg.route != NULL)
+				continue;
+			struct vclock *send_vclock;
+			if (relay->version_id < version_id(1, 7, 4))
+				send_vclock = &relay->r->vclock;
+			else
+				send_vclock = &relay->recv_vclock;
+			if (vclock_sum(&relay->status_msg.vclock) ==
+			    vclock_sum(send_vclock))
+				continue;
+			static const struct cmsg_hop route[] = {
+				{tx_status_update, NULL}
+			};
+			cmsg_init(&relay->status_msg.msg, route);
+			vclock_copy(&relay->status_msg.vclock,
+				    send_vclock);
+			relay->status_msg.relay = relay;
+			cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
 		}
 	} catch (Exception *e) {
 		relay_set_error(relay, e);
@@ -434,6 +426,27 @@ relay_send_heartbeat(struct relay *relay)
 	}
 }
 
+static int
+relay_send_cb(struct xrow_header *row, void *data)
+{
+	try {
+		struct relay *relay = (struct relay *)data;
+		relay_send_row(&relay->stream, row);
+		return 0;
+	} catch (Exception *e) {
+		return -1;
+	}
+}
+
+static void
+relay_endpoint_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
+{
+	(void) loop;
+	(void) events;
+	struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data;
+	cbus_process(endpoint);
+}
+
 /**
  * A libev callback invoked when a relay client socket is ready
  * for read. This currently only happens when the client closes
@@ -443,21 +456,16 @@ static int
 relay_subscribe_f(va_list ap)
 {
 	struct relay *relay = va_arg(ap, struct relay *);
-	struct recovery *r = relay->r;
 
 	coio_enable();
 	relay_set_cord_name(relay->io.fd);
 
 	/* Create cpipe to tx for propagating vclock. */
 	cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay),
-			     fiber_schedule_cb, fiber());
+			     relay_endpoint_cb, &relay->endpoint);
 	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
 		  &relay->relay_pipe, NULL, NULL, cbus_process);
 
-	/* Setup WAL watcher for sending new rows to the replica. */
-	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
-			relay_process_wal_event, cbus_process);
-
 	/* Start fiber for receiving replica acks. */
 	char name[FIBER_NAME_MAX];
 	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
@@ -473,50 +481,39 @@ relay_subscribe_f(va_list ap)
 	 */
 	relay_send_heartbeat(relay);
 
-	/*
-	 * Run the event loop until the connection is broken
-	 * or an error occurs.
-	 */
 	while (!fiber_is_cancelled()) {
-		double timeout = replication_timeout;
-		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
-					    ERRINJ_DOUBLE);
-		if (inj != NULL && inj->dparam != 0)
-			timeout = inj->dparam;
-
-		fiber_cond_wait_deadline(&relay->reader_cond,
-					 relay->last_row_time + timeout);
-
 		/*
-		 * The fiber can be woken by IO cancel, by a timeout of
-		 * status messaging or by an acknowledge to status message.
-		 * Handle cbus messages first.
+		 * Start relaying data: first recover all existing wal
+		 * files, then try to continue streaming from wal memory.
+		 *
+		 * NOTE: it would look more efficient to try relaying from
+		 * wal memory first, but that breaks tests which work around
+		 * collected logs. As the impact is small, this can be fixed
+		 * in the future.
 		 */
-		cbus_process(&relay->endpoint);
-		/* Check for a heartbeat timeout. */
-		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
-			relay_send_heartbeat(relay);
+		try {
+			relay->r = recovery_new(relay->wal_dir, false,
+						&relay->relay_vclock);
+			auto relay_guard = make_scoped_guard([&] {
+				recovery_delete(relay->r);
+				relay->r = NULL;
+			});
+			recover_remaining_wals(relay->r, &relay->stream,
+					       NULL, true);
+		} catch (Exception *e) {
+			e->log();
+			relay_set_error(relay, e);
+			break;
+		}
 		/*
-		 * Check that the vclock has been updated and the previous
-		 * status message is delivered
+		 * All known 'file' data has been sent, so it is time to
+		 * switch to relaying from wal memory.
 		 */
-		if (relay->status_msg.msg.route != NULL)
-			continue;
-		struct vclock *send_vclock;
-		if (relay->version_id < version_id(1, 7, 4))
-			send_vclock = &r->vclock;
-		else
-			send_vclock = &relay->recv_vclock;
-		if (vclock_sum(&relay->status_msg.vclock) ==
-		    vclock_sum(send_vclock))
-			continue;
-		static const struct cmsg_hop route[] = {
-			{tx_status_update, NULL}
+		if (wal_relay(&relay->relay_vclock, relay_send_cb, relay,
+			      tt_sprintf("relay_%p", relay)) != 0) {
+			relay_set_error(relay, diag_last_error(&fiber()->diag));
+			break;
 		};
-		cmsg_init(&relay->status_msg.msg, route);
-		vclock_copy(&relay->status_msg.vclock, send_vclock);
-		relay->status_msg.relay = relay;
-		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
 	}
 
 	/*
@@ -528,9 +525,6 @@ relay_subscribe_f(va_list ap)
 		diag_log();
 	say_crit("exiting the relay loop");
 
-	/* Clear WAL watcher. */
-	wal_clear_watcher(&relay->wal_watcher, cbus_process);
-
 	/* Join ack reader fiber. */
 	fiber_cancel(reader);
 	fiber_join(reader);
@@ -557,6 +551,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	 * unless it has already been registered by initial
 	 * join.
 	 */
+	vclock_copy(&relay->relay_vclock, replica_clock);
 	if (replica->gc == NULL) {
 		replica->gc = gc_consumer_register(replica_clock,
 				"replica %s", tt_uuid_str(&replica->uuid));
@@ -571,15 +566,16 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	});
 
 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
-	relay->r = recovery_new(cfg_gets("wal_dir"), false,
-				replica_clock);
+	relay->wal_dir = strdup(cfg_gets("wal_dir"));
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
+	relay->r = NULL;
 
 	int rc = cord_costart(&relay->cord, "subscribe",
 			      relay_subscribe_f, relay);
 	if (rc == 0)
 		rc = cord_cojoin(&relay->cord);
+	free(relay->wal_dir);
 	if (rc != 0)
 		diag_raise();
 }
@@ -616,7 +612,10 @@ static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	assert(iproto_type_is_dml(packet->type));
+	if (packet->type != IPROTO_OK) {
+		assert(iproto_type_is_dml(packet->type));
+		vclock_follow_xrow(&relay->relay_vclock, packet);
+	}
 	/*
 	 * Transform replica local requests to IPROTO_NOP so as to
 	 * promote vclock on the replica without actually modifying
diff --git a/src/box/wal.c b/src/box/wal.c
index 6cdb0db15..0457f3d46 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -159,6 +159,8 @@ struct wal_writer
 	struct rlist watchers;
 	/** Wal memory buffer. */
 	struct wal_mem wal_mem;
+	/** Wal memory condition. */
+	struct fiber_cond wal_mem_cond;
 };
 
 enum wal_msg_type {
@@ -1065,6 +1067,7 @@ wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
 		writer->checkpoint_wal_size += rc;
 		last_committed = stailq_last(&wal_msg->commit);
 		vclock_merge(&writer->vclock, &vclock_diff);
+		fiber_cond_broadcast(&writer->wal_mem_cond);
 		/*
 		 * Notify TX if the checkpoint threshold has been exceeded.
@@ -1163,7 +1166,9 @@ wal_cord_f(va_list ap)
 {
 	(void) ap;
 	struct wal_writer *writer = &wal_writer_singleton;
+	fiber_cond_create(&writer->wal_mem_cond);
 	wal_mem_create(&writer->wal_mem);
+	wal_mem_svp(&writer->wal_mem, &writer->vclock);
 
 	/** Initialize eio in this thread */
 	coio_enable();
@@ -1457,7 +1462,8 @@ wal_set_watcher(struct wal_watcher *watcher, const char *name,
 		{ wal_watcher_notify_perform, &watcher->wal_pipe };
 	watcher->route[1] = (struct cmsg_hop)
 		{ wal_watcher_notify_complete, NULL };
-	cbus_pair("wal", name, &watcher->wal_pipe, &watcher->watcher_pipe,
+
+	cbus_pair("wal", name, &watcher->wal_pipe, &watcher->watcher_pipe,
 		  wal_watcher_attach, watcher, process_cb);
 }
 
@@ -1494,3 +1500,149 @@ wal_atfork()
 	if (xlog_is_open(&vy_log_writer.xlog))
 		xlog_atfork(&vy_log_writer.xlog);
 }
+
+struct wal_relay_msg {
+	struct cmsg base;
+	struct cpipe wal_pipe;
+	struct cpipe relay_pipe;
+
+	struct vclock *vclock;
+	wal_relay_cb on_wal_relay;
+	void *cb_data;
+	struct fiber *fiber;
+	struct cmsg cancel_msg;
+	struct fiber_cond done_cond;
+	bool done;
+	int rc;
+	struct diag diag;
+};
+
+static void
+wal_relay_done(struct cmsg *base)
+{
+	struct wal_relay_msg *msg =
+		container_of(base, struct wal_relay_msg, base);
+	msg->done = true;
+	fiber_cond_signal(&msg->done_cond);
+}
+
+static int
+wal_relay_f(va_list ap)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct wal_relay_msg *msg = va_arg(ap, struct wal_relay_msg *);
+	struct vclock *vclock = msg->vclock;
+	wal_relay_cb on_wal_relay = msg->on_wal_relay;
+	void *cb_data = msg->cb_data;
+
+	double last_row_time = ev_monotonic_now(loop());
+
+	struct wal_mem_cursor cursor;
+	if (wal_mem_cursor_create(&writer->wal_mem, &cursor, vclock) != 0)
+		goto done;
+	while (!fiber_is_cancelled()) {
+		struct xrow_header *row;
+		void *data;
+		size_t size;
+		int rc = wal_mem_cursor_next(&writer->wal_mem, &cursor,
+					     &row, &data, &size);
+		if (rc < 0) {
+			/* Outdated cursor. */
+			break;
+		}
+		if (rc == 0 && vclock_get(vclock, row->replica_id) >= row->lsn)
+			continue;
+		if (rc > 0) {
+			double timeout = replication_timeout;
+			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
+						    ERRINJ_DOUBLE);
+			if (inj != NULL && inj->dparam != 0)
+				timeout = inj->dparam;
+
+			/*
+			 * Nothing to send, so wait for the next row
+			 * and send a heartbeat if the timeout is exceeded.
+			 */
+			fiber_cond_wait_deadline(&writer->wal_mem_cond,
+						 last_row_time + timeout);
+			if (fiber_is_cancelled())
+				break;
+			if (ev_monotonic_now(loop()) - last_row_time >
+			    timeout) {
+				struct xrow_header heartbeat;
+				xrow_encode_timestamp(&heartbeat, instance_id,
+						      ev_now(loop()));
+				row = &heartbeat;
+			} else
+				continue;
+		}
+		last_row_time = ev_monotonic_now(loop());
+		if (on_wal_relay(row, cb_data) != 0) {
+			diag_move(&fiber()->diag, &msg->diag);
+			break;
+		}
+	}
+	static struct cmsg_hop done_route[] = {
+		{wal_relay_done, NULL}
+	};
+done:
+	cmsg_init(&msg->base, done_route);
+	cpipe_push(&msg->relay_pipe, &msg->base);
+	msg->fiber = NULL;
+	return 0;
+}
+
+static void
+wal_relay_attach(void *data)
+{
+	struct wal_relay_msg *msg = (struct wal_relay_msg *)data;
+	msg->fiber = fiber_new("wal relay fiber", wal_relay_f);
+	fiber_start(msg->fiber, msg);
+}
+
+static void
+wal_relay_cancel(struct cmsg *base)
+{
+	struct wal_relay_msg *msg = container_of(base, struct wal_relay_msg,
+						 cancel_msg);
+	if (msg->fiber != NULL)
+		fiber_cancel(msg->fiber);
+}
+
+int
+wal_relay(struct vclock *vclock, wal_relay_cb on_wal_relay, void *cb_data,
+	  const char *endpoint_name)
+{
+	struct wal_relay_msg wal_relay_msg;
+	wal_relay_msg.vclock = vclock;
+	wal_relay_msg.on_wal_relay = on_wal_relay;
+	wal_relay_msg.cb_data = cb_data;
+	diag_create(&wal_relay_msg.diag);
+	wal_relay_msg.cancel_msg.route = NULL;
+
+	fiber_cond_create(&wal_relay_msg.done_cond);
+	wal_relay_msg.done = false;
+
+	cbus_pair("wal", endpoint_name, &wal_relay_msg.wal_pipe,
+		  &wal_relay_msg.relay_pipe,
+		  wal_relay_attach, &wal_relay_msg, cbus_process);
+
+	while (!wal_relay_msg.done) {
+		if (fiber_is_cancelled() &&
+		    wal_relay_msg.cancel_msg.route == NULL) {
+			static struct cmsg_hop cancel_route[]= {
+				{wal_relay_cancel, NULL}};
+			cmsg_init(&wal_relay_msg.cancel_msg, cancel_route);
+			cpipe_push(&wal_relay_msg.wal_pipe, &wal_relay_msg.cancel_msg);
+		}
+		fiber_cond_wait(&wal_relay_msg.done_cond);
+	}
+
+	cbus_unpair(&wal_relay_msg.wal_pipe, &wal_relay_msg.relay_pipe,
+		    NULL, NULL, cbus_process);
+	if (!diag_is_empty(&wal_relay_msg.diag)) {
+		diag_move(&wal_relay_msg.diag, &fiber()->diag);
+		return -1;
+	}
+	return 0;
+}
diff --git a/src/box/wal.h b/src/box/wal.h
index 1a7156d97..bd298cebe 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -235,6 +235,12 @@ wal_write_vy_log(struct journal_entry *req);
 void
 wal_rotate_vy_log();
 
+typedef int (*wal_relay_cb)(struct xrow_header *header, void *data);
+
+int
+wal_relay(struct vclock *vclock, wal_relay_cb on_wal_relay, void *cb_data,
+	  const char *endpoint_name);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c
index fdfc6f93d..b01604d55 100644
--- a/src/box/wal_mem.c
+++ b/src/box/wal_mem.c
@@ -247,14 +247,13 @@ wal_mem_cursor_next(struct wal_mem *wal_mem,
 	}
 
 	struct wal_mem_buf *mem_buf;
-	size_t last_row_index;
 next_buffer:
 	mem_buf = wal_mem->buf +
-		  wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
-	last_row_index = ibuf_used(&mem_buf->rows) /
-			 sizeof(struct wal_mem_buf_row);
-	if (last_row_index == wal_mem_cursor->row_index) {
+		  wal_mem_cursor->buf_index % WAL_MEM_BUF_COUNT;
+	size_t buf_row_count = ibuf_used(&mem_buf->rows) /
+			       sizeof(struct wal_mem_buf_row);
+	if (buf_row_count == wal_mem_cursor->row_index) {
 		/* No more rows in the current buffer. */
 		if (wal_mem->last_buf_index == wal_mem_cursor->buf_index)
 			/* No more rows in the memory. */
@@ -269,5 +268,6 @@ next_buffer:
 	*row = &buf_row->xrow;
 	*data = buf_row->data;
 	*size = buf_row->size;
+	++wal_mem_cursor->row_index;
 	return 0;
 }
-- 
2.22.0
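
The core of the new data flow is easiest to see in relay_subscribe_f(). Below
is a condensed, illustrative sketch of that loop using the names introduced by
this patch; the wrapper function name is invented for the illustration, and
error handling, cbus pairing and the ack-reader fiber are omitted, so this is
not code from the patch itself:

/*
 * Illustration only: a condensed view of the send loop that
 * relay_subscribe_f() implements in this patch. Error handling,
 * cbus plumbing and the ack-reader fiber are left out, and
 * relay_stream_loop() is a name invented for this sketch.
 */
static int
relay_stream_loop(struct relay *relay)
{
	while (!fiber_is_cancelled()) {
		/* 1. Catch up from the xlog files on disk. */
		relay->r = recovery_new(relay->wal_dir, false,
					&relay->relay_vclock);
		recover_remaining_wals(relay->r, &relay->stream, NULL, true);
		recovery_delete(relay->r);
		relay->r = NULL;
		/*
		 * 2. Stream from the in-memory WAL: wal_relay() runs a fiber
		 *    in the wal thread that feeds rows to relay_send_cb() and
		 *    sends heartbeats while idle. It returns 0 once the
		 *    in-memory cursor becomes outdated, so the loop falls
		 *    back to reading files; a non-zero result is a send error.
		 */
		if (wal_relay(&relay->relay_vclock, relay_send_cb, relay,
			      tt_sprintf("relay_%p", relay)) != 0)
			return -1;
	}
	return 0;
}

The key point is the hand-off: recover_remaining_wals() drains everything that
has already reached xlog files, wal_relay() then streams directly from the wal
thread's in-memory buffer, and when the in-memory cursor becomes outdated the
relay simply drops back to reading files on the next loop iteration.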