From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp33.i.mail.ru (smtp33.i.mail.ru [94.100.177.93]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 9941F4696CC for ; Wed, 12 Feb 2020 12:39:30 +0300 (MSK) From: Georgy Kirichenko Date: Wed, 12 Feb 2020 12:39:20 +0300 Message-Id: <0181d8fc816ed2117f1df80bf7a614e422a0c94e.1581500169.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v4 11/11] replication: use wal memory buffer to fetch rows List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org Fetch data from the wal in-memory buffer. Wal allows starting a fiber which creates an xrow buffer cursor with a given vclock, then fetches rows from the xrow buffer one by one and calls the given callback for each row. Also, the wal relaying fiber sends a heartbeat message if all rows were processed and no rows were written for the replication timeout period. In case of an outdated vclock (wal could not create a cursor or fetch a new row from the cursor), the relay switches to reading logged data from files up to the current vclock and then makes the next attempt to fetch data from wal memory. In file mode there is always data to send to a replica, so the relay does not have to send heartbeat messages. >From this point on, the relay creates a cord only when it switches to reading from files. Frequent memory-file oscillation is not very likely because of two considerations: 1. If a replica is too slow (slower than the master writes), it will switch to disk and then fall behind. 2. If a replica is fast enough, it will catch up with memory and then consume it before the memory buffer rotates. In order to split wal and relay logic, a relay filter function was introduced; it should be passed when a relay attaches to wal. 
Note: wal exit is not graceful - tx sends a break loop message and wal just stops cbus processing without any care about other fibers which could still use cbus. To overcome this there is a special trigger which is signaled just before cbus pipe destroy. Close #3794 Part of #980 --- src/box/box.cc | 9 - src/box/lua/info.c | 4 +- src/box/recovery.cc | 17 +- src/box/relay.cc | 517 +++++------------- src/box/relay.h | 6 +- src/box/replication.cc | 3 + src/box/wal.c | 510 +++++++++++++++-- src/box/wal.h | 92 +++- src/lib/core/errinj.h | 1 + test/box-py/iproto.test.py | 9 +- test/box/errinj.result | 134 ++--- test/replication/force_recovery.result | 8 + test/replication/force_recovery.test.lua | 2 + test/replication/replica_rejoin.result | 8 + test/replication/replica_rejoin.test.lua | 2 + .../show_error_on_disconnect.result | 8 + .../show_error_on_disconnect.test.lua | 2 + test/replication/suite.ini | 2 +- test/xlog/panic_on_wal_error.result | 12 + test/xlog/panic_on_wal_error.test.lua | 3 + test/xlog/suite.ini | 2 +- 21 files changed, 838 insertions(+), 513 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index 17495a211..f629abe70 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1604,14 +1604,6 @@ box_process_register(struct ev_io *io, struct xrow_header *header) xrow_encode_vclock_xc(&row, &replicaset.vclock); row.sync = header->sync; coio_write_xrow(io, &row); - - /* - * Advance the WAL consumer state to the position where - * registration was complete and assign it to the - * replica. 
- */ - replica = replica_by_uuid(&instance_uuid); - wal_relay_status_update(replica->id, &stop_vclock); } void @@ -1756,7 +1748,6 @@ box_process_join(struct ev_io *io, struct xrow_header *header) if (coio_write_xrow(io, &row) < 0) diag_raise(); replica = replica_by_uuid(&instance_uuid); - wal_relay_status_update(replica->id, &stop_vclock); } void diff --git a/src/box/lua/info.c b/src/box/lua/info.c index aba9a4b7c..60ef5b0b0 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -135,7 +135,9 @@ lbox_pushrelay(lua_State *L, struct relay *relay) lua_pushstring(L, "follow"); lua_settable(L, -3); lua_pushstring(L, "vclock"); - lbox_pushvclock(L, relay_vclock(relay)); + struct vclock vclock; + relay_vclock(relay, &vclock); + lbox_pushvclock(L, &vclock); lua_settable(L, -3); lua_pushstring(L, "idle"); lua_pushnumber(L, ev_monotonic_now(loop()) - diff --git a/src/box/recovery.cc b/src/box/recovery.cc index e4aad1296..657c55d10 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -257,9 +257,11 @@ recover_xlog(struct recovery *r, struct xstream *stream, * the file is fully read: it's fully read only * when EOF marker has been read, see i.eof_read */ - if (stop_vclock != NULL && - r->vclock.signature >= stop_vclock->signature) - return 0; + if (stop_vclock != NULL) { + int rc = vclock_compare(&r->vclock, stop_vclock); + if (rc >= 0 && rc != VCLOCK_ORDER_UNDEFINED) + return 0; + } int64_t current_lsn = vclock_get(&r->vclock, row.replica_id); if (row.lsn <= current_lsn) continue; /* already applied, skip */ @@ -363,9 +365,12 @@ recover_current_wal: if (xlog_cursor_is_eof(&r->cursor)) recovery_close_log(r); - if (stop_vclock != NULL && vclock_compare(&r->vclock, stop_vclock) != 0) { - diag_set(XlogGapError, &r->vclock, stop_vclock); - return -1; + if (stop_vclock != NULL) { + int rc = vclock_compare(&r->vclock, stop_vclock); + if (rc < 0 || rc == VCLOCK_ORDER_UNDEFINED) { + diag_set(XlogGapError, &r->vclock, stop_vclock); + return -1; + } } 
region_free(&fiber()->gc); diff --git a/src/box/relay.cc b/src/box/relay.cc index 13c8f4c28..980e05b2f 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -44,7 +44,6 @@ #include "engine.h" #include "gc.h" #include "iproto_constants.h" -#include "recovery.h" #include "replication.h" #include "trigger.h" #include "vclock.h" @@ -54,38 +53,20 @@ #include "xstream.h" #include "wal.h" -/** - * Cbus message to send status updates from relay to tx thread. - */ -struct relay_status_msg { - /** Parent */ - struct cmsg msg; - /** Relay instance */ - struct relay *relay; - /** Replica vclock. */ - struct vclock vclock; -}; - /** State of a replication relay. */ struct relay { - /** The thread in which we relay data to the replica. */ - struct cord cord; /** Replica connection */ struct ev_io io; /** Request sync */ uint64_t sync; - /** Recovery instance to read xlog from the disk */ - struct recovery *r; /** Xstream argument to recovery */ struct xstream stream; /** Vclock to stop playing xlogs */ struct vclock stop_vclock; /** Remote replica */ struct replica *replica; - /** WAL event watcher. */ - struct wal_watcher wal_watcher; - /** Relay reader cond. */ - struct fiber_cond reader_cond; + /** WAL memory relay. */ + struct wal_relay wal_relay; /** Relay diagnostics. */ struct diag diag; /** Vclock recieved from replica. */ @@ -98,25 +79,10 @@ struct relay { */ struct vclock local_vclock_at_subscribe; - /** Relay endpoint */ - struct cbus_endpoint endpoint; - /** A pipe from 'relay' thread to 'tx' */ - struct cpipe tx_pipe; - /** A pipe from 'tx' thread to 'relay' */ - struct cpipe relay_pipe; - /** Status message */ - struct relay_status_msg status_msg; - /** Time when last row was sent to peer. */ - double last_row_time; /** Relay sync state. */ enum relay_state state; - - struct { - /* Align to prevent false-sharing with tx thread */ - alignas(CACHELINE_SIZE) - /** Known relay vclock. */ - struct vclock vclock; - } tx; + /** Fiber processing this relay. 
*/ + struct fiber *fiber; }; struct diag* @@ -131,24 +97,22 @@ relay_get_state(const struct relay *relay) return relay->state; } -const struct vclock * -relay_vclock(const struct relay *relay) +void +relay_vclock(const struct relay *relay, struct vclock *vclock) { - return &relay->tx.vclock; + wal_relay_vclock(&relay->wal_relay, vclock); } double relay_last_row_time(const struct relay *relay) { - return relay->last_row_time; + return wal_relay_last_row_time(&relay->wal_relay); } static int relay_send(struct relay *relay, struct xrow_header *packet); static int relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row); -static int -relay_send_row(struct xstream *stream, struct xrow_header *row); struct relay * relay_new(struct replica *replica) @@ -160,18 +124,14 @@ relay_new(struct replica *replica) return NULL; } relay->replica = replica; - relay->last_row_time = ev_monotonic_now(loop()); - fiber_cond_create(&relay->reader_cond); diag_create(&relay->diag); relay->state = RELAY_OFF; return relay; } static void -relay_start(struct relay *relay, int fd, uint64_t sync, - int (*stream_write)(struct xstream *, struct xrow_header *)) +relay_start(struct relay *relay, int fd, uint64_t sync) { - xstream_create(&relay->stream, stream_write); /* * Clear the diagnostics at start, in case it has the old * error message which we keep around to display in @@ -181,18 +141,15 @@ relay_start(struct relay *relay, int fd, uint64_t sync, coio_create(&relay->io, fd); relay->sync = sync; relay->state = RELAY_FOLLOW; - relay->last_row_time = ev_monotonic_now(loop()); + relay->fiber = fiber(); } void relay_cancel(struct relay *relay) { /* Check that the thread is running first. 
*/ - if (relay->cord.id != 0) { - if (tt_pthread_cancel(relay->cord.id) == ESRCH) - return; - tt_pthread_join(relay->cord.id, NULL); - } + if (relay->fiber != NULL) + fiber_cancel(relay->fiber); } /** @@ -201,33 +158,17 @@ relay_cancel(struct relay *relay) static void relay_exit(struct relay *relay) { + (void) relay; struct errinj *inj = errinj(ERRINJ_RELAY_EXIT_DELAY, ERRINJ_DOUBLE); if (inj != NULL && inj->dparam > 0) fiber_sleep(inj->dparam); - - /* - * Destroy the recovery context. We MUST do it in - * the relay thread, because it contains an xlog - * cursor, which must be closed in the same thread - * that opened it (it uses cord's slab allocator). - */ - recovery_delete(relay->r); - relay->r = NULL; } static void relay_stop(struct relay *relay) { - if (relay->r != NULL) - recovery_delete(relay->r); - relay->r = NULL; relay->state = RELAY_STOPPED; - /* - * Needed to track whether relay thread is running or not - * for relay_cancel(). Id is reset to a positive value - * upon cord_create(). 
- */ - relay->cord.id = 0; + relay->fiber = NULL; } void @@ -235,27 +176,11 @@ relay_delete(struct relay *relay) { if (relay->state == RELAY_FOLLOW) relay_stop(relay); - fiber_cond_destroy(&relay->reader_cond); diag_destroy(&relay->diag); TRASH(relay); free(relay); } -static void -relay_set_cord_name(int fd) -{ - char name[FIBER_NAME_MAX]; - struct sockaddr_storage peer; - socklen_t addrlen = sizeof(peer); - if (getpeername(fd, ((struct sockaddr*)&peer), &addrlen) == 0) { - snprintf(name, sizeof(name), "relay/%s", - sio_strfaddr((struct sockaddr *)&peer, addrlen)); - } else { - snprintf(name, sizeof(name), "relay/"); - } - cord_set_name(name); -} - void relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) { @@ -263,7 +188,7 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) if (relay == NULL) diag_raise(); - relay_start(relay, fd, sync, relay_send_initial_join_row); + relay_start(relay, fd, sync); auto relay_guard = make_scoped_guard([=] { relay_stop(relay); relay_delete(relay); @@ -291,26 +216,58 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) if (coio_write_xrow(&relay->io, &row) < 0) diag_raise(); + xstream_create(&relay->stream, relay_send_initial_join_row); /* Send read view to the replica. */ engine_join_xc(&ctx, &relay->stream); } -int -relay_final_join_f(va_list ap) +/* + * Filter callback function used by wal relay in order to + * transform all local rows into a NOPs. 
+ */ +static ssize_t +relay_final_join_filter(struct wal_relay *wal_relay, struct xrow_header **row) { - struct relay *relay = va_arg(ap, struct relay *); - auto guard = make_scoped_guard([=] { relay_exit(relay); }); - - coio_enable(); - relay_set_cord_name(relay->io.fd); - - /* Send all WALs until stop_vclock */ - assert(relay->stream.write != NULL); - if (recover_remaining_wals(relay->r, &relay->stream, - &relay->stop_vclock, true) != 0) - diag_raise(); - assert(vclock_compare(&relay->r->vclock, &relay->stop_vclock) == 0); - return 0; + (void) wal_relay; + ssize_t rc = WAL_RELAY_FILTER_PASS; + struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN, + ERRINJ_INT); + if (inj != NULL && (*row)->lsn == inj->iparam) { + struct xrow_header *filtered_row = (struct xrow_header *) + region_alloc(&fiber()->gc, sizeof(*filtered_row)); + if (filtered_row == NULL) { + diag_set(OutOfMemory, sizeof(struct xrow_header), + "region", "struct xrow_header"); + return WAL_RELAY_FILTER_ERR; + } + *filtered_row = **row; + filtered_row->lsn = inj->iparam - 1; + say_warn("injected broken lsn: %lld", + (long long) filtered_row->lsn); + *row = filtered_row; + rc = WAL_RELAY_FILTER_ROW; + } + /* + * Transform replica local requests to IPROTO_NOP so as to + * promote vclock on the replica without actually modifying + * any data. 
+ */ + if ((*row)->group_id == GROUP_LOCAL) { + struct xrow_header *filtered_row = (struct xrow_header *) + region_alloc(&fiber()->gc, sizeof(*filtered_row)); + if (filtered_row == NULL) { + diag_set(OutOfMemory, sizeof(struct xrow_header), + "region", "struct xrow_header"); + return WAL_RELAY_FILTER_ERR; + } + *filtered_row = **row; + filtered_row->type = IPROTO_NOP; + filtered_row->group_id = GROUP_DEFAULT; + filtered_row->bodycnt = 0; + *row = filtered_row; + rc = WAL_RELAY_FILTER_ROW; + } + return rc; } void @@ -321,21 +278,16 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, if (relay == NULL) diag_raise(); - relay_start(relay, fd, sync, relay_send_row); + relay_start(relay, fd, sync); auto relay_guard = make_scoped_guard([=] { relay_stop(relay); relay_delete(relay); }); - relay->r = recovery_new(cfg_gets("wal_dir"), false, - start_vclock); vclock_copy(&relay->stop_vclock, stop_vclock); - int rc = cord_costart(&relay->cord, "final_join", - relay_final_join_f, relay); - if (rc == 0) - rc = cord_cojoin(&relay->cord); - if (rc != 0) + if (wal_relay(&relay->wal_relay, start_vclock, stop_vclock, + relay_final_join_filter, fd, relay->replica) != 0) diag_raise(); ERROR_INJECT(ERRINJ_RELAY_FINAL_JOIN, @@ -347,35 +299,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, }); } -/** - * The message which updated tx thread with a new vclock has returned back - * to the relay. - */ -static void -relay_status_update(struct cmsg *msg) -{ - struct relay_status_msg *status = (struct relay_status_msg *)msg; - msg->route = NULL; - fiber_cond_signal(&status->relay->reader_cond); -} - -/** - * Deliver a fresh relay vclock to tx thread. 
- */ -static void -tx_status_update(struct cmsg *msg) -{ - struct relay_status_msg *status = (struct relay_status_msg *)msg; - if (!status->relay->replica->anon) - wal_relay_status_update(status->relay->replica->id, &status->vclock); - vclock_copy(&status->relay->tx.vclock, &status->vclock); - static const struct cmsg_hop route[] = { - {relay_status_update, NULL} - }; - cmsg_init(msg, route); - cpipe_push(&status->relay->relay_pipe, msg); -} - static void relay_set_error(struct relay *relay, struct error *e) { @@ -384,186 +307,88 @@ relay_set_error(struct relay *relay, struct error *e) diag_add_error(&relay->diag, e); } -static void -relay_process_wal_event(struct wal_watcher *watcher, unsigned events) +/* + * Filter callback function used while subscribe phase. + */ +static ssize_t +relay_subscribe_filter(struct wal_relay *wal_relay, struct xrow_header **row) { - struct relay *relay = container_of(watcher, struct relay, wal_watcher); - if (fiber_is_cancelled()) { + if ((*row)->type != IPROTO_OK) { + assert(iproto_type_is_dml((*row)->type)); /* - * The relay is exiting. Rescanning the WAL at this - * point would be pointless and even dangerous, - * because the relay could have written a packet - * fragment to the socket before being cancelled - * so that writing another row to the socket would - * lead to corrupted replication stream and, as - * a result, permanent replication breakdown. + * Because of asynchronous replication both master + * and replica may have different transaction + * order in their logs. As we start relaying + * transactions from the first unknow one there + * could be some other already known by replica + * and there is no point to send them. 
*/ - return; - } - if (recover_remaining_wals(relay->r, &relay->stream, NULL, - (events & WAL_EVENT_ROTATE) != 0) != 0) { - relay_set_error(relay, diag_last_error(diag_get())); - fiber_cancel(fiber()); + if (vclock_get(&wal_relay->vclock, (*row)->replica_id) >= + (*row)->lsn) + return WAL_RELAY_FILTER_SKIP; } -} - -/* - * Relay reader fiber function. - * Read xrow encoded vclocks sent by the replica. - */ -int -relay_reader_f(va_list ap) -{ - struct relay *relay = va_arg(ap, struct relay *); - struct fiber *relay_f = va_arg(ap, struct fiber *); - - struct ibuf ibuf; - struct ev_io io; - coio_create(&io, relay->io.fd); - ibuf_create(&ibuf, &cord()->slabc, 1024); - try { - while (!fiber_is_cancelled()) { - struct xrow_header xrow; - if (coio_read_xrow_timeout(&io, &ibuf, &xrow, - replication_disconnect_timeout()) < 0) - diag_raise(); - /* vclock is followed while decoding, zeroing it. */ - vclock_create(&relay->recv_vclock); - xrow_decode_vclock_xc(&xrow, &relay->recv_vclock); - fiber_cond_signal(&relay->reader_cond); + ssize_t rc = WAL_RELAY_FILTER_PASS; + + struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN, + ERRINJ_INT); + if (inj != NULL && (*row)->lsn == inj->iparam) { + struct xrow_header *filtered_row = (struct xrow_header *) + region_alloc(&fiber()->gc, sizeof(*filtered_row)); + if (filtered_row == NULL) { + diag_set(OutOfMemory, sizeof(struct xrow_header), + "region", "struct xrow_header"); + return WAL_RELAY_FILTER_ERR; } - } catch (Exception *e) { - relay_set_error(relay, e); - fiber_cancel(relay_f); + *filtered_row = **row; + filtered_row->lsn = inj->iparam - 1; + say_warn("injected broken lsn: %lld", + (long long) filtered_row->lsn); + *row = filtered_row; + rc = WAL_RELAY_FILTER_ROW; } - ibuf_destroy(&ibuf); - return 0; -} - -/** - * Send a heartbeat message over a connected relay. 
- */ -static void -relay_send_heartbeat(struct relay *relay) -{ - struct xrow_header row; - xrow_encode_timestamp(&row, instance_id, ev_now(loop())); - try { - relay_send(relay, &row); - } catch (Exception *e) { - relay_set_error(relay, e); - fiber_cancel(fiber()); - } -} - -/** - * A libev callback invoked when a relay client socket is ready - * for read. This currently only happens when the client closes - * its socket, and we get an EOF. - */ -static int -relay_subscribe_f(va_list ap) -{ - struct relay *relay = va_arg(ap, struct relay *); - struct recovery *r = relay->r; - - coio_enable(); - relay_set_cord_name(relay->io.fd); - - /* Create cpipe to tx for propagating vclock. */ - cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay), - fiber_schedule_cb, fiber()); - cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe, - &relay->relay_pipe, NULL, NULL, cbus_process); - - /* Setup WAL watcher for sending new rows to the replica. */ - wal_set_watcher(&relay->wal_watcher, relay->endpoint.name, - relay_process_wal_event, cbus_process); - - /* Start fiber for receiving replica acks. */ - char name[FIBER_NAME_MAX]; - snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader"); - struct fiber *reader = fiber_new_xc(name, relay_reader_f); - fiber_set_joinable(reader, true); - fiber_start(reader, relay, fiber()); /* - * If the replica happens to be up to date on subscribe, - * don't wait for timeout to happen - send a heartbeat - * message right away to update the replication lag as - * soon as possible. - */ - relay_send_heartbeat(relay); - - /* - * Run the event loop until the connection is broken - * or an error occurs. + * Transform replica local requests to IPROTO_NOP so as to + * promote vclock on the replica without actually modifying + * any data. 
*/ - while (!fiber_is_cancelled()) { - double timeout = replication_timeout; - struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL, - ERRINJ_DOUBLE); - if (inj != NULL && inj->dparam != 0) - timeout = inj->dparam; - - fiber_cond_wait_deadline(&relay->reader_cond, - relay->last_row_time + timeout); - - /* - * The fiber can be woken by IO cancel, by a timeout of - * status messaging or by an acknowledge to status message. - * Handle cbus messages first. - */ - cbus_process(&relay->endpoint); - /* Check for a heartbeat timeout. */ - if (ev_monotonic_now(loop()) - relay->last_row_time > timeout) - relay_send_heartbeat(relay); - /* - * Check that the vclock has been updated and the previous - * status message is delivered - */ - if (relay->status_msg.msg.route != NULL) - continue; - struct vclock *send_vclock; - if (relay->version_id < version_id(1, 7, 4)) - send_vclock = &r->vclock; - else - send_vclock = &relay->recv_vclock; - if (vclock_sum(&relay->status_msg.vclock) == - vclock_sum(send_vclock)) - continue; - static const struct cmsg_hop route[] = { - {tx_status_update, NULL} - }; - cmsg_init(&relay->status_msg.msg, route); - vclock_copy(&relay->status_msg.vclock, send_vclock); - relay->status_msg.relay = relay; - cpipe_push(&relay->tx_pipe, &relay->status_msg.msg); + if ((*row)->group_id == GROUP_LOCAL) { + if ((*row)->replica_id == 0) + return WAL_RELAY_FILTER_SKIP; + struct xrow_header *filtered_row = (struct xrow_header *) + region_alloc(&fiber()->gc, sizeof(*filtered_row)); + if (filtered_row == NULL) { + diag_set(OutOfMemory, sizeof(struct xrow_header), + "region", "struct xrow_header"); + return WAL_RELAY_FILTER_ERR; + } + *filtered_row = **row; + filtered_row->type = IPROTO_NOP; + filtered_row->group_id = GROUP_DEFAULT; + filtered_row->bodycnt = 0; + *row = filtered_row; + rc = WAL_RELAY_FILTER_ROW; } - /* - * Log the error that caused the relay to break the loop. - * Don't clear the error for status reporting. 
+ * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE + * request. If this is FINAL JOIN (i.e. relay->replica is NULL), + * we must relay all rows, even those originating from the replica + * itself (there may be such rows if this is rebootstrap). If this + * SUBSCRIBE, only send a row if it is not from the same replica + * (i.e. don't send replica's own rows back) or if this row is + * missing on the other side (i.e. in case of sudden power-loss, + * data was not written to WAL, so remote master can't recover + * it). In the latter case packet's LSN is less than or equal to + * local master's LSN at the moment it received 'SUBSCRIBE' request. */ - assert(!diag_is_empty(&relay->diag)); - diag_add_error(diag_get(), diag_last_error(&relay->diag)); - diag_log(); - say_crit("exiting the relay loop"); - - /* Clear garbage collector trigger and WAL watcher. */ - wal_clear_watcher(&relay->wal_watcher, cbus_process); - - /* Join ack reader fiber. */ - fiber_cancel(reader); - fiber_join(reader); - - /* Destroy cpipe to tx. */ - cbus_unpair(&relay->tx_pipe, &relay->relay_pipe, - NULL, NULL, cbus_process); - cbus_endpoint_destroy(&relay->endpoint, cbus_process); - - relay_exit(relay); - return -1; + struct relay *relay = container_of(wal_relay, struct relay, wal_relay); + if (wal_relay->replica == NULL || + (*row)->replica_id != wal_relay->replica->id || + (*row)->lsn <= vclock_get(&relay->local_vclock_at_subscribe, + (*row)->replica_id)) { + return rc; + } + return WAL_RELAY_FILTER_SKIP; } /** Replication acceptor fiber handler. 
*/ @@ -574,29 +399,21 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, assert(replica->anon || replica->id != REPLICA_ID_NIL); struct relay *relay = replica->relay; assert(relay->state != RELAY_FOLLOW); - if (!replica->anon) - wal_relay_status_update(replica->id, replica_clock); - relay_start(relay, fd, sync, relay_send_row); + relay_start(relay, fd, sync); auto relay_guard = make_scoped_guard([=] { relay_stop(relay); replica_on_relay_stop(replica); }); vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock); - relay->r = recovery_new(cfg_gets("wal_dir"), false, - replica_clock); - if (relay->r == NULL) - diag_raise(); - vclock_copy(&relay->tx.vclock, replica_clock); relay->version_id = replica_version_id; - int rc = cord_costart(&relay->cord, "subscribe", - relay_subscribe_f, relay); - if (rc == 0) - rc = cord_cojoin(&relay->cord); - if (rc != 0) - diag_raise(); + if (wal_relay(&relay->wal_relay, replica_clock, NULL, + relay_subscribe_filter, fd, relay->replica) != 0) + relay_set_error(relay, diag_last_error(&fiber()->diag)); + relay_exit(relay); + diag_raise(); } static int @@ -605,7 +422,6 @@ relay_send(struct relay *relay, struct xrow_header *packet) ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY); packet->sync = relay->sync; - relay->last_row_time = ev_monotonic_now(loop()); if (coio_write_xrow(&relay->io, packet) < 0) return -1; fiber_gc(); @@ -628,54 +444,3 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row) return relay_send(relay, row); return 0; } - -/** Send a single row to the client. */ -static int -relay_send_row(struct xstream *stream, struct xrow_header *packet) -{ - struct relay *relay = container_of(stream, struct relay, stream); - assert(iproto_type_is_dml(packet->type)); - /* - * Transform replica local requests to IPROTO_NOP so as to - * promote vclock on the replica without actually modifying - * any data. 
- */ - if (packet->group_id == GROUP_LOCAL) { - /* - * Replica-local requests generated while replica - * was anonymous have a zero instance id. Just - * skip all these rows. - */ - if (packet->replica_id == REPLICA_ID_NIL) - return 0; - packet->type = IPROTO_NOP; - packet->group_id = GROUP_DEFAULT; - packet->bodycnt = 0; - } - /* - * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE - * request. If this is FINAL JOIN (i.e. relay->replica is NULL), - * we must relay all rows, even those originating from the replica - * itself (there may be such rows if this is rebootstrap). If this - * SUBSCRIBE, only send a row if it is not from the same replica - * (i.e. don't send replica's own rows back) or if this row is - * missing on the other side (i.e. in case of sudden power-loss, - * data was not written to WAL, so remote master can't recover - * it). In the latter case packet's LSN is less than or equal to - * local master's LSN at the moment it received 'SUBSCRIBE' request. - */ - if (relay->replica == NULL || - packet->replica_id != relay->replica->id || - packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe, - packet->replica_id)) { - struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN, - ERRINJ_INT); - if (inj != NULL && packet->lsn == inj->iparam) { - packet->lsn = inj->iparam - 1; - say_warn("injected broken lsn: %lld", - (long long) packet->lsn); - } - return relay_send(relay, packet); - } - return 0; -} diff --git a/src/box/relay.h b/src/box/relay.h index e1782d78f..43d4e7ab3 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -80,10 +80,10 @@ relay_get_state(const struct relay *relay); /** * Returns relay's vclock * @param relay relay - * @returns relay's vclock + * @param relay's vclock */ -const struct vclock * -relay_vclock(const struct relay *relay); +void +relay_vclock(const struct relay *relay, struct vclock *vclock); /** * Returns relay's last_row_time diff --git a/src/box/replication.cc b/src/box/replication.cc index 
869177656..a7a513fe1 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -228,6 +228,9 @@ replica_set_id(struct replica *replica, uint32_t replica_id) { assert(replica_id < VCLOCK_MAX); assert(replica->id == REPLICA_ID_NIL); /* replica id is read-only */ + /* If replica was anon then unregister it from wal. */ + if (replica->anon) + wal_relay_delete(0); replica->id = replica_id; if (tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)) { diff --git a/src/box/wal.c b/src/box/wal.c index b483b8cc4..886663e0c 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -45,6 +45,9 @@ #include "replication.h" #include "mclock.h" #include "xrow_buf.h" +#include "recovery.h" +#include "coio.h" +#include "xrow_io.h" enum { /** @@ -183,6 +186,14 @@ struct wal_writer * without xlog files access. */ struct xrow_buf xrow_buf; + /** xrow buffer condition signaled when a buffer write was done. */ + struct fiber_cond xrow_buf_cond; + /** + * Wal exit is not gracefull so there is a helper trigger + * which is used in order to infor all relays that wal was + * destroyed. + */ + struct rlist on_wal_exit; }; struct wal_msg { @@ -448,6 +459,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode, fiber_cond_create(&writer->wal_gc_cond); writer->gc_wal_vclock = NULL; vclock_create(&writer->gc_first_vclock); + + rlist_create(&writer->on_wal_exit); } /** Destroy a WAL writer structure. */ @@ -1208,6 +1221,7 @@ wal_write_to_disk(struct cmsg *msg) stailq_concat(&wal_msg->rollback, &input); } else { xrow_buf_tx_commit(&writer->xrow_buf); + fiber_cond_signal(&writer->xrow_buf_cond); /* * Schedule processed entries to commit * and update the wal vclock. @@ -1302,6 +1316,7 @@ wal_writer_f(va_list ap) * should be done in the wal thread. 
*/ xrow_buf_create(&writer->xrow_buf); + fiber_cond_create(&writer->xrow_buf_cond); /** Initialize eio in this thread */ coio_enable(); @@ -1347,6 +1362,9 @@ wal_writer_f(va_list ap) if (xlog_is_open(&vy_log_writer.xlog)) xlog_close(&vy_log_writer.xlog, false); + /* Inform relays that wal is exiting. */ + trigger_run(&writer->on_wal_exit, NULL); + cpipe_destroy(&writer->tx_prio_pipe); xrow_buf_destroy(&writer->xrow_buf); return 0; @@ -1614,49 +1632,6 @@ wal_notify_watchers(struct wal_writer *writer, unsigned events) wal_watcher_notify(watcher, events); } -struct wal_relay_status_update_msg { - struct cbus_call_msg base; - uint32_t replica_id; - struct vclock vclock; -}; - -static int -wal_relay_status_update_f(struct cbus_call_msg *base) -{ - struct wal_writer *writer = &wal_writer_singleton; - struct wal_relay_status_update_msg *msg = - container_of(base, struct wal_relay_status_update_msg, base); - struct vclock old_vclock; - mclock_extract_row(&writer->mclock, msg->replica_id, &old_vclock); - if (writer->gc_wal_vclock != NULL && - vclock_order_changed(&old_vclock, writer->gc_wal_vclock, - &msg->vclock)) - fiber_cond_signal(&writer->wal_gc_cond); - mclock_update(&writer->mclock, msg->replica_id, &msg->vclock); - return 0; -} - -void -wal_relay_status_update(uint32_t replica_id, const struct vclock *vclock) -{ - struct wal_writer *writer = &wal_writer_singleton; - struct wal_relay_status_update_msg msg; - /* - * We do not take anonymous replica in account. There is - * no way to distinguish them but anonynous replica could - * be rebootstrapped at any time. 
- */ - if (replica_id == 0) - return; - msg.replica_id = replica_id; - vclock_copy(&msg.vclock, vclock); - bool cancellable = fiber_set_cancellable(false); - cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, - &msg.base, wal_relay_status_update_f, NULL, - TIMEOUT_INFINITY); - fiber_set_cancellable(cancellable); -} - struct wal_relay_delete_msg { struct cmsg base; uint32_t replica_id; @@ -1705,3 +1680,452 @@ wal_atfork() if (xlog_is_open(&vy_log_writer.xlog)) xlog_atfork(&vy_log_writer.xlog); } + +/* + * Relay reader fiber function. + * Read xrow encoded vclocks sent by the replica. + */ +static int +wal_relay_reader_f(va_list ap) +{ + struct wal_writer *writer = va_arg(ap, struct wal_writer *); + struct wal_relay *wal_relay = va_arg(ap, struct wal_relay *); + uint32_t replica_id = wal_relay->replica->id; + + mclock_update(&writer->mclock, replica_id, &wal_relay->replica_vclock); + fiber_cond_signal(&writer->wal_gc_cond); + + struct ibuf ibuf; + struct ev_io io; + coio_create(&io, wal_relay->fd); + ibuf_create(&ibuf, &cord()->slabc, 1024); + while (!fiber_is_cancelled()) { + struct xrow_header row; + if (coio_read_xrow_timeout(&io, &ibuf, &row, + replication_disconnect_timeout()) < 0) { + if (diag_is_empty(&wal_relay->diag)) + diag_move(&fiber()->diag, &wal_relay->diag); + break; + } + + struct vclock cur_vclock; + /* vclock is followed while decoding, zeroing it. 
*/ + vclock_create(&cur_vclock); + if (xrow_decode_vclock(&row, &cur_vclock) < 0) + break; + + if (writer->gc_wal_vclock != NULL && + vclock_order_changed(&wal_relay->replica_vclock, + writer->gc_wal_vclock, &cur_vclock)) + fiber_cond_signal(&writer->wal_gc_cond); + vclock_copy(&wal_relay->replica_vclock, &cur_vclock); + mclock_update(&writer->mclock, replica_id, &cur_vclock); + } + ibuf_destroy(&ibuf); + fiber_cancel(wal_relay->fiber); + return 0; +} + +struct wal_relay_stream { + struct xstream stream; + struct wal_relay *wal_relay; + struct ev_io io; +}; + +static int +wal_relay_stream_write(struct xstream *stream, struct xrow_header *row) +{ + struct wal_relay_stream *wal_relay_stream = + container_of(stream, struct wal_relay_stream, stream); + struct wal_relay *wal_relay = wal_relay_stream->wal_relay; + /* + * Remember the original row because filter could + * change it. + */ + struct xrow_header *orig_row = row; + switch (wal_relay->on_filter(wal_relay, &row)) { + case WAL_RELAY_FILTER_PASS: + case WAL_RELAY_FILTER_ROW: + break; + case WAL_RELAY_FILTER_SKIP: + return 0; + case WAL_RELAY_FILTER_ERR: + return -1; + } + ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY); + + vclock_follow_xrow(&wal_relay->vclock, orig_row); + int rc = coio_write_xrow(&wal_relay_stream->io, row); + struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); + if (inj != NULL && inj->dparam > 0) + fiber_sleep(inj->dparam); + + return rc >= 0? 0: -1; +} + +/* Structure to provide arguments for file relaying cord. */ +struct wal_relay_from_file_args { + /* Wal writer. */ + struct wal_writer *writer; + /* Wal realy structure. */ + struct wal_relay *wal_relay; + /* Vclock to stop relaying on. */ + struct vclock stop_vclock; +}; + +/* + * Relay from file cord function. This cord read log and + * sends data to replica. 
+ */ +static int +wal_relay_from_file_f(va_list ap) +{ + struct wal_relay_from_file_args *args = + va_arg(ap, struct wal_relay_from_file_args *); + /* Recover xlogs from files. */ + struct recovery *recovery = recovery_new(args->writer->wal_dir.dirname, + false, + &args->wal_relay->vclock); + if (recovery == NULL) + return -1; + struct wal_relay_stream wal_relay_stream; + xstream_create(&wal_relay_stream.stream, wal_relay_stream_write); + wal_relay_stream.wal_relay = args->wal_relay; + coio_create(&wal_relay_stream.io, args->wal_relay->fd); + + if (recover_remaining_wals(recovery, &wal_relay_stream.stream, + &args->stop_vclock, true) != 0) { + recovery_delete(recovery); + return -1; + } + recovery_delete(recovery); + return 0; +} + +static int +wal_relay_from_file(struct wal_writer *writer, struct wal_relay *wal_relay) +{ + struct wal_relay_from_file_args args; + args.writer = writer; + args.wal_relay = wal_relay; + vclock_create(&args.stop_vclock); + if (vclock_is_set(&wal_relay->stop_vclock)) + vclock_copy(&args.stop_vclock, &wal_relay->stop_vclock); + else + vclock_copy(&args.stop_vclock, &writer->vclock); + int rc = cord_costart(&wal_relay->cord, "file relay", + wal_relay_from_file_f, &args); + if (rc == 0) + rc = cord_cojoin(&wal_relay->cord); + /* Forget the joined cord so wal exit won't cancel it. */ + wal_relay->cord.id = 0; + return rc == 0 && vclock_is_set(&wal_relay->stop_vclock) ? 1 : rc; +} + +static int +wal_relay_send_hearthbeat(struct ev_io *io) +{ + struct xrow_header hearthbeat; + xrow_encode_timestamp(&hearthbeat, instance_id, ev_now(loop())); + return coio_write_xrow(io, &hearthbeat); +} + +/* Relay from wal memory: 1 - stop, 0 - outdated, -1 - error. */ +static int +wal_relay_from_memory(struct wal_writer *writer, struct wal_relay *wal_relay) +{ + double last_row_time = 0; + struct xrow_buf_cursor cursor; + if (xrow_buf_cursor_create(&writer->xrow_buf, &cursor, + &wal_relay->vclock) != 0) + return 0; + struct ev_io io; + coio_create(&io, wal_relay->fd); + /* Cursor was created and then we can process rows one by one.
+ */ + while (!fiber_is_cancelled()) { + if (vclock_is_set(&wal_relay->stop_vclock)) { + int rc = vclock_compare(&wal_relay->stop_vclock, + &wal_relay->vclock); + if (rc <= 0 && rc != VCLOCK_ORDER_UNDEFINED) + return 1; + } + struct xrow_header *row; + void *data; + size_t size; + int rc = xrow_buf_cursor_next(&writer->xrow_buf, &cursor, + &row, &data, &size); + if (rc < 0) { + /* + * Wal memory buffer was rotated and we are not in + * memory. + */ + return 0; + } + if (rc > 0) { + /* + * There are no more rows in a buffer. Wait + * until wal wrote new ones or timeout was + * exceeded and send a heartbeat message. + */ + double timeout = replication_timeout; + struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL, + ERRINJ_DOUBLE); + if (inj != NULL && inj->dparam != 0) + timeout = inj->dparam; + fiber_cond_wait_deadline(&writer->xrow_buf_cond, + last_row_time + timeout); + if (ev_monotonic_now(loop()) - last_row_time > + timeout) { + /* Timeout was exceeded - send a heartbeat. */ + if (wal_relay_send_hearthbeat(&io) < 0) + return -1; + last_row_time = ev_monotonic_now(loop()); + wal_relay->last_row_time = last_row_time; + } + continue; + } + ERROR_INJECT(ERRINJ_WAL_MEM_IGNORE, return 0); + /* + * Remember the original row because filter could + * change it.
+ */ + struct xrow_header *orig_row = row; + switch (wal_relay->on_filter(wal_relay, &row)) { + case WAL_RELAY_FILTER_PASS: + case WAL_RELAY_FILTER_ROW: + break; + case WAL_RELAY_FILTER_SKIP: + continue; + case WAL_RELAY_FILTER_ERR: + return -1; + } + + ERROR_INJECT(ERRINJ_RELAY_SEND_DELAY, { return 0;}); + last_row_time = ev_monotonic_now(loop()); + wal_relay->last_row_time = last_row_time; + if (coio_write_xrow(&io, row) < 0) + return -1; + vclock_follow_xrow(&wal_relay->vclock, orig_row); + struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); + if (inj != NULL && inj->dparam > 0) + fiber_sleep(inj->dparam); + } + return -1; +} + +static int +wal_relay_on_wal_exit(struct trigger *trigger, void *event) +{ + (void) event; + struct wal_relay *wal_relay = (struct wal_relay *)trigger->data; + if (wal_relay->cord.id > 0) + pthread_cancel(wal_relay->cord.id); + fiber_cancel(wal_relay->fiber); + wal_relay->is_wal_exit = true; + return 0; +} + +/* Wake relay when wal_relay finished. */ +static void +wal_relay_done(struct cmsg *base) +{ + struct wal_relay *msg = + container_of(base, struct wal_relay, base); + msg->done = true; + fiber_cond_signal(&msg->done_cond); +} + +static int +wal_relay_f(va_list ap) +{ + struct wal_writer *writer = &wal_writer_singleton; + struct wal_relay *wal_relay = va_arg(ap, struct wal_relay *); + + struct trigger on_wal_exit; + trigger_create(&on_wal_exit, wal_relay_on_wal_exit, wal_relay, NULL); + trigger_add(&writer->on_wal_exit, &on_wal_exit); + + struct fiber *reader = NULL; + if (wal_relay->replica != NULL && wal_relay->replica->id != REPLICA_ID_NIL) { + /* Start fiber for receiving replica acks.
+ */ + char name[FIBER_NAME_MAX]; + snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader"); + reader = fiber_new(name, wal_relay_reader_f); + if (reader == NULL) { + diag_move(&fiber()->diag, &wal_relay->diag); + goto done; + } + fiber_set_joinable(reader, true); + fiber_start(reader, writer, wal_relay); + + struct ev_io io; + coio_create(&io, wal_relay->fd); + if (wal_relay_send_hearthbeat(&io) < 0) + goto done; + } + + while (wal_relay_from_memory(writer, wal_relay) == 0 && + wal_relay_from_file(writer, wal_relay) == 0); + +done: + if (wal_relay->is_wal_exit) + return 0; + trigger_clear(&on_wal_exit); + if (diag_is_empty(&wal_relay->diag)) + diag_move(&fiber()->diag, &wal_relay->diag); + + if (reader != NULL) { + /* Join ack reader fiber. */ + fiber_cancel(reader); + fiber_join(reader); + } + if (wal_relay->is_wal_exit) + return 0; + + static struct cmsg_hop done_route[] = { + {wal_relay_done, NULL} + }; + cmsg_init(&wal_relay->base, done_route); + cpipe_push(&writer->tx_prio_pipe, &wal_relay->base); + wal_relay->fiber = NULL; + return 0; +} + +static void +wal_relay_attach(struct cmsg *base) +{ + struct wal_relay *wal_relay = container_of(base, struct wal_relay, base); + wal_relay->fiber = fiber_new("wal relay fiber", wal_relay_f); + wal_relay->cord.id = 0; + wal_relay->is_wal_exit = false; + fiber_start(wal_relay->fiber, wal_relay); +} + +static void +wal_relay_cancel(struct cmsg *base) +{ + struct wal_relay *wal_relay = container_of(base, struct wal_relay, + cancel_msg); + /* + * A relay was cancelled so cancel corresponding + * fiber in the wal thread if it is still alive.
+ */ + if (wal_relay->fiber != NULL) + fiber_cancel(wal_relay->fiber); +} + +int +wal_relay(struct wal_relay *wal_relay, const struct vclock *vclock, + const struct vclock *stop_vclock, wal_relay_filter_cb on_filter, int fd, + struct replica *replica) +{ + struct wal_writer *writer = &wal_writer_singleton; + vclock_copy(&wal_relay->vclock, vclock); + vclock_create(&wal_relay->stop_vclock); + if (stop_vclock != NULL) + vclock_copy(&wal_relay->stop_vclock, stop_vclock); + else + vclock_clear(&wal_relay->stop_vclock); + wal_relay->on_filter = on_filter; + wal_relay->fd = fd; + wal_relay->replica = replica; + diag_create(&wal_relay->diag); + wal_relay->cancel_msg.route = NULL; + + fiber_cond_create(&wal_relay->done_cond); + wal_relay->done = false; + + static struct cmsg_hop route[] = { + {wal_relay_attach, NULL} + }; + cmsg_init(&wal_relay->base, route); + cpipe_push(&writer->wal_pipe, &wal_relay->base); + + /* + * We do not use cbus_call because we should be able to + * process this fiber cancelation and send a cancel request + * to the wal cord to force wal detach. + */ + while (!wal_relay->done) { + if (fiber_is_cancelled() && + wal_relay->cancel_msg.route == NULL) { + /* Send a cancel message to a wal relay fiber. 
*/ + static struct cmsg_hop cancel_route[]= { + {wal_relay_cancel, NULL}}; + cmsg_init(&wal_relay->cancel_msg, cancel_route); + cpipe_push(&writer->wal_pipe, &wal_relay->cancel_msg); + } + fiber_cond_wait(&wal_relay->done_cond); + } + + if (!diag_is_empty(&wal_relay->diag)) { + diag_move(&wal_relay->diag, &fiber()->diag); + return -1; + } + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + return -1; + } + return 0; +} + +struct wal_relay_vclock_msg { + struct cbus_call_msg base; + const struct wal_relay *wal_relay; + struct vclock *vclock; +}; + +static int +wal_relay_vclock_f(struct cbus_call_msg *base) +{ + struct wal_relay_vclock_msg *msg = + container_of(base, struct wal_relay_vclock_msg, base); + vclock_copy(msg->vclock, &msg->wal_relay->replica_vclock); + return 0; +} + +int +wal_relay_vclock(const struct wal_relay *wal_relay, struct vclock *vclock) +{ + struct wal_writer *writer = &wal_writer_singleton; + + struct wal_relay_vclock_msg msg; + msg.wal_relay = wal_relay; + msg.vclock = vclock; + bool cancellable = fiber_set_cancellable(false); + int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, + &msg.base, wal_relay_vclock_f, NULL, + TIMEOUT_INFINITY); + fiber_set_cancellable(cancellable); + return rc; +} + +struct wal_relay_last_row_time_msg { + struct cbus_call_msg base; + const struct wal_relay *wal_relay; + double last_row_time; +}; + +static int +wal_relay_last_row_time_f(struct cbus_call_msg *base) +{ + struct wal_relay_last_row_time_msg *msg = + container_of(base, struct wal_relay_last_row_time_msg, base); + msg->last_row_time = msg->wal_relay->last_row_time; + return 0; +} + +double +wal_relay_last_row_time(const struct wal_relay *wal_relay) +{ + struct wal_writer *writer = &wal_writer_singleton; + + struct wal_relay_last_row_time_msg msg; + msg.wal_relay = wal_relay; + bool cancellable = fiber_set_cancellable(false); + cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, + &msg.base, wal_relay_last_row_time_f, NULL, + 
TIMEOUT_INFINITY); + fiber_set_cancellable(cancellable); + return msg.last_row_time; +} diff --git a/src/box/wal.h b/src/box/wal.h index 86887656d..a84c976c7 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -36,6 +36,7 @@ #include "cbus.h" #include "journal.h" #include "vclock.h" +#include "xstream.h" struct fiber; struct wal_writer; @@ -236,12 +237,6 @@ wal_set_gc_first_vclock(const struct vclock *vclock); void wal_set_checkpoint_threshold(int64_t threshold); -/** - * Update a wal consumer vclock position. - */ -void -wal_relay_status_update(uint32_t replica_id, const struct vclock *vclock); - /** * Unregister a wal consumer. */ @@ -263,6 +258,91 @@ wal_write_vy_log(struct journal_entry *req); void wal_rotate_vy_log(); +struct replica; +struct wal_relay; + +#define WAL_RELAY_FILTER_ERR -1 +#define WAL_RELAY_FILTER_PASS 0 +#define WAL_RELAY_FILTER_ROW 1 +#define WAL_RELAY_FILTER_SKIP 2 + +typedef ssize_t (*wal_relay_filter_cb)(struct wal_relay *wal_relay, + struct xrow_header **row); + +/** + * State of relaying logged xrows to a replica, directly + * from the wal memory buffer or from xlog files. + */ +struct wal_relay { + struct cmsg base; + /** Current wal relay position. */ + struct vclock vclock; + /** Vclock to stop relaying at, unset if unlimited. */ + struct vclock stop_vclock; + /** Replica socket handle. */ + int fd; + /** + * Filter callback deciding whether a row should be + * passed to the replica, replaced by a NOP or other + * row, or skipped. + */ + wal_relay_filter_cb on_filter; + /** + * Relay worker fiber, kept in order to cancel it + * when relaying is cancelled. + */ + struct fiber *fiber; + /** Message to cancel relaying fiber. */ + struct cmsg cancel_msg; + /** Signalled when relaying is stopped. */ + struct fiber_cond done_cond; + /** Turns to true when relaying was stopped. */ + bool done; + /** Return code. */ + int rc; + /** Diagnostic area. */ + struct diag diag; + /** Replica which consumes relayed logs.
*/ + struct replica *replica; + /** Vclock reported by replica. */ + struct vclock replica_vclock; + /** Last transmission time. */ + double last_row_time; + /** Cord spawned to relay from files. */ + struct cord cord; + /** True if the relay was signalled about wal exit. */ + bool is_wal_exit; +}; + +/** + * Relay logged rows to a replica. Rows are fetched directly + * from the wal memory buffer; when the buffer no longer holds + * the requested rows, they are read from xlog files instead. + * The calling fiber yields until the wal relay fiber finishes: + * stop_vclock is reached, an error occurs or relaying is + * cancelled. If the calling fiber is cancelled, a cancel + * message is sent to the wal thread to stop the relay fiber. + * + * @param wal_relay a wal relay structure to keep relaying state in + * @param vclock a vclock to start relaying from + * @param stop_vclock a vclock to stop relaying at, or NULL + * @param on_filter a callback to pass, patch or skip relayed rows + * @param fd replica socket handle + * @param replica the replica which consumes the logs + * @retval 0 relaying stopped without an error + * @retval -1 relaying stopped because of an error or cancellation.
+ */ +int +wal_relay(struct wal_relay *wal_relay, const struct vclock *vclock, + const struct vclock *stop_vclock, wal_relay_filter_cb on_filter, + int fd, struct replica *replica); + +int +wal_relay_vclock(const struct wal_relay *wal_relay, struct vclock *vclock); + +double +wal_relay_last_row_time(const struct wal_relay *wal_relay); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h index 672da2119..c0025000a 100644 --- a/src/lib/core/errinj.h +++ b/src/lib/core/errinj.h @@ -135,6 +135,7 @@ struct errinj { _(ERRINJ_COIO_SENDFILE_CHUNK, ERRINJ_INT, {.iparam = -1}) \ _(ERRINJ_SWIM_FD_ONLY, ERRINJ_BOOL, {.bparam = false}) \ _(ERRINJ_DYN_MODULE_COUNT, ERRINJ_INT, {.iparam = 0}) \ + _(ERRINJ_WAL_MEM_IGNORE, ERRINJ_BOOL, {.bparam = false}) \ ENUM0(errinj_id, ERRINJ_LIST); extern struct errinj errinjs[]; diff --git a/test/box-py/iproto.test.py b/test/box-py/iproto.test.py index 77637d8ed..788df58f8 100644 --- a/test/box-py/iproto.test.py +++ b/test/box-py/iproto.test.py @@ -293,9 +293,13 @@ uuid = '0d5bd431-7f3e-4695-a5c2-82de0a9cbc95' header = { IPROTO_CODE: REQUEST_TYPE_JOIN, IPROTO_SYNC: 2334 } body = { IPROTO_SERVER_UUID: uuid } resp = test_request(header, body) +# In memory replication would not support the +# sync field while final join or subscribe for +# replied rows because it requires +# to reencode of each row stored in a memory buffer. if resp['header'][IPROTO_SYNC] == 2334: i = 1 - while i < 3: + while i < 2: resp = receive_response() if resp['header'][IPROTO_SYNC] != 2334: print 'Bad sync on response with number ', i @@ -306,6 +310,9 @@ if resp['header'][IPROTO_SYNC] == 2334: print 'Sync ok' else: print 'Bad first sync' +# read until the third OK +while receive_response()['header'][IPROTO_CODE] != REQUEST_TYPE_OK: + pass # # Try incorrect JOIN. SYNC must be also returned. 
diff --git a/test/box/errinj.result b/test/box/errinj.result index babe36b1b..62c0832ef 100644 --- a/test/box/errinj.result +++ b/test/box/errinj.result @@ -23,132 +23,134 @@ errinj.info() --- - ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT: state: 0 - ERRINJ_WAL_WRITE: - state: false - ERRINJ_RELAY_BREAK_LSN: + ERRINJ_WAL_BREAK_LSN: state: -1 - ERRINJ_HTTPC_EXECUTE: - state: false ERRINJ_VYRUN_DATA_READ: state: false - ERRINJ_SWIM_FD_ONLY: - state: false - ERRINJ_SQL_NAME_NORMALIZATION: - state: false ERRINJ_VY_SCHED_TIMEOUT: state: 0 - ERRINJ_COIO_SENDFILE_CHUNK: - state: -1 ERRINJ_HTTP_RESPONSE_ADD_WAIT: state: false - ERRINJ_WAL_WRITE_PARTIAL: - state: -1 - ERRINJ_VY_GC: - state: false - ERRINJ_WAL_DELAY: - state: false - ERRINJ_INDEX_ALLOC: - state: false ERRINJ_WAL_WRITE_EOF: state: false - ERRINJ_WAL_SYNC: - state: false - ERRINJ_BUILD_INDEX: - state: -1 ERRINJ_BUILD_INDEX_DELAY: state: false - ERRINJ_VY_RUN_FILE_RENAME: - state: false - ERRINJ_VY_COMPACTION_DELAY: - state: false - ERRINJ_VY_DUMP_DELAY: - state: false ERRINJ_VY_DELAY_PK_LOOKUP: state: false - ERRINJ_VY_TASK_COMPLETE: - state: false - ERRINJ_PORT_DUMP: + ERRINJ_VY_POINT_ITER_WAIT: state: false - ERRINJ_WAL_BREAK_LSN: - state: -1 ERRINJ_WAL_IO: state: false - ERRINJ_WAL_FALLOCATE: - state: 0 - ERRINJ_DYN_MODULE_COUNT: - state: 0 ERRINJ_VY_INDEX_FILE_RENAME: state: false ERRINJ_TUPLE_FORMAT_COUNT: state: -1 ERRINJ_TUPLE_ALLOC: state: false - ERRINJ_VY_RUN_WRITE_DELAY: + ERRINJ_VY_RUN_FILE_RENAME: state: false ERRINJ_VY_READ_PAGE: state: false ERRINJ_RELAY_REPORT_INTERVAL: state: 0 - ERRINJ_VY_LOG_FILE_RENAME: - state: false - ERRINJ_VY_READ_PAGE_TIMEOUT: - state: 0 + ERRINJ_RELAY_BREAK_LSN: + state: -1 ERRINJ_XLOG_META: state: false - ERRINJ_SIO_READ_MAX: - state: -1 ERRINJ_SNAP_COMMIT_DELAY: state: false - ERRINJ_WAL_WRITE_DISK: + ERRINJ_VY_RUN_WRITE: state: false - ERRINJ_SNAP_WRITE_DELAY: + ERRINJ_BUILD_INDEX: + state: -1 + ERRINJ_RELAY_FINAL_JOIN: + state: false + ERRINJ_REPLICA_JOIN_DELAY: state: false 
ERRINJ_LOG_ROTATE: state: false - ERRINJ_VY_RUN_WRITE: + ERRINJ_MEMTX_DELAY_GC: state: false - ERRINJ_CHECK_FORMAT_DELAY: + ERRINJ_XLOG_GARBAGE: + state: false + ERRINJ_VY_READ_PAGE_DELAY: + state: false + ERRINJ_SWIM_FD_ONLY: state: false + ERRINJ_WAL_WRITE: + state: false + ERRINJ_HTTPC_EXECUTE: + state: false + ERRINJ_SQL_NAME_NORMALIZATION: + state: false + ERRINJ_WAL_WRITE_PARTIAL: + state: -1 + ERRINJ_VY_GC: + state: false + ERRINJ_WAL_DELAY: + state: false + ERRINJ_XLOG_READ: + state: -1 + ERRINJ_WAL_SYNC: + state: false + ERRINJ_VY_TASK_COMPLETE: + state: false + ERRINJ_PORT_DUMP: + state: false + ERRINJ_COIO_SENDFILE_CHUNK: + state: -1 + ERRINJ_DYN_MODULE_COUNT: + state: 0 + ERRINJ_SIO_READ_MAX: + state: -1 + ERRINJ_WAL_MEM_IGNORE: + state: false + ERRINJ_RELAY_TIMEOUT: + state: 0 + ERRINJ_VY_DUMP_DELAY: + state: false + ERRINJ_VY_SQUASH_TIMEOUT: + state: 0 ERRINJ_VY_LOG_FLUSH_DELAY: state: false - ERRINJ_RELAY_FINAL_JOIN: + ERRINJ_RELAY_SEND_DELAY: state: false - ERRINJ_REPLICA_JOIN_DELAY: + ERRINJ_VY_COMPACTION_DELAY: state: false - ERRINJ_RELAY_FINAL_SLEEP: + ERRINJ_VY_LOG_FILE_RENAME: state: false ERRINJ_VY_RUN_DISCARD: state: false ERRINJ_WAL_ROTATE: state: false - ERRINJ_RELAY_EXIT_DELAY: + ERRINJ_VY_READ_PAGE_TIMEOUT: state: 0 - ERRINJ_VY_POINT_ITER_WAIT: + ERRINJ_VY_INDEX_DUMP: + state: -1 + ERRINJ_TUPLE_FIELD: state: false - ERRINJ_MEMTX_DELAY_GC: + ERRINJ_SNAP_WRITE_DELAY: state: false ERRINJ_IPROTO_TX_DELAY: state: false - ERRINJ_XLOG_READ: - state: -1 - ERRINJ_TUPLE_FIELD: + ERRINJ_RELAY_EXIT_DELAY: + state: 0 + ERRINJ_RELAY_FINAL_SLEEP: state: false - ERRINJ_XLOG_GARBAGE: + ERRINJ_WAL_WRITE_DISK: state: false - ERRINJ_VY_INDEX_DUMP: - state: -1 - ERRINJ_VY_READ_PAGE_DELAY: + ERRINJ_CHECK_FORMAT_DELAY: state: false ERRINJ_TESTING: state: false - ERRINJ_RELAY_SEND_DELAY: + ERRINJ_VY_RUN_WRITE_DELAY: state: false - ERRINJ_VY_SQUASH_TIMEOUT: + ERRINJ_WAL_FALLOCATE: state: 0 ERRINJ_VY_LOG_FLUSH: state: false - ERRINJ_RELAY_TIMEOUT: - state: 0 + 
ERRINJ_INDEX_ALLOC: + state: false ... errinj.set("some-injection", true) --- diff --git a/test/replication/force_recovery.result b/test/replication/force_recovery.result index f50452858..e48c12657 100644 --- a/test/replication/force_recovery.result +++ b/test/replication/force_recovery.result @@ -16,6 +16,10 @@ _ = box.space.test:create_index('primary') box.schema.user.grant('guest', 'replication') --- ... +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true) +--- +- ok +... -- Deploy a replica. test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'") --- @@ -86,6 +90,10 @@ test_run:cmd("switch default") box.cfg{force_recovery = false} --- ... +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false) +--- +- ok +... -- Cleanup. test_run:cmd("stop server test") --- diff --git a/test/replication/force_recovery.test.lua b/test/replication/force_recovery.test.lua index 54307814b..c08bb9c02 100644 --- a/test/replication/force_recovery.test.lua +++ b/test/replication/force_recovery.test.lua @@ -8,6 +8,7 @@ _ = box.schema.space.create('test') _ = box.space.test:create_index('primary') box.schema.user.grant('guest', 'replication') +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true) -- Deploy a replica. test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'") test_run:cmd("start server test") @@ -33,6 +34,7 @@ box.space.test:select() box.info.replication[1].upstream.status == 'stopped' or box.info test_run:cmd("switch default") box.cfg{force_recovery = false} +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false) -- Cleanup. test_run:cmd("stop server test") diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result index f71292da1..187634c62 100644 --- a/test/replication/replica_rejoin.result +++ b/test/replication/replica_rejoin.result @@ -184,6 +184,10 @@ test_run:cmd("stop server replica") - true ... 
test_run:cmd("restart server default") +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true) +--- +- ok +... checkpoint_count = box.cfg.checkpoint_count --- ... @@ -368,6 +372,10 @@ test_run:cmd("switch default") --- - true ... +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false) +--- +- ok +... box.cfg{replication = ''} --- ... diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua index 22a91d8d7..3ee98bc85 100644 --- a/test/replication/replica_rejoin.test.lua +++ b/test/replication/replica_rejoin.test.lua @@ -70,6 +70,7 @@ box.space.test:replace{1, 2, 3} -- bumps LSN on the replica test_run:cmd("switch default") test_run:cmd("stop server replica") test_run:cmd("restart server default") +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true) checkpoint_count = box.cfg.checkpoint_count box.cfg{checkpoint_count = 1} for i = 1, 3 do box.space.test:delete{i * 10} end @@ -135,6 +136,7 @@ box.space.test:replace{2} -- Cleanup. test_run:cmd("switch default") +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false) box.cfg{replication = ''} test_run:cmd("stop server replica") test_run:cmd("cleanup server replica") diff --git a/test/replication/show_error_on_disconnect.result b/test/replication/show_error_on_disconnect.result index 48003db06..e6920c160 100644 --- a/test/replication/show_error_on_disconnect.result +++ b/test/replication/show_error_on_disconnect.result @@ -20,6 +20,10 @@ test_run:cmd("switch master_quorum1") --- - true ... +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true) +--- +- ok +... repl = box.cfg.replication --- ... @@ -30,6 +34,10 @@ test_run:cmd("switch master_quorum2") --- - true ... +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true) +--- +- ok +... 
box.space.test:insert{1} --- - [1] diff --git a/test/replication/show_error_on_disconnect.test.lua b/test/replication/show_error_on_disconnect.test.lua index 1b0ea4373..2a944dfc3 100644 --- a/test/replication/show_error_on_disconnect.test.lua +++ b/test/replication/show_error_on_disconnect.test.lua @@ -10,9 +10,11 @@ SERVERS = {'master_quorum1', 'master_quorum2'} test_run:create_cluster(SERVERS) test_run:wait_fullmesh(SERVERS) test_run:cmd("switch master_quorum1") +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true) repl = box.cfg.replication box.cfg{replication = ""} test_run:cmd("switch master_quorum2") +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true) box.space.test:insert{1} box.snapshot() box.space.test:insert{2} diff --git a/test/replication/suite.ini b/test/replication/suite.ini index ed1de3140..23be12528 100644 --- a/test/replication/suite.ini +++ b/test/replication/suite.ini @@ -3,7 +3,7 @@ core = tarantool script = master.lua description = tarantool/box, replication disabled = consistent.test.lua -release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua +release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua force_recovery.test.lua show_error_on_disconnect.test.lua replica_rejoin.test.lua config = suite.cfg lua_libs = lua/fast_replica.lua lua/rlimit.lua use_unix_sockets = True diff --git a/test/xlog/panic_on_wal_error.result b/test/xlog/panic_on_wal_error.result index 22f14f912..897116b3b 100644 --- a/test/xlog/panic_on_wal_error.result +++ b/test/xlog/panic_on_wal_error.result @@ -19,6 +19,10 @@ _ = box.space.test:create_index('pk') -- reopen xlog -- test_run:cmd("restart server default") +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true) +--- +- ok +... 
box.space.test ~= nil --- - true @@ -68,6 +72,10 @@ test_run:cmd("stop server replica") - true ... test_run:cmd("restart server default") +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true) +--- +- ok +... box.space.test:auto_increment{'after snapshot'} --- - [2, 'after snapshot'] @@ -153,6 +161,10 @@ test_run:cmd("switch default") --- - true ... +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false) +--- +- ok +... test_run:cmd("stop server replica") --- - true diff --git a/test/xlog/panic_on_wal_error.test.lua b/test/xlog/panic_on_wal_error.test.lua index 2e95431c6..d973a00ff 100644 --- a/test/xlog/panic_on_wal_error.test.lua +++ b/test/xlog/panic_on_wal_error.test.lua @@ -10,6 +10,7 @@ _ = box.space.test:create_index('pk') -- reopen xlog -- test_run:cmd("restart server default") +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true) box.space.test ~= nil -- insert some stuff -- @@ -32,6 +33,7 @@ box.space.test:select{} test_run:cmd("switch default") test_run:cmd("stop server replica") test_run:cmd("restart server default") +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true) box.space.test:auto_increment{'after snapshot'} box.space.test:auto_increment{'after snapshot - one more row'} -- @@ -67,6 +69,7 @@ box.space.test:select{} -- -- test_run:cmd("switch default") +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false) test_run:cmd("stop server replica") test_run:cmd("cleanup server replica") -- diff --git a/test/xlog/suite.ini b/test/xlog/suite.ini index 689d2b871..c208c73c4 100644 --- a/test/xlog/suite.ini +++ b/test/xlog/suite.ini @@ -4,7 +4,7 @@ description = tarantool write ahead log tests script = xlog.lua disabled = snap_io_rate.test.lua upgrade.test.lua valgrind_disabled = -release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua checkpoint_threshold.test.lua +release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua checkpoint_threshold.test.lua panic_on_wal_error.test.lua 
config = suite.cfg use_unix_sockets = True use_unix_sockets_iproto = True -- 2.25.0