[tarantool-patches] [PATCH v3 4/4] replication: use wal memory buffer to fetch rows

Georgy Kirichenko georgy at tarantool.org
Wed Oct 9 19:45:46 MSK 2019


Fetch data from the wal in-memory buffer. Wal allows starting a fiber
which creates an xrow buffer cursor with a given vclock and then
fetches rows from the xrow buffer one by one, calling the given
callback for each row. The wal relaying fiber also sends a heartbeat
message when all rows were processed and no new rows were written for
a replication timeout period.
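
The wal-side loop boils down to the following shape (a condensed
sketch of wal_relay_f() from the diff below; heartbeat encoding,
error injection and diag plumbing are elided):

    struct xrow_buf_cursor cursor;
    if (xrow_buf_cursor_create(&writer->xrow_buf, &cursor, vclock) != 0)
        goto done; /* the requested vclock is already outdated */
    while (!fiber_is_cancelled()) {
        struct xrow_header *row;
        int rc = xrow_buf_cursor_next(&writer->xrow_buf, &cursor,
                                      &row, &data, &size);
        if (rc < 0)
            goto done; /* the buffer was rotated, rows are gone */
        if (rc > 0) {
            /* No new rows yet: wait on the wal cond and send
             * a heartbeat if the timeout expires. */
            fiber_cond_wait_deadline(&writer->xrow_buf_cond,
                                     last_row_time + timeout);
            continue;
        }
        if (on_wal_relay(row, cb_data) != 0)
            goto done; /* the send callback failed */
    }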
Relay connects to wal with the vclock known from the replica and
tries to relay data. In case of an outdated vclock (wal could not
create a cursor or fetch the next row from it) the relay falls back
to reading logged data from files and then makes another attempt to
connect to wal with the updated vclock, and so on.
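
On the relay side this yields a simple alternation (a condensed
sketch of the new relay_subscribe_f() loop from the diff below;
error handling is elided and endpoint_name stands in for the
tt_sprintf("relay_%p", relay) name used there):

    while (!fiber_is_cancelled()) {
        /* Stream straight from the wal in-memory buffer;
         * wal_relay() returns once the cursor gets outdated. */
        if (wal_relay(&relay->wal_relay, &relay->relay_vclock,
                      relay_send_cb, relay, endpoint_name) != 0)
            break; /* a real error, not just an old vclock */
        /* Catch up from xlog files on disk, which advances
         * relay_vclock, then retry the memory path. */
        recover_remaining_wals(relay->r, &relay->stream, NULL, true);
    }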
In file mode a relay already has data to send to the replica, so
from now on the relay has no duty to send heartbeat messages - that
is done by the wal relay fiber while it waits for new transactions
written by wal.

Closes #3794
---
 src/box/relay.cc                              | 180 +++++++++---------
 src/box/wal.c                                 | 158 +++++++++++++++
 src/box/wal.h                                 |  60 ++++++
 src/lib/core/cbus.c                           |   4 +
 src/lib/core/errinj.h                         |   1 +
 test/box/errinj.result                        |   2 +
 test/replication/force_recovery.result        |   8 +
 test/replication/force_recovery.test.lua      |   2 +
 test/replication/replica_rejoin.result        |   8 +
 test/replication/replica_rejoin.test.lua      |   2 +
 .../show_error_on_disconnect.result           |   8 +
 .../show_error_on_disconnect.test.lua         |   2 +
 test/replication/suite.ini                    |   2 +-
 test/xlog/panic_on_wal_error.result           |  12 ++
 test/xlog/panic_on_wal_error.test.lua         |   3 +
 test/xlog/suite.ini                           |   2 +-
 16 files changed, 365 insertions(+), 89 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 21674119d..68e79ad51 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -76,16 +76,16 @@ struct relay {
 	uint64_t sync;
 	/** Recovery instance to read xlog from the disk */
 	struct recovery *r;
+	/** Wal directory. */
+	char *wal_dir;
 	/** Xstream argument to recovery */
 	struct xstream stream;
 	/** Vclock to stop playing xlogs */
 	struct vclock stop_vclock;
 	/** Remote replica */
 	struct replica *replica;
-	/** WAL event watcher. */
-	struct wal_watcher wal_watcher;
-	/** Relay reader cond. */
-	struct fiber_cond reader_cond;
+	/** WAL memory relay. */
+	struct wal_relay wal_relay;
 	/** Relay diagnostics. */
 	struct diag diag;
 	/** Vclock recieved from replica. */
@@ -117,6 +117,8 @@ struct relay {
 		/** Known relay vclock. */
 		struct vclock vclock;
 	} tx;
+	/** Vclock sent by the relay. */
+	struct vclock relay_vclock;
 };
 
 struct diag*
@@ -161,9 +163,9 @@ relay_new(struct replica *replica)
 	}
 	relay->replica = replica;
 	relay->last_row_time = ev_monotonic_now(loop());
-	fiber_cond_create(&relay->reader_cond);
 	diag_create(&relay->diag);
 	relay->state = RELAY_OFF;
+	relay->r = NULL;
 	return relay;
 }
 
@@ -211,7 +213,8 @@ relay_exit(struct relay *relay)
 	 * cursor, which must be closed in the same thread
 	 * that opened it (it uses cord's slab allocator).
 	 */
-	recovery_delete(relay->r);
+	if (relay->r != NULL)
+		recovery_delete(relay->r);
 	relay->r = NULL;
 }
 
@@ -235,8 +238,8 @@ relay_delete(struct relay *relay)
 {
 	if (relay->state == RELAY_FOLLOW)
 		relay_stop(relay);
-	fiber_cond_destroy(&relay->reader_cond);
 	diag_destroy(&relay->diag);
+	free(relay->wal_dir);
 	TRASH(relay);
 	free(relay);
 }
@@ -357,31 +360,6 @@ relay_set_error(struct relay *relay, struct error *e)
 		diag_add_error(&relay->diag, e);
 }
 
-static void
-relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
-{
-	struct relay *relay = container_of(watcher, struct relay, wal_watcher);
-	if (fiber_is_cancelled()) {
-		/*
-		 * The relay is exiting. Rescanning the WAL at this
-		 * point would be pointless and even dangerous,
-		 * because the relay could have written a packet
-		 * fragment to the socket before being cancelled
-		 * so that writing another row to the socket would
-		 * lead to corrupted replication stream and, as
-		 * a result, permanent replication breakdown.
-		 */
-		return;
-	}
-	try {
-		recover_remaining_wals(relay->r, &relay->stream, NULL,
-				       (events & WAL_EVENT_ROTATE) != 0);
-	} catch (Exception *e) {
-		relay_set_error(relay, e);
-		fiber_cancel(fiber());
-	}
-}
-
 /*
  * Relay reader fiber function.
  * Read xrow encoded vclocks sent by the replica.
@@ -404,7 +382,24 @@ relay_reader_f(va_list ap)
 			/* vclock is followed while decoding, zeroing it. */
 			vclock_create(&relay->recv_vclock);
 			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
-			fiber_cond_signal(&relay->reader_cond);
+			if (relay->status_msg.msg.route != NULL)
+				continue;
+			struct vclock *send_vclock;
+			if (relay->version_id < version_id(1, 7, 4))
+				send_vclock = &relay->r->vclock;
+			else
+				send_vclock = &relay->recv_vclock;
+			if (vclock_sum(&relay->status_msg.vclock) ==
+			    vclock_sum(send_vclock))
+				continue;
+			static const struct cmsg_hop route[] = {
+				{tx_status_update, NULL}
+			};
+			cmsg_init(&relay->status_msg.msg, route);
+			vclock_copy(&relay->status_msg.vclock,
+				    send_vclock);
+			relay->status_msg.relay = relay;
+			cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
 		}
 	} catch (Exception *e) {
 		relay_set_error(relay, e);
@@ -430,6 +425,27 @@ relay_send_heartbeat(struct relay *relay)
 	}
 }
 
+static int
+relay_send_cb(struct xrow_header *row, void *data)
+{
+	try {
+		struct relay *relay = (struct relay *)data;
+		relay_send_row(&relay->stream, row);
+	} catch (Exception *e) {
+		return -1;
+	}
+	return 0;
+}
+
+static void
+relay_endpoint_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
+{
+	(void) loop;
+	(void) events;
+	struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data;
+	cbus_process(endpoint);
+}
+
 /**
  * A libev callback invoked when a relay client socket is ready
  * for read. This currently only happens when the client closes
@@ -439,21 +455,16 @@ static int
 relay_subscribe_f(va_list ap)
 {
 	struct relay *relay = va_arg(ap, struct relay *);
-	struct recovery *r = relay->r;
 
 	coio_enable();
 	relay_set_cord_name(relay->io.fd);
 
 	/* Create cpipe to tx for propagating vclock. */
 	cbus_endpoint_create(&relay->endpoint, tt_sprintf("relay_%p", relay),
-			     fiber_schedule_cb, fiber());
+			     relay_endpoint_cb, &relay->endpoint);
 	cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
 		  &relay->relay_pipe, NULL, NULL, cbus_process);
 
-	/* Setup WAL watcher for sending new rows to the replica. */
-	wal_set_watcher(&relay->wal_watcher, relay->endpoint.name,
-			relay_process_wal_event, cbus_process);
-
 	/* Start fiber for receiving replica acks. */
 	char name[FIBER_NAME_MAX];
 	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
@@ -469,50 +480,28 @@ relay_subscribe_f(va_list ap)
 	 */
 	relay_send_heartbeat(relay);
 
-	/*
-	 * Run the event loop until the connection is broken
-	 * or an error occurs.
-	 */
 	while (!fiber_is_cancelled()) {
-		double timeout = replication_timeout;
-		struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
-					    ERRINJ_DOUBLE);
-		if (inj != NULL && inj->dparam != 0)
-			timeout = inj->dparam;
-
-		fiber_cond_wait_deadline(&relay->reader_cond,
-					 relay->last_row_time + timeout);
-
-		/*
-		 * The fiber can be woken by IO cancel, by a timeout of
-		 * status messaging or by an acknowledge to status message.
-		 * Handle cbus messages first.
-		 */
-		cbus_process(&relay->endpoint);
-		/* Check for a heartbeat timeout. */
-		if (ev_monotonic_now(loop()) - relay->last_row_time > timeout)
-			relay_send_heartbeat(relay);
-		/*
-		 * Check that the vclock has been updated and the previous
-		 * status message is delivered
-		 */
-		if (relay->status_msg.msg.route != NULL)
-			continue;
-		struct vclock *send_vclock;
-		if (relay->version_id < version_id(1, 7, 4))
-			send_vclock = &r->vclock;
-		else
-			send_vclock = &relay->recv_vclock;
-		if (vclock_sum(&relay->status_msg.vclock) ==
-		    vclock_sum(send_vclock))
-			continue;
-		static const struct cmsg_hop route[] = {
-			{tx_status_update, NULL}
+		/* Try to relay directly from the wal memory buffer. */
+		if (wal_relay(&relay->wal_relay, &relay->relay_vclock,
+			      relay_send_cb, relay,
+			      tt_sprintf("relay_%p", relay)) != 0) {
+			relay_set_error(relay, diag_last_error(&fiber()->diag));
+			break;
 		};
-		cmsg_init(&relay->status_msg.msg, route);
-		vclock_copy(&relay->status_msg.vclock, send_vclock);
-		relay->status_msg.relay = relay;
-		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
+		/* Recover xlogs from files. */
+		try {
+			relay->r = recovery_new(relay->wal_dir, false,
+					        &relay->relay_vclock);
+			auto relay_guard = make_scoped_guard([&] {
+				recovery_delete(relay->r);
+				relay->r = NULL;
+			});
+			recover_remaining_wals(relay->r, &relay->stream,
+					       NULL, true);
+		} catch (Exception *e) {
+			relay_set_error(relay, e);
+			break;
+		}
 	}
 
 	/*
@@ -524,9 +513,6 @@ relay_subscribe_f(va_list ap)
 	diag_log();
 	say_crit("exiting the relay loop");
 
-	/* Clear WAL watcher. */
-	wal_clear_watcher(&relay->wal_watcher, cbus_process);
-
 	/* Join ack reader fiber. */
 	fiber_cancel(reader);
 	fiber_join(reader);
@@ -553,6 +539,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	 * unless it has already been registered by initial
 	 * join.
 	 */
+	vclock_copy(&relay->relay_vclock, replica_clock);
 	if (replica->gc == NULL) {
 		replica->gc = gc_consumer_register(replica_clock, "replica %s",
 						   tt_uuid_str(&replica->uuid));
@@ -567,8 +554,13 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	});
 
 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
-	relay->r = recovery_new(cfg_gets("wal_dir"), false,
-			        replica_clock);
+	relay->r = NULL;
+	relay->wal_dir = strdup(cfg_gets("wal_dir"));
+	if (relay->wal_dir == NULL) {
+		diag_set(OutOfMemory, strlen(cfg_gets("wal_dir")),
+			 "runtime", "wal_dir");
+		diag_raise();
+	}
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
@@ -612,7 +604,21 @@ static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	assert(iproto_type_is_dml(packet->type));
+	if (packet->type != IPROTO_OK) {
+		assert(iproto_type_is_dml(packet->type));
+		/*
+		 * Because of asynchronous replication the master
+		 * and replica may have different transaction
+		 * orders in their logs. As we start relaying
+		 * transactions from the first unknown one, there
+		 * could be some others already known by the replica,
+		 * and there is no point in sending them.
+		 */
+		if (vclock_get(&relay->relay_vclock, packet->replica_id) >=
+		    packet->lsn)
+			return;
+		vclock_follow_xrow(&relay->relay_vclock, packet);
+	}
 	/*
 	 * Transform replica local requests to IPROTO_NOP so as to
 	 * promote vclock on the replica without actually modifying
diff --git a/src/box/wal.c b/src/box/wal.c
index 63e0c728d..adbe5eea1 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -165,6 +165,8 @@ struct wal_writer
 	 * without xlog files access.
 	 */
 	struct xrow_buf xrow_buf;
+	/* Xrow buffer condition, signaled when a buffer write is done. */
+	struct fiber_cond xrow_buf_cond;
 };
 
 struct wal_msg {
@@ -1075,6 +1077,7 @@ wal_write_to_disk(struct cmsg *msg)
 	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
 	vclock_merge(&writer->vclock, &vclock_diff);
+	fiber_cond_broadcast(&writer->xrow_buf_cond);
 
 	/*
 	 * Notify TX if the checkpoint threshold has been exceeded.
@@ -1140,6 +1143,7 @@ wal_writer_f(va_list ap)
 	(void) ap;
 	struct wal_writer *writer = &wal_writer_singleton;
 	xrow_buf_create(&writer->xrow_buf);
+	fiber_cond_create(&writer->xrow_buf_cond);
 
 	/** Initialize eio in this thread */
 	coio_enable();
@@ -1460,3 +1464,157 @@ wal_atfork()
 	if (xlog_is_open(&vy_log_writer.xlog))
 		xlog_atfork(&vy_log_writer.xlog);
 }
+
+/* Wake the relay up when wal_relay has finished. */
+static void
+wal_relay_done(struct cmsg *base)
+{
+	struct wal_relay *msg =
+		container_of(base, struct wal_relay, base);
+	msg->done = true;
+	fiber_cond_signal(&msg->done_cond);
+}
+
+/* Wal relay fiber function. */
+static int
+wal_relay_f(va_list ap)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+	struct wal_relay *msg = va_arg(ap, struct wal_relay *);
+	struct vclock *vclock = msg->vclock;
+	wal_relay_cb on_wal_relay = msg->on_wal_relay;
+	void *cb_data = msg->cb_data;
+
+	double last_row_time = ev_monotonic_now(loop());
+
+
+	struct xrow_buf_cursor cursor;
+	if (xrow_buf_cursor_create(&writer->xrow_buf, &cursor, vclock) != 0)
+		goto done;
+	/* The cursor was created; now we can process rows one by one. */
+	while (!fiber_is_cancelled()) {
+		struct xrow_header *row;
+		void *data;
+		size_t size;
+		int rc = xrow_buf_cursor_next(&writer->xrow_buf, &cursor,
+					     &row, &data, &size);
+		if (rc < 0) {
+			/*
+			 * The wal memory buffer was rotated and the
+			 * requested rows are no longer in memory.
+			 */
+			goto done;
+		}
+		if (rc > 0) {
+			/*
+			 * There are no more rows in the buffer. Wait
+			 * until wal writes new ones or, if the timeout
+			 * is exceeded, send a heartbeat message.
+			 */
+			double timeout = replication_timeout;
+			struct errinj *inj = errinj(ERRINJ_RELAY_REPORT_INTERVAL,
+						    ERRINJ_DOUBLE);
+			if (inj != NULL && inj->dparam != 0)
+				timeout = inj->dparam;
+
+			fiber_cond_wait_deadline(&writer->xrow_buf_cond,
+						 last_row_time + timeout);
+			if (ev_monotonic_now(loop()) - last_row_time >
+			    timeout) {
+				/* Timeout was exceeded - send a heartbeat. */
+				struct xrow_header heartbeat;
+				xrow_encode_timestamp(&heartbeat, instance_id,
+						      ev_now(loop()));
+				last_row_time = ev_monotonic_now(loop());
+				if (on_wal_relay(&heartbeat, cb_data) != 0) {
+					diag_move(&fiber()->diag, &msg->diag);
+					goto done;
+				}
+			}
+			continue;
+		}
+		ERROR_INJECT(ERRINJ_WAL_MEM_IGNORE, goto done; );
+		last_row_time = ev_monotonic_now(loop());
+		if (on_wal_relay(row, cb_data) != 0) {
+			diag_move(&fiber()->diag, &msg->diag);
+			goto done;
+		}
+	}
+	static struct cmsg_hop done_route[] = {
+		{wal_relay_done, NULL}
+	};
+done:
+	cmsg_init(&msg->base, done_route);
+	cpipe_push(&msg->relay_pipe, &msg->base);
+	msg->fiber = NULL;
+	return 0;
+}
+
+static void
+wal_relay_attach(void *data)
+{
+	struct wal_relay *msg = (struct wal_relay *)data;
+	msg->fiber = fiber_new("wal relay fiber", wal_relay_f);
+	fiber_start(msg->fiber, msg);
+}
+
+static void
+wal_relay_cancel(struct cmsg *base)
+{
+	struct wal_relay *msg = container_of(base, struct wal_relay,
+						 cancel_msg);
+	/*
+	 * A relay was cancelled, so cancel the corresponding
+	 * fiber in the wal thread if it is still alive.
+	 */
+	if (msg->fiber != NULL)
+		fiber_cancel(msg->fiber);
+}
+
+int
+wal_relay(struct wal_relay *wal_relay, struct vclock *vclock,
+	  wal_relay_cb on_wal_relay, void *cb_data, const char *endpoint_name)
+{
+	wal_relay->vclock = vclock;
+	wal_relay->on_wal_relay = on_wal_relay;
+	wal_relay->cb_data = cb_data;
+	diag_create(&wal_relay->diag);
+	wal_relay->cancel_msg.route = NULL;
+
+	fiber_cond_create(&wal_relay->done_cond);
+	wal_relay->done = false;
+
+	cbus_pair("wal", endpoint_name, &wal_relay->wal_pipe,
+		  &wal_relay->relay_pipe,
+		  wal_relay_attach, wal_relay, cbus_process);
+
+	/*
+	 * We do not use cbus_call because we should be able to
+	 * process this fiber's cancellation and send a cancel request
+	 * to the wal cord to force wal detach.
+	 */
+	while (!wal_relay->done) {
+		if (fiber_is_cancelled() &&
+		    wal_relay->cancel_msg.route == NULL) {
+			/* Send a cancel message to the wal relay fiber. */
+			static struct cmsg_hop cancel_route[]= {
+				{wal_relay_cancel, NULL}};
+			cmsg_init(&wal_relay->cancel_msg, cancel_route);
+			cpipe_push(&wal_relay->wal_pipe, &wal_relay->cancel_msg);
+		}
+		fiber_cond_wait(&wal_relay->done_cond);
+	}
+
+	cbus_unpair(&wal_relay->wal_pipe, &wal_relay->relay_pipe,
+		    NULL, NULL, cbus_process);
+
+	if (!diag_is_empty(&wal_relay->diag)) {
+		diag_move(&wal_relay->diag, &fiber()->diag);
+		return -1;
+	}
+	if (fiber_is_cancelled()) {
+		diag_set(FiberIsCancelled);
+		return -1;
+	}
+	return 0;
+}
diff --git a/src/box/wal.h b/src/box/wal.h
index 6725f26d3..b5f7bf712 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -241,6 +241,66 @@ wal_write_vy_log(struct journal_entry *req);
 void
 wal_rotate_vy_log();
 
+typedef int (*wal_relay_cb)(struct xrow_header *header, void *data);
+
+/**
+ * Wal relay maintains wal memory tracking and allows
+ * retrieving logged xrows directly from the wal memory.
+ */
+struct wal_relay {
+	struct cmsg base;
+	/** Cbus pipe to wal cord. */
+	struct cpipe wal_pipe;
+	/** Cbus pipe from wal cord. */
+	struct cpipe relay_pipe;
+
+	/** Vclock to start relaying. */
+	struct vclock *vclock;
+	/** Callback to call for each row. */
+	wal_relay_cb on_wal_relay;
+	/** Pointer param to use with the callback. */
+	void *cb_data;
+	/**
+	 * A fiber created in the wal thread. This fiber fetches
+	 * rows one by one from the wal memory and/or watches
+	 * for new logged data.
+	 */
+	struct fiber *fiber;
+	/* Message to cancel relaying fiber. */
+	struct cmsg cancel_msg;
+	/* Fiber condition to wait on until relaying has stopped. */
+	struct fiber_cond done_cond;
+	/* Set to true when relaying has stopped. */
+	bool done;
+	/* Return code. */
+	int rc;
+	/* Diagnostic area. */
+	struct diag diag;
+};
+
+/**
+ * Start fetching rows directly from the wal memory buffer.
+ * This function establishes a connection with the wal and starts
+ * a fiber which handles the wal memory cursor; it yields until
+ * that fiber exits because the cursor became outdated or a
+ * row sending error occurred. If the fiber that called this
+ * function is cancelled, a special cancel message is sent to
+ * stop the relaying fiber.
+ *
+ * @param wal_relay a wal relay structure to put all temporary
+ * values in
+ * @param vclock a vclock to start relaying from
+ * @param on_wal_relay a callback to call for every fetched row
+ * @param cb_data a pointer to pass into the callback
+ * @param endpoint_name a relay endpoint name to establish a cbus connection
+ *
+ * @retval 0 relaying finished because the cursor is out of date
+ * @retval -1 relaying finished because of an error.
+ */
+int
+wal_relay(struct wal_relay *wal_relay, struct vclock *vclock,
+	  wal_relay_cb on_wal_relay, void *cb_data, const char *endpoint_name);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/lib/core/cbus.c b/src/lib/core/cbus.c
index b3b1280e7..b7e6d769b 100644
--- a/src/lib/core/cbus.c
+++ b/src/lib/core/cbus.c
@@ -284,6 +284,9 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
 	/* Trigger task processing when the queue becomes non-empty. */
 	bool output_was_empty;
 
+	int old_cancel_state;
+	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
+
 	tt_pthread_mutex_lock(&endpoint->mutex);
 	output_was_empty = stailq_empty(&endpoint->output);
 	/** Flush input */
@@ -297,6 +300,7 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
 
 		ev_async_send(endpoint->consumer, &endpoint->async);
 	}
+	pthread_setcancelstate(old_cancel_state, NULL);
 }
 
 void
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index e75a620d0..0f919e213 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -134,6 +134,7 @@ struct errinj {
 	_(ERRINJ_SQL_NAME_NORMALIZATION, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_COIO_SENDFILE_CHUNK, ERRINJ_INT, {.iparam = -1}) \
 	_(ERRINJ_SWIM_FD_ONLY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_WAL_MEM_IGNORE, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 73e661a71..bdb8793b2 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -25,6 +25,8 @@ errinj.info()
     state: 0
   ERRINJ_WAL_WRITE:
     state: false
+  ERRINJ_WAL_MEM_IGNORE:
+    state: false
   ERRINJ_HTTPC_EXECUTE:
     state: false
   ERRINJ_VYRUN_DATA_READ:
diff --git a/test/replication/force_recovery.result b/test/replication/force_recovery.result
index f50452858..e48c12657 100644
--- a/test/replication/force_recovery.result
+++ b/test/replication/force_recovery.result
@@ -16,6 +16,10 @@ _ = box.space.test:create_index('primary')
 box.schema.user.grant('guest', 'replication')
 ---
 ...
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
+---
+- ok
+...
 -- Deploy a replica.
 test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
 ---
@@ -86,6 +90,10 @@ test_run:cmd("switch default")
 box.cfg{force_recovery = false}
 ---
 ...
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false)
+---
+- ok
+...
 -- Cleanup.
 test_run:cmd("stop server test")
 ---
diff --git a/test/replication/force_recovery.test.lua b/test/replication/force_recovery.test.lua
index 54307814b..c08bb9c02 100644
--- a/test/replication/force_recovery.test.lua
+++ b/test/replication/force_recovery.test.lua
@@ -8,6 +8,7 @@ _ = box.schema.space.create('test')
 _ = box.space.test:create_index('primary')
 box.schema.user.grant('guest', 'replication')
 
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
 -- Deploy a replica.
 test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
 test_run:cmd("start server test")
@@ -33,6 +34,7 @@ box.space.test:select()
 box.info.replication[1].upstream.status == 'stopped' or box.info
 test_run:cmd("switch default")
 box.cfg{force_recovery = false}
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false)
 
 -- Cleanup.
 test_run:cmd("stop server test")
diff --git a/test/replication/replica_rejoin.result b/test/replication/replica_rejoin.result
index f71292da1..187634c62 100644
--- a/test/replication/replica_rejoin.result
+++ b/test/replication/replica_rejoin.result
@@ -184,6 +184,10 @@ test_run:cmd("stop server replica")
 - true
 ...
 test_run:cmd("restart server default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
+---
+- ok
+...
 checkpoint_count = box.cfg.checkpoint_count
 ---
 ...
@@ -368,6 +372,10 @@ test_run:cmd("switch default")
 ---
 - true
 ...
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false)
+---
+- ok
+...
 box.cfg{replication = ''}
 ---
 ...
diff --git a/test/replication/replica_rejoin.test.lua b/test/replication/replica_rejoin.test.lua
index 22a91d8d7..3ee98bc85 100644
--- a/test/replication/replica_rejoin.test.lua
+++ b/test/replication/replica_rejoin.test.lua
@@ -70,6 +70,7 @@ box.space.test:replace{1, 2, 3} -- bumps LSN on the replica
 test_run:cmd("switch default")
 test_run:cmd("stop server replica")
 test_run:cmd("restart server default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
 checkpoint_count = box.cfg.checkpoint_count
 box.cfg{checkpoint_count = 1}
 for i = 1, 3 do box.space.test:delete{i * 10} end
@@ -135,6 +136,7 @@ box.space.test:replace{2}
 
 -- Cleanup.
 test_run:cmd("switch default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false)
 box.cfg{replication = ''}
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
diff --git a/test/replication/show_error_on_disconnect.result b/test/replication/show_error_on_disconnect.result
index 48003db06..e6920c160 100644
--- a/test/replication/show_error_on_disconnect.result
+++ b/test/replication/show_error_on_disconnect.result
@@ -20,6 +20,10 @@ test_run:cmd("switch master_quorum1")
 ---
 - true
 ...
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
+---
+- ok
+...
 repl = box.cfg.replication
 ---
 ...
@@ -30,6 +34,10 @@ test_run:cmd("switch master_quorum2")
 ---
 - true
 ...
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
+---
+- ok
+...
 box.space.test:insert{1}
 ---
 - [1]
diff --git a/test/replication/show_error_on_disconnect.test.lua b/test/replication/show_error_on_disconnect.test.lua
index 1b0ea4373..2a944dfc3 100644
--- a/test/replication/show_error_on_disconnect.test.lua
+++ b/test/replication/show_error_on_disconnect.test.lua
@@ -10,9 +10,11 @@ SERVERS = {'master_quorum1', 'master_quorum2'}
 test_run:create_cluster(SERVERS)
 test_run:wait_fullmesh(SERVERS)
 test_run:cmd("switch master_quorum1")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
 repl = box.cfg.replication
 box.cfg{replication = ""}
 test_run:cmd("switch master_quorum2")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
 box.space.test:insert{1}
 box.snapshot()
 box.space.test:insert{2}
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index ac35b94a7..1d6a58259 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
 script =  master.lua
 description = tarantool/box, replication
 disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua force_recovery.test.lua show_error_on_disconnect.test.lua replica_rejoin.test.lua
 config = suite.cfg
 lua_libs = lua/fast_replica.lua lua/rlimit.lua
 use_unix_sockets = True
diff --git a/test/xlog/panic_on_wal_error.result b/test/xlog/panic_on_wal_error.result
index 22f14f912..897116b3b 100644
--- a/test/xlog/panic_on_wal_error.result
+++ b/test/xlog/panic_on_wal_error.result
@@ -19,6 +19,10 @@ _ = box.space.test:create_index('pk')
 -- reopen xlog
 --
 test_run:cmd("restart server default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
+---
+- ok
+...
 box.space.test ~= nil
 ---
 - true
@@ -68,6 +72,10 @@ test_run:cmd("stop server replica")
 - true
 ...
 test_run:cmd("restart server default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
+---
+- ok
+...
 box.space.test:auto_increment{'after snapshot'}
 ---
 - [2, 'after snapshot']
@@ -153,6 +161,10 @@ test_run:cmd("switch default")
 ---
 - true
 ...
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false)
+---
+- ok
+...
 test_run:cmd("stop server replica")
 ---
 - true
diff --git a/test/xlog/panic_on_wal_error.test.lua b/test/xlog/panic_on_wal_error.test.lua
index 2e95431c6..d973a00ff 100644
--- a/test/xlog/panic_on_wal_error.test.lua
+++ b/test/xlog/panic_on_wal_error.test.lua
@@ -10,6 +10,7 @@ _ = box.space.test:create_index('pk')
 -- reopen xlog
 --
 test_run:cmd("restart server default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
 box.space.test ~= nil
 -- insert some stuff
 -- 
@@ -32,6 +33,7 @@ box.space.test:select{}
 test_run:cmd("switch default")
 test_run:cmd("stop server replica")
 test_run:cmd("restart server default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)
 box.space.test:auto_increment{'after snapshot'}
 box.space.test:auto_increment{'after snapshot - one more row'}
 --
@@ -67,6 +69,7 @@ box.space.test:select{}
 --
 --
 test_run:cmd("switch default")
+box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", false)
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
 --
diff --git a/test/xlog/suite.ini b/test/xlog/suite.ini
index 689d2b871..c208c73c4 100644
--- a/test/xlog/suite.ini
+++ b/test/xlog/suite.ini
@@ -4,7 +4,7 @@ description = tarantool write ahead log tests
 script = xlog.lua
 disabled = snap_io_rate.test.lua upgrade.test.lua
 valgrind_disabled =
-release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua checkpoint_threshold.test.lua
+release_disabled = errinj.test.lua panic_on_lsn_gap.test.lua panic_on_broken_lsn.test.lua checkpoint_threshold.test.lua panic_on_wal_error.test.lua
 config = suite.cfg
 use_unix_sockets = True
 use_unix_sockets_iproto = True
-- 
2.23.0