[tarantool-patches] [PATCH v2 4/4] replication: use wal memory buffer to fetch rows
Georgy Kirichenko
georgy at tarantool.org
Wed Sep 18 12:36:11 MSK 2019
A relay tries to create a WAL memory cursor and follow it to relay
data to its replica. If the relay fails to attach to the WAL memory
buffer, or falls out of the buffer, it recovers xlogs from files and
then makes a new attempt to attach.
Closes: #3794
---
src/box/relay.cc | 169 +++++++++---------
src/box/wal.c | 155 ++++++++++++++++
src/box/wal.h | 36 ++++
src/lib/core/cbus.c | 4 +
src/lib/core/errinj.h | 1 +
test/box/errinj.result | 2 +
test/replication/force_recovery.result | 8 +
test/replication/force_recovery.test.lua | 2 +
test/replication/replica_rejoin.result | 8 +
test/replication/replica_rejoin.test.lua | 2 +
.../show_error_on_disconnect.result | 8 +
.../show_error_on_disconnect.test.lua | 2 +
test/replication/suite.ini | 2 +-
test/xlog/panic_on_wal_error.result | 12 ++
test/xlog/panic_on_wal_error.test.lua | 3 +
test/xlog/suite.ini | 2 +-
16 files changed, 327 insertions(+), 89 deletions(-)
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 21674119d..1e65d6d56 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -76,16 +76,16 @@ struct relay {
uint64_t sync;
/** Recovery instance to read xlog from the disk */
struct recovery *r;
+ /** Wal directory. */
+ char *wal_dir;
/** Xstream argument to recovery */
struct xstream stream;
/** Vclock to stop playing xlogs */
struct vclock stop_vclock;
/** Remote replica */
struct replica *replica;
- /** WAL event watcher. */
- struct wal_watcher wal_watcher;
- /** Relay reader cond. */
- struct fiber_cond reader_cond;
+ /** WAL memory relay. */
+ struct wal_relay wal_relay;
/** Relay diagnostics. */
struct diag diag;
/** Vclock recieved from replica. */
@@ -117,6 +117,8 @@ struct relay {
/** Known relay vclock. */
struct vclock vclock;
} tx;
+ /** vclock sent by relay. */
+ struct vclock relay_vclock;
};
struct diag*
@@ -161,9 +163,9 @@ relay_new(struct replica *replica)
}
relay->replica = replica;
relay->last_row_time = ev_monotonic_now(loop());
- fiber_cond_create(&relay->reader_cond);
diag_create(&relay->diag);
relay->state = RELAY_OFF;
+ relay->r = NULL;
return relay;
}
@@ -211,7 +213,8 @@ relay_exit(struct relay *relay)
* cursor, which must be closed in the same thread
* that opened it (it uses cord's slab allocator).
*/
- recovery_delete(relay->r);
+ if (relay->r != NULL)
+ recovery_delete(relay->r);
relay->r = NULL;
}
@@ -235,8 +238,8 @@ relay_delete(struct relay *relay)
{
if (relay->state == RELAY_FOLLOW)
relay_stop(relay);
- fiber_cond_destroy(&relay->reader_cond);
diag_destroy(&relay->diag);
+ free(relay->wal_dir);
TRASH(relay);
free(relay);
}
@@ -357,31 +360,6 @@ relay_set_error(struct relay *relay, struct error *e)
diag_add_error(&relay->diag, e);
}
-static void
-relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
-{
- struct relay *relay = container_of(watcher, struct relay, wal_watcher);
- if (fiber_is_cancelled()) {
- /*
- * The relay is exiting. Rescanning the WAL at this
- * point would be pointless and even dangerous,
- * because the relay could have written a packet
- * fragment to the socket before being cancelled
- * so that writing another row to the socket would
- * lead to corrupted replication stream and, as
- * a result, permanent replication breakdown.
- */
- return;
- }
- try {
- recover_remaining_wals(relay->r, &relay->stream, NULL,
- (events & WAL_EVENT_ROTATE) != 0);
- } catch (Exception *e) {
- relay_set_error(relay, e);
- fiber_cancel(fiber());
- }
-}
-
/*
* Relay reader fiber function.
* Read xrow encoded vclocks sent by the replica.
@@ -404,7 +382,24 @@ relay_reader_f(va_list ap)
/* vclock is followed while decoding, zeroing it. */
vclock_create(&relay->recv_vclock);
xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
- fiber_cond_signal(&relay->reader_cond);
+ if (relay->status_msg.msg.route != NULL)
+ continue;
+ struct vclock *send_vclock;
+ if (relay->version_id < version_id(1, 7, 4))
+ send_vclock = &relay->r->vclock;
+ else
+ send_vclock = &relay->recv_vclock;
+ if (vclock_sum(&relay->status_msg.vclock) ==
+ vclock_sum(send_vclock))
+ continue;
+ static const struct cmsg_hop route[] = {
+ {tx_status_update, NULL}
+ };
+ cmsg_init(&relay->status_msg.msg, route);
+ vclock_copy(&relay->status_msg.vclock,
+ send_vclock);
+ relay->status_msg.relay = relay;
+ cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
}
} catch (Exception *e) {
relay_set_error(relay, e);
@@ -430,6 +425,27 @@ relay_send_heartbeat(struct relay *relay)
}
}
+/**
+ * Row callback passed to wal_relay(): forward one row to the
+ * replica through the relay stream.
+ * Returns 0 on success and -1 if sending threw an Exception;
+ * the caller (wal_relay_f) then moves the fiber diag to report
+ * the failure.
+ */
+static int
+relay_send_cb(struct xrow_header *row, void *data)
+{
+	struct relay *relay = (struct relay *)data;
+	try {
+		relay_send_row(&relay->stream, row);
+	} catch (Exception *) {
+		return -1;
+	}
+	return 0;
+}
+
+/**
+ * Event loop callback of the relay cbus endpoint: process all
+ * pending cbus messages. The endpoint itself is carried in
+ * watcher->data (see cbus_endpoint_create() in relay_subscribe_f).
+ */
+static void
+relay_endpoint_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
+{
+	(void) loop;
+	(void) events;
+	cbus_process((struct cbus_endpoint *)watcher->data);
+}
+
/**
* A libev callback invoked when a relay client socket is ready
* for read. This currently only happens when the client closes
@@ -439,21 +455,16 @@ static int
relay_subscribe_f(va_list ap)
{
struct relay *relay = va_arg(ap, struct relay *);
- struct recovery *r = relay->r;
coio_enable();
relay_set_cord_name(relay->io.fd);
/* Create cpipe to tx for propagating vclock. */
cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay),
- fiber_schedule_cb, fiber());
+ relay_endpoint_cb, &relay->endpoint);
cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
&relay->relay_pipe, NULL, NULL, cbus_process);
- /* Setup WAL watcher for sending new rows to the replica. */
- wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
- relay_process_wal_event, cbus_process);
-
/* Start fiber for receiving replica acks. */
char name[FIBER_NAME_MAX];
snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
@@ -469,50 +480,28 @@ relay_subscribe_f(va_list ap)
*/
relay_send_heartbeat(relay);
- /*
- * Run the event loop until the connection is broken
- * or an error occurs.
- */
while (!fiber_is_cancelled()) {
- double timeout = replication_timeout;
- struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
- ERRINJ_DOUBLE);
- if (inj != NULL && inj->dparam != 0)
- timeout = inj->dparam;
-
- fiber_cond_wait_deadline(&relay->reader_cond,
- relay->last_row_time + timeout);
-
- /*
- * The fiber can be woken by IO cancel, by a timeout of
- * status messaging or by an acknowledge to status message.
- * Handle cbus messages first.
- */
- cbus_process(&relay->endpoint);
- /* Check for a heartbeat timeout. */
- if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
- relay_send_heartbeat(relay);
- /*
- * Check that the vclock has been updated and the previous
- * status message is delivered
- */
- if (relay->status_msg.msg.route != NULL)
- continue;
- struct vclock *send_vclock;
- if (relay->version_id < version_id(1, 7, 4))
- send_vclock = &r->vclock;
- else
- send_vclock = &relay->recv_vclock;
- if (vclock_sum(&relay->status_msg.vclock) ==
- vclock_sum(send_vclock))
- continue;
- static const struct cmsg_hop route[] = {
- {tx_status_update, NULL}
+ /* Try to relay direct from wal memory buffer. */
+ if (wal_relay(&relay->wal_relay, &relay->relay_vclock,
+ relay_send_cb, relay,
+ tt_sprintf("relay_%p", relay)) != 0) {
+ relay_set_error(relay, diag_last_error(&fiber()->diag));
+ break;
};
- cmsg_init(&relay->status_msg.msg, route);
- vclock_copy(&relay->status_msg.vclock, send_vclock);
- relay->status_msg.relay = relay;
- cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
+ /* Recover xlogs from files. */
+ try {
+ relay->r = recovery_new(relay->wal_dir, false,
+ &relay->relay_vclock);
+ auto relay_guard = make_scoped_guard([&] {
+ recovery_delete(relay->r);
+ relay->r = NULL;
+ });
+ recover_remaining_wals(relay->r, &relay->stream,
+ NULL, true);
+ } catch (Exception *e) {
+ relay_set_error(relay, e);
+ break;
+ }
}
/*
@@ -524,9 +513,6 @@ relay_subscribe_f(va_list ap)
diag_log();
say_crit("exiting the relay loop");
- /* Clear WAL watcher. */
- wal_clear_watcher(&relay->wal_watcher, cbus_process);
-
/* Join ack reader fiber. */
fiber_cancel(reader);
fiber_join(reader);
@@ -553,6 +539,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
* unless it has already been registered by initial
* join.
*/
+ vclock_copy(&relay->relay_vclock, replica_clock);
if (replica->gc == NULL) {
replica->gc = gc_consumer_register(replica_clock, "replica %s",
tt_uuid_str(&replica->uuid));
@@ -567,8 +554,13 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
});
vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
- relay->r = recovery_new(cfg_gets("wal_dir"), false,
- replica_clock);
+ relay->r = NULL;
+ relay->wal_dir = strdup(cfg_gets("wal_dir"));
+ if (relay->wal_dir == NULL) {
+ diag_set(OutOfMemory, strlen(cfg_gets("wal_dir")),
+ "runtime", "wal_dir");
+ diag_raise();
+ }
vclock_copy(&relay->tx.vclock, replica_clock);
relay->version_id = replica_version_id;
@@ -612,7 +604,10 @@ static void
relay_send_row(struct xstream *stream, struct xrow_header *packet)
{
struct relay *relay = container_of(stream, struct relay, stream);
- assert(iproto_type_is_dml(packet->type));
+ if (packet->type != IPROTO_OK) {
+ assert(iproto_type_is_dml(packet->type));
+ vclock_follow_xrow(&relay->relay_vclock, packet);
+ }
/*
* Transform replica local requests to IPROTO_NOP so as to
* promote vclock on the replica without actually modifying
diff --git a/src/box/wal.c b/src/box/wal.c
index e77bd1ae1..0d900270c 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -159,6 +159,8 @@ struct wal_writer
struct rlist watchers;
/** Xrow buffer. */
struct xrow_buf xrow_buf;
+ /* xrow buffer condition signaled when buffer write was done. */
+ struct fiber_cond xrow_buf_cond;
};
struct wal_msg {
@@ -1066,6 +1068,7 @@ wal_write_to_disk(struct cmsg *msg)
writer->checkpoint_wal_size += rc;
last_committed = stailq_last(&wal_msg->commit);
vclock_merge(&writer->vclock, &vclock_diff);
+ fiber_cond_broadcast(&writer->xrow_buf_cond);
/*
* Notify TX if the checkpoint threshold has been exceeded.
@@ -1131,6 +1134,7 @@ wal_writer_f(va_list ap)
(void) ap;
struct wal_writer *writer = &wal_writer_singleton;
xrow_buf_create(&writer->xrow_buf);
+ fiber_cond_create(&writer->xrow_buf_cond);
/** Initialize eio in this thread */
coio_enable();
@@ -1451,3 +1455,154 @@ wal_atfork()
if (xlog_is_open(&vy_log_writer.xlog))
xlog_atfork(&vy_log_writer.xlog);
}
+
+/*
+ * Delivered to the relay side when the wal relay fiber has
+ * finished: mark the session as done and wake the fiber that is
+ * waiting in wal_relay().
+ */
+static void
+wal_relay_done(struct cmsg *base)
+{
+	struct wal_relay *relay_msg = container_of(base, struct wal_relay,
+						   base);
+	relay_msg->done = true;
+	fiber_cond_signal(&relay_msg->done_cond);
+}
+
+/*
+ * Wal relay fiber function.
+ *
+ * Attaches a cursor to the WAL memory buffer at msg->vclock and
+ * passes every row not yet covered by msg->vclock to
+ * msg->on_wal_relay. When no new rows appear within the
+ * replication timeout, a heartbeat row is passed instead.
+ *
+ * The fiber stops when the cursor cannot be attached or falls out
+ * of the buffer, when the callback fails (its diag is moved to
+ * msg->diag) or when the fiber is cancelled. In every case a
+ * message routed to wal_relay_done() is pushed to msg->relay_pipe.
+ */
+static int
+wal_relay_f(va_list ap)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct wal_relay *msg = va_arg(ap, struct wal_relay *);
+	struct vclock *vclock = msg->vclock;
+	wal_relay_cb on_wal_relay = msg->on_wal_relay;
+	void *cb_data = msg->cb_data;
+	static struct cmsg_hop done_route[] = {
+		{wal_relay_done, NULL}
+	};
+
+	double last_row_time = ev_monotonic_now(loop());
+	/*
+	 * Heartbeat row storage. It must live at function scope:
+	 * `row` points to it after the branch that encodes it has
+	 * been left, when the row is handed to the callback.
+	 */
+	struct xrow_header heartbeat;
+
+	struct xrow_buf_cursor cursor;
+	/* Attach to the wal memory buffer. */
+	if (xrow_buf_cursor_create(&writer->xrow_buf, &cursor, vclock) != 0)
+		goto done;
+	while (!fiber_is_cancelled()) {
+		struct xrow_header *row;
+		void *data;
+		size_t size;
+		/* Fetch the next row. */
+		int rc = xrow_buf_cursor_next(&writer->xrow_buf, &cursor,
+					      &row, &data, &size);
+		if (rc < 0) {
+			/*
+			 * Wal memory buffer was rotated and the cursor
+			 * position is no longer in memory.
+			 */
+			goto done;
+		}
+		/* Skip rows already covered by the relay vclock. */
+		if (rc == 0 && vclock_get(vclock, row->replica_id) >= row->lsn)
+			continue;
+		ERROR_INJECT(ERRINJ_WAL_MEM_IGNORE, goto done; );
+		if (rc > 0) {
+			/* All wal memory was relayed, wait for new data. */
+			double timeout = replication_timeout;
+			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
+						    ERRINJ_DOUBLE);
+			if (inj != NULL && inj->dparam != 0)
+				timeout = inj->dparam;
+
+			/*
+			 * Nothing to send, so wait for the next row and
+			 * send a heartbeat if the timeout has expired.
+			 */
+			fiber_cond_wait_deadline(&writer->xrow_buf_cond,
+						 last_row_time + timeout);
+			if (ev_monotonic_now(loop()) - last_row_time <= timeout)
+				continue;
+			/* No new rows arrived in time: send a heartbeat. */
+			xrow_encode_timestamp(&heartbeat, instance_id,
+					      ev_now(loop()));
+			row = &heartbeat;
+		}
+		last_row_time = ev_monotonic_now(loop());
+		/* Invoke handler callback. */
+		if (on_wal_relay(row, cb_data) != 0) {
+			diag_move(&fiber()->diag, &msg->diag);
+			goto done;
+		}
+	}
+done:
+	/*
+	 * Forget the fiber before handing the message back: once
+	 * pushed, *msg belongs to the relay side again and must not
+	 * be written from this thread.
+	 */
+	msg->fiber = NULL;
+	cmsg_init(&msg->base, done_route);
+	cpipe_push(&msg->relay_pipe, &msg->base);
+	return 0;
+}
+
+/*
+ * cbus_pair() callback used by wal_relay(): start the wal relay
+ * fiber for this session. If the fiber cannot be created, the
+ * failure is reported back to the relay with the same "done"
+ * message wal_relay_f() would send. Otherwise wal_relay() would
+ * wait forever, and fiber_start(NULL) would crash.
+ */
+static void
+wal_relay_attach(void *data)
+{
+	struct wal_relay *msg = (struct wal_relay *)data;
+	msg->fiber = fiber_new("wal relay fiber", wal_relay_f);
+	if (msg->fiber == NULL) {
+		static struct cmsg_hop done_route[] = {
+			{wal_relay_done, NULL}
+		};
+		/* fiber_new() is expected to have set the diag. */
+		diag_move(&fiber()->diag, &msg->diag);
+		cmsg_init(&msg->base, done_route);
+		cpipe_push(&msg->relay_pipe, &msg->base);
+		return;
+	}
+	fiber_start(msg->fiber, msg);
+}
+
+/*
+ * Handler of wal_relay->cancel_msg: cancel the wal relay fiber
+ * unless it has already finished (msg->fiber is NULL then).
+ */
+static void
+wal_relay_cancel(struct cmsg *base)
+{
+	struct wal_relay *msg =
+		container_of(base, struct wal_relay, cancel_msg);
+	struct fiber *relay_fiber = msg->fiber;
+	if (relay_fiber == NULL)
+		return;
+	fiber_cancel(relay_fiber);
+}
+
+/*
+ * Relay rows from the WAL memory buffer.
+ *
+ * Pairs the caller's cbus endpoint `endpoint_name` with the "wal"
+ * endpoint. wal_relay_attach() runs as the pair callback and
+ * starts wal_relay_f(). The function then blocks until
+ * wal_relay_done() marks the session as done. If the calling fiber
+ * is cancelled meanwhile, a single cancel message is sent to the
+ * WAL side. The caller's cord must keep processing its endpoint,
+ * otherwise the done message is never delivered.
+ *
+ * Returns 0 when the wal relay fiber stopped without an error
+ * (e.g. it could not attach to, or fell out of, the memory
+ * buffer), so the caller may fall back to reading xlog files.
+ * Returns -1 with the diag set when the row callback failed or
+ * the calling fiber was cancelled.
+ */
+int
+wal_relay(struct wal_relay *wal_relay, struct vclock *vclock,
+ wal_relay_cb on_wal_relay, void *cb_data, const char *endpoint_name)
+{
+ wal_relay->vclock = vclock;
+ wal_relay->on_wal_relay = on_wal_relay;
+ wal_relay->cb_data = cb_data;
+ diag_create(&wal_relay->diag);
+ /* A NULL route means "cancel message not sent yet". */
+ wal_relay->cancel_msg.route = NULL;
+
+ /* NOTE(review): done_cond is created on every call but never destroyed. */
+ fiber_cond_create(&wal_relay->done_cond);
+ wal_relay->done = false;
+
+ /* Establish a connection with wal thread. */
+ cbus_pair("wal", endpoint_name, &wal_relay->wal_pipe,
+ &wal_relay->relay_pipe,
+ wal_relay_attach, wal_relay, cbus_process);
+
+ /* Wait for wal_relay_done(), forwarding cancellation once. */
+ while (!wal_relay->done) {
+ if (fiber_is_cancelled() &&
+ wal_relay->cancel_msg.route == NULL) {
+ /* Send a cancel message to a wal relay fiber. */
+ static struct cmsg_hop cancel_route[]= {
+ {wal_relay_cancel, NULL}};
+ cmsg_init(&wal_relay->cancel_msg, cancel_route);
+ cpipe_push(&wal_relay->wal_pipe, &wal_relay->cancel_msg);
+ }
+ fiber_cond_wait(&wal_relay->done_cond);
+ }
+
+ /* Disconnect from wal thread. */
+ cbus_unpair(&wal_relay->wal_pipe, &wal_relay->relay_pipe,
+ NULL, NULL, cbus_process);
+
+ /* An error from the row callback takes precedence over cancellation. */
+ if (!diag_is_empty(&wal_relay->diag)) {
+ diag_move(&wal_relay->diag, &fiber()->diag);
+ return -1;
+ }
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ return -1;
+ }
+ return 0;
+}
diff --git a/src/box/wal.h b/src/box/wal.h
index 6725f26d3..7446fb6da 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -241,6 +241,42 @@ wal_write_vy_log(struct journal_entry *req);
void
wal_rotate_vy_log();
+/**
+ * Row callback for wal_relay(): called for each relayed row
+ * (including heartbeat rows) with the user data pointer.
+ * Must return 0 on success, or -1 with the fiber diag set to
+ * stop relaying.
+ */
+typedef int (*wal_relay_cb)(struct xrow_header *header, void *data);
+
+/**
+ * State of one relaying session from the WAL memory buffer,
+ * shared between the cord calling wal_relay() and the wal relay
+ * fiber started on the WAL side.
+ */
+struct wal_relay {
+ /* Message used to report "done" back to the relay. */
+ struct cmsg base;
+ /* Cbus pipe to wal cord. */
+ struct cpipe wal_pipe;
+ /* Cbus pipe from wal cord. */
+ struct cpipe relay_pipe;
+
+ /* Vclock to start relaying; rows it already covers are skipped. */
+ struct vclock *vclock;
+ /* Callback to call for each relaying row. */
+ wal_relay_cb on_wal_relay;
+ /* Data pointer to pass to callback. */
+ void *cb_data;
+ /* Relaying fiber; NULL once it has finished. */
+ struct fiber *fiber;
+ /* Message to cancel relaying fiber. */
+ struct cmsg cancel_msg;
+ /* Fiber condition to wait until relaying was stopped. */
+ struct fiber_cond done_cond;
+ /* Turns to true when relaying was stopped. */
+ bool done;
+ /* Return code. NOTE(review): not referenced by the wal.c code in this patch. */
+ int rc;
+ /* Diagnostic area: error from the row callback, if any. */
+ struct diag diag;
+};
+
+
+/**
+ * Relay rows from the WAL memory buffer starting at @a vclock,
+ * calling @a on_wal_relay for each of them, until the memory
+ * buffer can no longer serve the relay, the callback fails, or
+ * the calling fiber is cancelled. @a endpoint_name is the cbus
+ * endpoint of the calling cord.
+ * @retval 0 relaying stopped without an error (fall back to xlogs).
+ * @retval -1 error or cancellation, diag is set.
+ */
+int
+wal_relay(struct wal_relay *wal_relay, struct vclock *vclock,
+ wal_relay_cb on_wal_relay, void *cb_data, const char *endpoint_name);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/lib/core/cbus.c b/src/lib/core/cbus.c
index b3b1280e7..b7e6d769b 100644
--- a/src/lib/core/cbus.c
+++ b/src/lib/core/cbus.c
@@ -284,6 +284,9 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
/* Trigger task processing when the queue becomes non-empty. */
bool output_was_empty;
+ int old_cancel_state;
+ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
+
tt_pthread_mutex_lock(&endpoint->mutex);
output_was_empty = stailq_empty(&endpoint->output);
/** Flush input */
@@ -297,6 +300,7 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
ev_async_send(endpoint->consumer, &endpoint->async);
}
+ pthread_setcancelstate(old_cancel_state, NULL);
}
void
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index e75a620d0..0f919e213 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -134,6 +134,7 @@ struct errinj {
_(ERRINJ_SQL_NAME_NORMALIZATION, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_COIO_SENDFILE_CHUNK, ERRINJ_INT, {.iparam = -1}) \
_(ERRINJ_SWIM_FD_ONLY, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_WAL_MEM_IGNORE, ERRINJ_BOOL, {.bparam = false}) \
ENUM0(errinj_id, ERRINJ_LIST);
extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 73e661a71..bdb8793b2 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -25,6 +25,8 @@ errinj.info()
state: 0
ERRINJ_WAL_WRITE:
state: false
+ ERRINJ_WAL_MEM_IGNORE:
+ state: false
ERRINJ_HTTPC_EXECUTE:
state: false
ERRINJ_VYRUN_DATA_READ:
diff --git a/test/replication/force_recovery.result b/test/replication/force_recovery.result
index f50452858..e48c12657 100644
--- a/test/replication/force_recovery.result
+++ b/test/replication/force_recovery.result
@@ -16,6 +16,10 @@ _ = box.space.test:create_index('primary')
box.schema.user.grant('guest', 'replication')
---
...
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
+---
+- ok
+...
-- Deploy a replica.
test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
---
@@ -86,6 +90,10 @@ test_run:cmd("switch default")
box.cfg{force_recovery = false}
---
...
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false)
+---
+- ok
+...
-- Cleanup.
test_run:cmd("stop server test")
---
diff --git a/test/replication/force_recovery.test.lua b/test/replication/force_recovery.test.lua
index 54307814b..c08bb9c02 100644
--- a/test/replication/force_recovery.test.lua
+++ b/test/replication/force_recovery.test.lua
@@ -8,6 +8,7 @@ _ = box.schema.space.create('test')
_ = box.space.test:create_index('primary')
box.schema.user.grant('guest', 'replication')
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
-- Deploy a replica.
test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
test_run:cmd("start server test")
@@ -33,6 +34,7 @@ box.space.test:select()
box.info.replication[1].upstream.status == 'stopped' or box.info
test_run:cmd("switch default")
box.cfg{force_recovery = false}
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false)
-- Cleanup.
test_run:cmd("stop server test")
diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result
index f71292da1..187634c62 100644
--- a/test/replication/replica_rejoin.result
+++ b/test/replication/replica_rejoin.result
@@ -184,6 +184,10 @@ test_run:cmd("stop server replica")
- true
...
test_run:cmd("restart server default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
+---
+- ok
+...
checkpoint_count = box.cfg.checkpoint_count
---
...
@@ -368,6 +372,10 @@ test_run:cmd("switch default")
---
- true
...
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false)
+---
+- ok
+...
box.cfg{replication = ''}
---
...
diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua
index 22a91d8d7..3ee98bc85 100644
--- a/test/replication/replica_rejoin.test.lua
+++ b/test/replication/replica_rejoin.test.lua
@@ -70,6 +70,7 @@ box.space.test:replace{1, 2, 3} -- bumps LSN on the replica
test_run:cmd("switch default")
test_run:cmd("stop server replica")
test_run:cmd("restart server default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
checkpoint_count = box.cfg.checkpoint_count
box.cfg{checkpoint_count = 1}
for i = 1, 3 do box.space.test:delete{i * 10} end
@@ -135,6 +136,7 @@ box.space.test:replace{2}
-- Cleanup.
test_run:cmd("switch default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false)
box.cfg{replication = ''}
test_run:cmd("stop server replica")
test_run:cmd("cleanup server replica")
diff --git a/test/replication/show_error_on_disconnect.result b/test/replication/show_error_on_disconnect.result
index 48003db06..e6920c160 100644
--- a/test/replication/show_error_on_disconnect.result
+++ b/test/replication/show_error_on_disconnect.result
@@ -20,6 +20,10 @@ test_run:cmd("switch master_quorum1")
---
- true
...
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
+---
+- ok
+...
repl = box.cfg.replication
---
...
@@ -30,6 +34,10 @@ test_run:cmd("switch master_quorum2")
---
- true
...
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
+---
+- ok
+...
box.space.test:insert{1}
---
- [1]
diff --git a/test/replication/show_error_on_disconnect.test.lua b/test/replication/show_error_on_disconnect.test.lua
index 1b0ea4373..2a944dfc3 100644
--- a/test/replication/show_error_on_disconnect.test.lua
+++ b/test/replication/show_error_on_disconnect.test.lua
@@ -10,9 +10,11 @@ SERVERS = {'master_quorum1', 'master_quorum2'}
test_run:create_cluster(SERVERS)
test_run:wait_fullmesh(SERVERS)
test_run:cmd("switch master_quorum1")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
repl = box.cfg.replication
box.cfg{replication = ""}
test_run:cmd("switch master_quorum2")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
box.space.test:insert{1}
box.snapshot()
box.space.test:insert{2}
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index ac35b94a7..1d6a58259 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
script = master.lua
description = tarantool/box, replication
disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua force_recovery.test.lua show_error_on_disconnect.test.lua replica_rejoin.test.lua
config = suite.cfg
lua_libs = lua/fast_replica.lua lua/rlimit.lua
use_unix_sockets = True
diff --git a/test/xlog/panic_on_wal_error.result b/test/xlog/panic_on_wal_error.result
index 22f14f912..897116b3b 100644
--- a/test/xlog/panic_on_wal_error.result
+++ b/test/xlog/panic_on_wal_error.result
@@ -19,6 +19,10 @@ _ = box.space.test:create_index('pk')
-- reopen xlog
--
test_run:cmd("restart server default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
+---
+- ok
+...
box.space.test ~= nil
---
- true
@@ -68,6 +72,10 @@ test_run:cmd("stop server replica")
- true
...
test_run:cmd("restart server default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
+---
+- ok
+...
box.space.test:auto_increment{'after snapshot'}
---
- [2, 'after snapshot']
@@ -153,6 +161,10 @@ test_run:cmd("switch default")
---
- true
...
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false)
+---
+- ok
+...
test_run:cmd("stop server replica")
---
- true
diff --git a/test/xlog/panic_on_wal_error.test.lua b/test/xlog/panic_on_wal_error.test.lua
index 2e95431c6..d973a00ff 100644
--- a/test/xlog/panic_on_wal_error.test.lua
+++ b/test/xlog/panic_on_wal_error.test.lua
@@ -10,6 +10,7 @@ _ = box.space.test:create_index('pk')
-- reopen xlog
--
test_run:cmd("restart server default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
box.space.test ~= nil
-- insert some stuff
--
@@ -32,6 +33,7 @@ box.space.test:select{}
test_run:cmd("switch default")
test_run:cmd("stop server replica")
test_run:cmd("restart server default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
box.space.test:auto_increment{'after snapshot'}
box.space.test:auto_increment{'after snapshot - one more row'}
--
@@ -67,6 +69,7 @@ box.space.test:select{}
--
--
test_run:cmd("switch default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false)
test_run:cmd("stop server replica")
test_run:cmd("cleanup server replica")
--
diff --git a/test/xlog/suite.ini b/test/xlog/suite.ini
index 689d2b871..c208c73c4 100644
--- a/test/xlog/suite.ini
+++ b/test/xlog/suite.ini
@@ -4,7 +4,7 @@ description = tarantool write ahead log tests
script = xlog.lua
disabled = snap_io_rate.test.lua upgrade.test.lua
valgrind_disabled =
-release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua checkpoint_threshold.test.lua
+release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua checkpoint_threshold.test.lua panic_on_wal_error.test.lua
config = suite.cfg
use_unix_sockets = True
use_unix_sockets_iproto = True
--
2.23.0
More information about the Tarantool-patches
mailing list