[PATCH v2] box: enable WAL before making initial checkpoint

Vladimir Davydov vdavydov.dev at gmail.com
Mon Feb 11 15:34:26 MSK 2019


While a replica is bootstrapped from a remote master, the vinyl
engine may need to perform compaction, which means it may write to
the _vinyl_deferred_delete system space. Compaction proceeds fully
asynchronously, i.e. a write may occur after the join stage is
complete but before the WAL is initialized, in which case the new
replica will crash. To make sure such a race can't happen, let's set
up the WAL before making the initial checkpoint. The WAL writer is
now initialized right before the WAL thread is started, so we no
longer need to split the WAL struct into a thread part and a writer
part.
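
Roughly, the old sequence of events that could crash a new replica
was (simplified; names as in this patch):

    bootstrap_from_master()       join stage completes
    engine_end_recovery()         vinyl compaction may still run
    <compaction writes to _vinyl_deferred_delete>
                                  no journal installed yet => crash
    wal_init() + journal_set()    happened only here, too late

Now wal_init() only starts the WAL thread early in box_cfg(), and
the new wal_enable() installs the real journal before the initial
checkpoint is taken.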

Closes #3968
---
https://github.com/tarantool/tarantool/issues/3968
https://github.com/tarantool/tarantool/tree/dv/gh-3968-vy-fix-replica-join-crash

Changes in v2:
 - Instead of setting up a fake delay journal, enable the WAL
   before making the initial checkpoint.

v1: https://www.freelists.org/post/tarantool-patches/PATCH-box-delay-all-writes-before-wal-writer-is-initialized
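
For reviewers, a minimal sketch of the WAL lifecycle after this patch
(error handling and most arguments elided; names as in the diff below):

    /* box_cfg_xc(): start the WAL thread early; journal not set yet. */
    wal_init(wal_mode, wal_dir, ...);

    /*
     * bootstrap_master() / bootstrap_from_master() / local_recovery():
     * once recovery is done, install the real journal. wal_enable()
     * copies replicaset.vclock, scans wal_dir, opens the current WAL
     * and calls journal_set() on the writer.
     */
    wal_enable();

    /* Only then take the initial checkpoint. */
    gc_checkpoint();

    /* box_free(): stop the WAL thread and destroy the writer. */
    wal_free();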

 src/box/box.cc                    |  68 ++++++++++--------
 src/box/wal.c                     | 145 ++++++++++++++++++--------------------
 src/box/wal.h                     |  18 +++--
 test/vinyl/replica_quota.result   |  34 +++++++--
 test/vinyl/replica_quota.test.lua |  25 +++++--
 5 files changed, 167 insertions(+), 123 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index e12a1cba..fde3ecba 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1691,7 +1691,7 @@ box_free(void)
 		sequence_free();
 		gc_free();
 		engine_shutdown();
-		wal_thread_stop();
+		wal_free();
 	}
 }
 
@@ -1761,12 +1761,13 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid)
 	/* Set UUID of a new replica set */
 	box_set_replicaset_uuid(replicaset_uuid);
 
+	/* Enable WAL subsystem. */
+	if (wal_enable() != 0)
+		diag_raise();
+
 	/* Make the initial checkpoint */
-	if (engine_begin_checkpoint() ||
-	    engine_commit_checkpoint(&replicaset.vclock))
+	if (gc_checkpoint() != 0)
 		panic("failed to create a checkpoint");
-
-	gc_add_checkpoint(&replicaset.vclock);
 }
 
 /**
@@ -1813,9 +1814,6 @@ bootstrap_from_master(struct replica *master)
 
 	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
 
-	/* Clear the pointer to journal before it goes out of scope */
-	journal_set(NULL);
-
 	/* Finalize the new replica */
 	engine_end_recovery_xc();
 
@@ -1823,12 +1821,22 @@ bootstrap_from_master(struct replica *master)
 	applier_resume_to_state(applier, APPLIER_READY, TIMEOUT_INFINITY);
 	assert(applier->state == APPLIER_READY);
 
+	/*
+	 * An engine may write to the WAL on its own during the
+	 * join stage (e.g. Vinyl's deferred DELETEs). That's OK -
+	 * those records will pass through the recovery journal and
+	 * wind up in the initial checkpoint. However, we must
+	 * enable the WAL before making the initial checkpoint so
+	 * that records written during and after it go to the real
+	 * WAL and can be recovered after restart. This also clears
+	 * the recovery journal created on the stack.
+	 */
+	if (wal_enable() != 0)
+		diag_raise();
+
 	/* Make the initial checkpoint */
-	if (engine_begin_checkpoint() ||
-	    engine_commit_checkpoint(&replicaset.vclock))
+	if (gc_checkpoint() != 0)
 		panic("failed to create a checkpoint");
-
-	gc_add_checkpoint(&replicaset.vclock);
 }
 
 /**
@@ -2000,6 +2008,16 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		box_sync_replication(false);
 	}
 	recovery_finalize(recovery);
+
+	/*
+	 * We must enable the WAL before finalizing engine recovery,
+	 * because an engine may start writing to the WAL right after
+	 * this point (e.g. deferred DELETE statements in Vinyl).
+	 * This also clears the recovery journal created on the stack.
+	 */
+	if (wal_enable() != 0)
+		diag_raise();
+
 	engine_end_recovery_xc();
 
 	/* Check replica set UUID. */
@@ -2009,9 +2027,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 			  tt_uuid_str(replicaset_uuid),
 			  tt_uuid_str(&REPLICASET_UUID));
 	}
-
-	/* Clear the pointer to journal before it goes out of scope */
-	journal_set(NULL);
 }
 
 static void
@@ -2083,7 +2098,15 @@ box_cfg_xc(void)
 	port_init();
 	iproto_init();
 	sql_init();
-	wal_thread_start();
+
+	int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal"));
+	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
+	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
+	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
+		     wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
+		     on_wal_checkpoint_threshold) != 0) {
+		diag_raise();
+	}
 
 	title("loading");
 
@@ -2128,8 +2151,6 @@ box_cfg_xc(void)
 		/* Bootstrap a new master */
 		bootstrap(&instance_uuid, &replicaset_uuid,
 			  &is_bootstrap_leader);
-		checkpoint = gc_last_checkpoint();
-		assert(checkpoint != NULL);
 	}
 	fiber_gc();
 
@@ -2143,17 +2164,6 @@ box_cfg_xc(void)
 		}
 	}
 
-	/* Start WAL writer */
-	int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal"));
-	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
-	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
-	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
-		     wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
-		     &checkpoint->vclock, on_wal_garbage_collection,
-		     on_wal_checkpoint_threshold) != 0) {
-		diag_raise();
-	}
-
 	rmean_cleanup(rmean_box);
 
 	/* Follow replica */
diff --git a/src/box/wal.c b/src/box/wal.c
index cdcaabc0..0b49548c 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -66,19 +66,6 @@ wal_write(struct journal *, struct journal_entry *);
 static int64_t
 wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
 
-/* WAL thread. */
-struct wal_thread {
-	/** 'wal' thread doing the writes. */
-	struct cord cord;
-	/** A pipe from 'tx' thread to 'wal' */
-	struct cpipe wal_pipe;
-	/**
-	 * Return pipe from 'wal' to tx'. This is a
-	 * priority pipe and DOES NOT support yield.
-	 */
-	struct cpipe tx_prio_pipe;
-};
-
 /*
  * WAL writer - maintain a Write Ahead Log for every change
  * in the data state.
@@ -100,6 +87,8 @@ struct wal_writer
 	 * the wal-tx bus and are rolled back "on arrival".
 	 */
 	struct stailq rollback;
+	/** A pipe from 'tx' thread to 'wal' */
+	struct cpipe wal_pipe;
 	/* ----------------- wal ------------------- */
 	/** A setting from instance configuration - rows_per_wal */
 	int64_t wal_max_rows;
@@ -109,6 +98,13 @@ struct wal_writer
 	enum wal_mode wal_mode;
 	/** wal_dir, from the configuration file. */
 	struct xdir wal_dir;
+	/** 'wal' thread doing the writes. */
+	struct cord cord;
+	/**
+	 * Return pipe from 'wal' to 'tx'. This is a
+	 * priority pipe and DOES NOT support yield.
+	 */
+	struct cpipe tx_prio_pipe;
 	/**
 	 * The vector clock of the WAL writer. It's a bit behind
 	 * the vector clock of the transaction thread, since it
@@ -184,7 +180,6 @@ struct vy_log_writer {
 };
 
 static struct vy_log_writer vy_log_writer;
-static struct wal_thread wal_thread;
 static struct wal_writer wal_writer_singleton;
 
 enum wal_mode
@@ -200,7 +195,7 @@ static void
 tx_schedule_commit(struct cmsg *msg);
 
 static struct cmsg_hop wal_request_route[] = {
-	{wal_write_to_disk, &wal_thread.tx_prio_pipe},
+	{wal_write_to_disk, &wal_writer_singleton.tx_prio_pipe},
 	{tx_schedule_commit, NULL},
 };
 
@@ -349,8 +344,6 @@ static void
 wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 		  const char *wal_dirname, int64_t wal_max_rows,
 		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-		  const struct vclock *vclock,
-		  const struct vclock *checkpoint_vclock,
 		  wal_on_garbage_collection_f on_garbage_collection,
 		  wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
@@ -372,8 +365,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	writer->checkpoint_threshold = INT64_MAX;
 	writer->checkpoint_triggered = false;
 
-	vclock_copy(&writer->vclock, vclock);
-	vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
+	vclock_create(&writer->vclock);
+	vclock_create(&writer->checkpoint_vclock);
 	rlist_create(&writer->watchers);
 
 	writer->on_garbage_collection = on_garbage_collection;
@@ -387,21 +380,9 @@ wal_writer_destroy(struct wal_writer *writer)
 	xdir_destroy(&writer->wal_dir);
 }
 
-/** WAL thread routine. */
+/** WAL writer thread routine. */
 static int
-wal_thread_f(va_list ap);
-
-/** Start WAL thread and setup pipes to and from TX. */
-void
-wal_thread_start()
-{
-	if (cord_costart(&wal_thread.cord, "wal", wal_thread_f, NULL) != 0)
-		panic("failed to start WAL thread");
-
-	/* Create a pipe to WAL thread. */
-	cpipe_create(&wal_thread.wal_pipe, "wal");
-	cpipe_set_max_input(&wal_thread.wal_pipe, IOV_MAX);
-}
+wal_writer_f(va_list ap);
 
 static int
 wal_open_f(struct cbus_call_msg *msg)
@@ -440,7 +421,7 @@ wal_open(struct wal_writer *writer)
 	 * thread.
 	 */
 	struct cbus_call_msg msg;
-	if (cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg,
+	if (cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg,
 		      wal_open_f, NULL, TIMEOUT_INFINITY) == 0) {
 		/*
 		 * Success: we can now append to
@@ -472,28 +453,38 @@ wal_open(struct wal_writer *writer)
 	return 0;
 }
 
-/**
- * Initialize WAL writer.
- *
- * @pre   The instance has completed recovery from a snapshot
- *        and/or existing WALs. All WALs opened in read-only
- *        mode are closed. WAL thread has been started.
- */
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
 	 wal_on_garbage_collection_f on_garbage_collection,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	assert(wal_max_rows > 1);
 
+	/* Initialize the state. */
 	struct wal_writer *writer = &wal_writer_singleton;
 	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
-			  wal_max_size, instance_uuid, vclock,
-			  checkpoint_vclock, on_garbage_collection,
+			  wal_max_size, instance_uuid, on_garbage_collection,
 			  on_checkpoint_threshold);
 
+	/* Start WAL thread. */
+	if (cord_costart(&writer->cord, "wal", wal_writer_f, NULL) != 0)
+		return -1;
+
+	/* Create a pipe to WAL thread. */
+	cpipe_create(&writer->wal_pipe, "wal");
+	cpipe_set_max_input(&writer->wal_pipe, IOV_MAX);
+	return 0;
+}
+
+int
+wal_enable(void)
+{
+	struct wal_writer *writer = &wal_writer_singleton;
+
+	/* Initialize the writer vclock from the recovery state. */
+	vclock_copy(&writer->vclock, &replicaset.vclock);
+
 	/*
 	 * Scan the WAL directory to build an index of all
 	 * existing WAL files. Required for garbage collection,
@@ -502,29 +493,28 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	if (xdir_scan(&writer->wal_dir))
 		return -1;
 
+	/* Open the most recent WAL file. */
 	if (wal_open(writer) != 0)
 		return -1;
 
+	/* Enable journalling. */
 	journal_set(&writer->base);
 	return 0;
 }
 
-/**
- * Stop WAL thread, wait until it exits, and destroy WAL writer
- * if it was initialized. Called on shutdown.
- */
 void
-wal_thread_stop()
+wal_free(void)
 {
-	cbus_stop_loop(&wal_thread.wal_pipe);
+	struct wal_writer *writer = &wal_writer_singleton;
 
-	if (cord_join(&wal_thread.cord)) {
+	cbus_stop_loop(&writer->wal_pipe);
+
+	if (cord_join(&writer->cord)) {
 		/* We can't recover from this in any reasonable way. */
 		panic_syserror("WAL writer: thread join failed");
 	}
 
-	if (journal_is_initialized(&wal_writer_singleton.base))
-		wal_writer_destroy(&wal_writer_singleton);
+	wal_writer_destroy(writer);
 }
 
 void
@@ -533,7 +523,7 @@ wal_sync(void)
 	struct wal_writer *writer = &wal_writer_singleton;
 	if (writer->wal_mode == WAL_NONE)
 		return;
-	cbus_flush(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, NULL);
+	cbus_flush(&writer->wal_pipe, &writer->tx_prio_pipe, NULL);
 }
 
 static int
@@ -588,7 +578,7 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint)
 		return -1;
 	}
 	bool cancellable = fiber_set_cancellable(false);
-	int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+	int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
 			   &checkpoint->base, wal_begin_checkpoint_f, NULL,
 			   TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
@@ -628,7 +618,7 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint)
 		return;
 	}
 	bool cancellable = fiber_set_cancellable(false);
-	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
 		  &checkpoint->base, wal_commit_checkpoint_f, NULL,
 		  TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
@@ -658,7 +648,7 @@ wal_set_checkpoint_threshold(int64_t threshold)
 	struct wal_set_checkpoint_threshold_msg msg;
 	msg.checkpoint_threshold = threshold;
 	bool cancellable = fiber_set_cancellable(false);
-	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
 		  &msg.base, wal_set_checkpoint_threshold_f, NULL,
 		  TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
@@ -707,7 +697,7 @@ wal_collect_garbage(const struct vclock *vclock)
 	struct wal_gc_msg msg;
 	msg.vclock = vclock;
 	bool cancellable = fiber_set_cancellable(false);
-	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg.base,
+	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg.base,
 		  wal_collect_garbage_f, NULL, TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
 }
@@ -841,7 +831,7 @@ out:
 					      &msg->vclock) < 0)
 				vclock_copy(&msg->vclock, &writer->vclock);
 			cmsg_init(&msg->base, route);
-			cpipe_push(&wal_thread.tx_prio_pipe, &msg->base);
+			cpipe_push(&writer->tx_prio_pipe, &msg->base);
 		} else
 			say_warn("failed to allocate gc notification message");
 	}
@@ -874,14 +864,14 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 		 * valve is closed by non-empty writer->rollback
 		 * list.
 		 */
-		{ wal_writer_clear_bus, &wal_thread.wal_pipe },
-		{ wal_writer_clear_bus, &wal_thread.tx_prio_pipe },
+		{ wal_writer_clear_bus, &wal_writer_singleton.wal_pipe },
+		{ wal_writer_clear_bus, &wal_writer_singleton.tx_prio_pipe },
 		/*
 		 * Step 2: writer->rollback queue contains all
 		 * messages which need to be rolled back,
 		 * perform the rollback.
 		 */
-		{ tx_schedule_rollback, &wal_thread.wal_pipe },
+		{ tx_schedule_rollback, &wal_writer_singleton.wal_pipe },
 		/*
 		 * Step 3: re-open the WAL for writing.
 		 */
@@ -893,7 +883,7 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 	 * all input until rollback mode is off.
 	 */
 	cmsg_init(&writer->in_rollback, rollback_route);
-	cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback);
+	cpipe_push(&writer->tx_prio_pipe, &writer->in_rollback);
 }
 
 static void
@@ -1013,7 +1003,7 @@ wal_write_to_disk(struct cmsg *msg)
 		struct cmsg *msg = malloc(sizeof(*msg));
 		if (msg != NULL) {
 			cmsg_init(msg, route);
-			cpipe_push(&wal_thread.tx_prio_pipe, msg);
+			cpipe_push(&writer->tx_prio_pipe, msg);
 			writer->checkpoint_triggered = true;
 		} else {
 			say_warn("failed to allocate checkpoint "
@@ -1056,11 +1046,12 @@ done:
 	wal_notify_watchers(writer, WAL_EVENT_WRITE);
 }
 
-/** WAL thread main loop.  */
+/** WAL writer main loop.  */
 static int
-wal_thread_f(va_list ap)
+wal_writer_f(va_list ap)
 {
 	(void) ap;
+	struct wal_writer *writer = &wal_writer_singleton;
 
 	/** Initialize eio in this thread */
 	coio_enable();
@@ -1072,12 +1063,10 @@ wal_thread_f(va_list ap)
 	 * endpoint, to ensure that WAL messages are delivered
 	 * even when tx fiber pool is used up by net messages.
 	 */
-	cpipe_create(&wal_thread.tx_prio_pipe, "tx_prio");
+	cpipe_create(&writer->tx_prio_pipe, "tx_prio");
 
 	cbus_loop(&endpoint);
 
-	struct wal_writer *writer = &wal_writer_singleton;
-
 	/*
 	 * Create a new empty WAL on shutdown so that we don't
 	 * have to rescan the last WAL to find the instance vclock.
@@ -1101,7 +1090,7 @@ wal_thread_f(va_list ap)
 	if (xlog_is_open(&vy_log_writer.xlog))
 		xlog_close(&vy_log_writer.xlog, false);
 
-	cpipe_destroy(&wal_thread.tx_prio_pipe);
+	cpipe_destroy(&writer->tx_prio_pipe);
 	return 0;
 }
 
@@ -1131,8 +1120,8 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	}
 
 	struct wal_msg *batch;
-	if (!stailq_empty(&wal_thread.wal_pipe.input) &&
-	    (batch = wal_msg(stailq_first_entry(&wal_thread.wal_pipe.input,
+	if (!stailq_empty(&writer->wal_pipe.input) &&
+	    (batch = wal_msg(stailq_first_entry(&writer->wal_pipe.input,
 						struct cmsg, fifo)))) {
 
 		stailq_add_tail_entry(&batch->commit, entry, fifo);
@@ -1151,11 +1140,11 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 		 * thread right away.
 		 */
 		stailq_add_tail_entry(&batch->commit, entry, fifo);
-		cpipe_push(&wal_thread.wal_pipe, &batch->base);
+		cpipe_push(&writer->wal_pipe, &batch->base);
 	}
 	batch->approx_len += entry->approx_len;
-	wal_thread.wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
-	cpipe_flush_input(&wal_thread.wal_pipe);
+	writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
+	cpipe_flush_input(&writer->wal_pipe);
 	/**
 	 * It's not safe to spuriously wakeup this fiber
 	 * since in that case it will ignore a possible
@@ -1213,10 +1202,11 @@ wal_write_vy_log_f(struct cbus_call_msg *msg)
 int
 wal_write_vy_log(struct journal_entry *entry)
 {
+	struct wal_writer *writer = &wal_writer_singleton;
 	struct wal_write_vy_log_msg msg;
 	msg.entry= entry;
 	bool cancellable = fiber_set_cancellable(false);
-	int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+	int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
 			   &msg.base, wal_write_vy_log_f, NULL,
 			   TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
@@ -1235,9 +1225,10 @@ wal_rotate_vy_log_f(struct cbus_call_msg *msg)
 void
 wal_rotate_vy_log()
 {
+	struct wal_writer *writer = &wal_writer_singleton;
 	struct cbus_call_msg msg;
 	bool cancellable = fiber_set_cancellable(false);
-	cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg,
+	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg,
 		  wal_rotate_vy_log_f, NULL, TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
 }
diff --git a/src/box/wal.h b/src/box/wal.h
index a9452f2b..4e500d2a 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -68,18 +68,26 @@ typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);
  */
 typedef void (*wal_on_checkpoint_threshold_f)(void);
 
-void
-wal_thread_start();
-
+/**
+ * Start WAL thread and initialize WAL writer.
+ */
 int
 wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
-	 const struct vclock *vclock, const struct vclock *checkpoint_vclock,
 	 wal_on_garbage_collection_f on_garbage_collection,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold);
 
+/**
+ * Set up the WAL writer as the journaling subsystem.
+ */
+int
+wal_enable(void);
+
+/**
+ * Stop WAL thread and free WAL writer resources.
+ */
 void
-wal_thread_stop();
+wal_free(void);
 
 struct wal_watcher_msg {
 	struct cmsg cmsg;
diff --git a/test/vinyl/replica_quota.result b/test/vinyl/replica_quota.result
index 50e39719..bd09e764 100644
--- a/test/vinyl/replica_quota.result
+++ b/test/vinyl/replica_quota.result
@@ -10,18 +10,40 @@ s = box.schema.space.create('test', { engine = 'vinyl' })
 _ = s:create_index('pk', {run_count_per_level = 1})
 ---
 ...
--- Send > 2 MB to replica.
-pad = string.rep('x', 1100)
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
 ---
 ...
-for i = 1,1000 do s:insert{i, pad} end
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+pad = string.rep('x', 10000);
+---
+...
+function fill()
+    for i = 1, 50 do
+        box.begin()
+        for j = 1, 10 do
+            s:replace{math.random(100), math.random(100), pad}
+        end
+        box.commit()
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Send > 1 MB to replica.
+fill()
 ---
 ...
 box.snapshot()
 ---
 - ok
 ...
-for i = 1001,2000 do s:insert{i, pad} end
+fill()
 ---
 ...
 -- Replica has memory limit set to 1 MB so replication would hang
@@ -46,7 +68,7 @@ _ = test_run:wait_lsn('replica', 'default')
 _ = test_run:cmd("stop server replica")
 ---
 ...
-for i = 2001,3000 do s:insert{i, pad} end
+fill()
 ---
 ...
 _ = test_run:cmd("start server replica")
@@ -67,7 +89,7 @@ box.snapshot()
 ---
 - ok
 ...
-for i = 3001,4000 do s:insert{i, pad} end
+fill()
 ---
 ...
 _ = test_run:cmd("start server replica") -- join
diff --git a/test/vinyl/replica_quota.test.lua b/test/vinyl/replica_quota.test.lua
index e04abbc2..1f373fd4 100644
--- a/test/vinyl/replica_quota.test.lua
+++ b/test/vinyl/replica_quota.test.lua
@@ -4,12 +4,25 @@ box.schema.user.grant('guest', 'replication')
 
 s = box.schema.space.create('test', { engine = 'vinyl' })
 _ = s:create_index('pk', {run_count_per_level = 1})
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
 
--- Send > 2 MB to replica.
-pad = string.rep('x', 1100)
-for i = 1,1000 do s:insert{i, pad} end
+test_run:cmd("setopt delimiter ';'")
+pad = string.rep('x', 10000);
+function fill()
+    for i = 1, 50 do
+        box.begin()
+        for j = 1, 10 do
+            s:replace{math.random(100), math.random(100), pad}
+        end
+        box.commit()
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Send > 1 MB to replica.
+fill()
 box.snapshot()
-for i = 1001,2000 do s:insert{i, pad} end
+fill()
 
 -- Replica has memory limit set to 1 MB so replication would hang
 -- if the scheduler didn't work on the destination.
@@ -26,7 +39,7 @@ _ = test_run:wait_lsn('replica', 'default')
 
 -- Check vinyl_timeout is ignored on 'subscribe' (gh-3087).
 _ = test_run:cmd("stop server replica")
-for i = 2001,3000 do s:insert{i, pad} end
+fill()
 _ = test_run:cmd("start server replica")
 _ = test_run:wait_lsn('replica', 'default')
 
@@ -36,7 +49,7 @@ _ = test_run:cmd("stop server replica")
 _ = test_run:cmd("cleanup server replica")
 
 box.snapshot()
-for i = 3001,4000 do s:insert{i, pad} end
+fill()
 
 _ = test_run:cmd("start server replica") -- join
 _ = test_run:cmd("stop server replica")
-- 
2.11.0