[Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries while recovery or join

Georgy Kirichenko georgy at tarantool.org
Tue Nov 19 19:04:57 MSK 2019


Do not start a transaction for each local journal or final join row
but follow transaction boundaries instead.

Part of #980
---
 src/box/applier.cc                     | 92 +++++++++++++-------------
 src/box/box.cc                         | 72 ++++++++++++++------
 test/xlog/panic_on_broken_lsn.result   |  9 ++-
 test/xlog/panic_on_broken_lsn.test.lua |  7 +-
 4 files changed, 107 insertions(+), 73 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 294765195..d00b1b04a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -268,23 +268,6 @@ apply_row(struct xrow_header *row)
 	return 0;
 }
 
-static int
-apply_final_join_row(struct xrow_header *row)
-{
-	struct txn *txn = txn_begin();
-	if (txn == NULL)
-		return -1;
-	if (apply_row(row) != 0) {
-		txn_rollback(txn);
-		fiber_gc();
-		return -1;
-	}
-	if (txn_commit(txn) != 0)
-		return -1;
-	fiber_gc();
-	return 0;
-}
-
 /**
  * Connect to a remote host and authenticate the client.
  */
@@ -391,6 +374,22 @@ done:
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * A helper struct to link xrow objects in a list.
+ */
+struct applier_tx_row {
+	/* Next transaction row. */
+	struct stailq_entry next;
+	/* xrow_header struct for the current transaction row. */
+	struct xrow_header row;
+};
+
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows);
+
+static int
+applier_apply_tx(struct stailq *rows);
+
 /**
  * Execute and process JOIN request (bootstrap the instance).
  */
@@ -478,27 +477,29 @@ applier_join(struct applier *applier)
 	 * Receive final data.
 	 */
 	while (true) {
-		if (coio_read_xrow(coio, ibuf, &row) < 0)
-			diag_raise();
-		applier->last_row_time = ev_monotonic_now(loop());
-		if (iproto_type_is_dml(row.type)) {
-			vclock_follow_xrow(&replicaset.vclock, &row);
-			if (apply_final_join_row(&row) != 0)
-				diag_raise();
-			if (++row_count % 100000 == 0)
-				say_info("%.1fM rows received", row_count / 1e6);
-		} else if (row.type == IPROTO_OK) {
-			/*
-			 * Current vclock. This is not used now,
-			 * ignore.
-			 */
+		struct stailq rows;
+		applier_read_tx(applier, &rows);
+		struct xrow_header *first_row =
+			&(stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row);
+		if (first_row->type == IPROTO_OK) {
+			if (applier->version_id < version_id(1, 7, 0)) {
+				/*
+				 * This is the start vclock if the
+				 * server is 1.6. Since we have
+				 * not initialized replication
+				 * vclock yet, do it now. In 1.7+
+				 * this vclock is not used.
+				 */
+				xrow_decode_vclock_xc(first_row, &replicaset.vclock);
+			}
 			break; /* end of stream */
-		} else if (iproto_type_is_error(row.type)) {
-			xrow_decode_error_xc(&row);  /* rethrow error */
-		} else {
-			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
-				  (uint32_t) row.type);
 		}
+		if (applier_apply_tx(&rows) != 0)
+			diag_raise();
+		if (ibuf_used(ibuf) == 0)
+			ibuf_reset(ibuf);
+		fiber_gc();
 	}
 	say_info("final data received");
 
@@ -506,16 +507,6 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
-/**
- * A helper struct to link xrow objects in a list.
- */
-struct applier_tx_row {
-	/* Next transaction row. */
-	struct stailq_entry next;
-	/* xrow_header struct for the current transaction row. */
-	struct xrow_header row;
-};
-
 static struct applier_tx_row *
 applier_read_tx_row(struct applier *applier)
 {
@@ -532,6 +523,9 @@ applier_read_tx_row(struct applier *applier)
 	struct xrow_header *row = &tx_row->row;
 
 	double timeout = replication_disconnect_timeout();
+	/* We check timeout only in case of subscribe. */
+	if (applier->state == APPLIER_FINAL_JOIN)
+		timeout = TIMEOUT_INFINITY;
 	/*
 	 * Tarantool < 1.7.7 does not send periodic heartbeat
 	 * messages so we can't assume that if we haven't heard
@@ -568,6 +562,12 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 		struct applier_tx_row *tx_row = applier_read_tx_row(applier);
 		struct xrow_header *row = &tx_row->row;
 
+		if (row->type == IPROTO_OK) {
+			stailq_add_tail(rows, &tx_row->next);
+			assert(tx_row->row.is_commit);
+			break;
+		}
+
 		if (iproto_type_is_error(row->type))
 			xrow_decode_error_xc(row);
 
diff --git a/src/box/box.cc b/src/box/box.cc
index 71822551e..9464eee63 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -286,6 +286,8 @@ struct wal_stream {
 	struct xstream base;
 	/** How many rows have been recovered so far. */
 	size_t rows;
+	/** Current transaction.*/
+	struct txn *txn;
 };
 
 /**
@@ -321,7 +323,8 @@ recovery_journal_write(struct journal *base,
 }
 
 static inline void
-recovery_journal_create(struct recovery_journal *journal, const struct vclock *v)
+recovery_journal_create(struct recovery_journal *journal,
+			const struct vclock *v)
 {
 	journal_create(&journal->base, recovery_journal_write, NULL);
 	vclock_copy(&journal->vclock, v);
@@ -330,33 +333,44 @@ recovery_journal_create(struct recovery_journal *journal, const struct vclock *v
 static int
 apply_wal_row(struct xstream *stream, struct xrow_header *row)
 {
+	struct wal_stream *wal_stream =
+		container_of(stream, struct wal_stream, base);
+	if (wal_stream->txn == NULL) {
+		wal_stream->txn = txn_begin();
+		if (wal_stream->txn == NULL)
+			return -1;
+	}
 	struct request request;
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
+	int rc = 0;
 	if (request.type != IPROTO_NOP) {
 		struct space *space = space_cache_find_xc(request.space_id);
-		if (box_process_rw(&request, space, NULL) != 0) {
+		rc = box_process_rw(&request, space, NULL);
+		if (rc != 0)
 			say_error("error applying row: %s", request_str(&request));
-			return -1;
-		}
 	} else {
-		struct txn *txn = txn_begin();
-		if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 ||
-		    txn_commit_stmt(txn, &request) != 0) {
-			txn_rollback(txn);
+		struct txn *txn = in_txn();
+		rc = txn_begin_stmt(txn, NULL);
+		if (rc == 0)
+			rc = txn_commit_stmt(txn, &request);
+	}
+	if (row->is_commit) {
+		if (txn_commit(wal_stream->txn) != 0) {
+			wal_stream->txn = NULL;
 			return -1;
 		}
-		if (txn_commit(txn) != 0)
-			return -1;
+		wal_stream->txn = NULL;
 	}
-	struct wal_stream *xstream =
-		container_of(stream, struct wal_stream, base);
 	/**
 	 * Yield once in a while, but not too often,
 	 * mostly to allow signal handling to take place.
 	 */
-	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
+	if (++(wal_stream->rows) > WAL_ROWS_PER_YIELD &&
+	    wal_stream->txn == NULL) {
+		wal_stream->rows -= WAL_ROWS_PER_YIELD;
 		fiber_sleep(0);
-	return 0;
+	}
+	return rc;
 }
 
 static void
@@ -364,6 +378,21 @@ wal_stream_create(struct wal_stream *ctx)
 {
 	xstream_create(&ctx->base, apply_wal_row);
 	ctx->rows = 0;
+	ctx->txn = NULL;
+}
+
+static int
+wal_stream_destroy(struct wal_stream *ctx)
+{
+	if (ctx->txn != NULL) {
+		/* The last processed row does not have a commit flag set. */
+		txn_rollback(ctx->txn);
+		ctx->txn = NULL;
+		diag_set(ClientError, ER_UNSUPPORTED,
+			 "recovery", "not finished transactions");
+		return -1;
+	}
+	return 0;
 }
 
 /* {{{ configuration bindings */
@@ -1917,6 +1946,9 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
 	struct wal_stream wal_stream;
 	wal_stream_create(&wal_stream);
+	auto wal_stream_guard = make_scoped_guard([&]{
+		wal_stream_destroy(&wal_stream);
+	});
 
 	struct recovery *recovery;
 	recovery = recovery_new(cfg_gets("wal_dir"),
@@ -1925,13 +1957,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	if (recovery == NULL)
 		diag_raise();
 
-	/*
-	 * Make sure we report the actual recovery position
-	 * in box.info while local recovery is in progress.
-	 */
-	box_vclock = &recovery->vclock;
 	auto guard = make_scoped_guard([&]{
-		box_vclock = &replicaset.vclock;
 		recovery_stop_local(recovery);
 		recovery_delete(recovery);
 	});
@@ -1981,12 +2007,12 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 */
 	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
 
-	vclock_copy(&replicaset.vclock, checkpoint_vclock);
 	struct recovery_journal journal;
 	recovery_journal_create(&journal, &recovery->vclock);
 	journal_set(&journal.base);
 
 	engine_begin_final_recovery_xc();
+
 	if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
 		diag_raise();
 	engine_end_recovery_xc();
@@ -2015,7 +2041,9 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		box_sync_replication(false);
 	}
 	recovery_finalize(recovery);
-
+	wal_stream_guard.is_active = false;
+	if (wal_stream_destroy(&wal_stream))
+		diag_raise();
 	/*
 	 * We must enable WAL before finalizing engine recovery,
 	 * because an engine may start writing to WAL right after
diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result
index 1e62680eb..e209374b6 100644
--- a/test/xlog/panic_on_broken_lsn.result
+++ b/test/xlog/panic_on_broken_lsn.result
@@ -141,8 +141,11 @@ test_run:cmd("setopt delimiter ';'")
 _ = fiber.create(function()
     test_run:wait_cond(function() return box.info.replication[2] ~= nil end)
     lsn = box.info.vclock[1]
-    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 2)
+    box.begin()
     box.space.test:auto_increment{'v1'}
+    box.space.test:auto_increment{'v1'}
+    box.commit()
     box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
 end);
 ---
@@ -164,9 +167,9 @@ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
 - ok
 ...
 -- Check that log contains the mention of broken LSN and the request printout
-grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn)
+grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn + 1)
 ---
-- '{type: ''INSERT'', replica_id: 1, space_id: 9000, index_id: 0, tuple: [2, "v1"]}'
+- '{type: ''INSERT'', replica_id: 1, space_id: 9000, index_id: 0, tuple: [3, "v1"]}'
 ...
 test_run:cmd('cleanup server replica')
 ---
diff --git a/test/xlog/panic_on_broken_lsn.test.lua b/test/xlog/panic_on_broken_lsn.test.lua
index 80cccd918..a1d62cee5 100644
--- a/test/xlog/panic_on_broken_lsn.test.lua
+++ b/test/xlog/panic_on_broken_lsn.test.lua
@@ -67,8 +67,11 @@ test_run:cmd("setopt delimiter ';'")
 _ = fiber.create(function()
     test_run:wait_cond(function() return box.info.replication[2] ~= nil end)
     lsn = box.info.vclock[1]
-    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 2)
+    box.begin()
     box.space.test:auto_increment{'v1'}
+    box.space.test:auto_increment{'v1'}
+    box.commit()
     box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
 end);
 test_run:cmd("setopt delimiter ''");
@@ -78,7 +81,7 @@ test_run:cmd('start server replica with crash_expected=True')
 box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
 
 -- Check that log contains the mention of broken LSN and the request printout
-grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn)
+grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn + 1)
 
 test_run:cmd('cleanup server replica')
 test_run:cmd('delete server replica')
-- 
2.24.0



More information about the Tarantool-patches mailing list