[Tarantool-patches] [PATCH 5/6] box: improve recovery journal

Georgy Kirichenko georgy at tarantool.org
Tue Nov 19 19:04:56 MSK 2019


Refactoring: track recovery journal vclock instead of to use
the recovery ones. Now replicaset vclock will rely on recovery stream
content instead of wal directory content (xlog names and meta). This
enables applier to use this journal and  generalize wal recovery and
applier final join handling.

Part of #980
---
 src/box/box.cc               | 39 +++++++++++++++++++++++-------------
 test/xlog-py/big_lsn.result  |  4 ++++
 test/xlog-py/big_lsn.test.py | 13 ++++++------
 test/xlog-py/dup_key.result  |  8 ++++++++
 test/xlog-py/dup_key.test.py |  7 +++++++
 5 files changed, 51 insertions(+), 20 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index f41ef9ce8..71822551e 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -296,7 +296,7 @@ struct wal_stream {
  */
 struct recovery_journal {
 	struct journal base;
-	struct vclock *vclock;
+	struct vclock vclock;
 };
 
 /**
@@ -309,16 +309,22 @@ recovery_journal_write(struct journal *base,
 		       struct journal_entry *entry)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
-	entry->res = vclock_sum(journal->vclock);
+	for (struct xrow_header **row = entry->rows;
+	     row < entry->rows + entry->n_rows; ++row) {
+		vclock_follow_xrow(&journal->vclock, *row);
+	}
+	entry->res = vclock_sum(&journal->vclock);
+	/* Assume the entry was committed and adjust replicaset vclock. */
+	vclock_copy(&replicaset.vclock, &journal->vclock);
 	journal_entry_complete(entry);
 	return 0;
 }
 
 static inline void
-recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
+recovery_journal_create(struct recovery_journal *journal, const struct vclock *v)
 {
 	journal_create(&journal->base, recovery_journal_write, NULL);
-	journal->vclock = v;
+	vclock_copy(&journal->vclock, v);
 }
 
 static int
@@ -332,6 +338,15 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 			say_error("error applying row: %s", request_str(&request));
 			return -1;
 		}
+	} else {
+		struct txn *txn = txn_begin();
+		if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 ||
+		    txn_commit_stmt(txn, &request) != 0) {
+			txn_rollback(txn);
+			return -1;
+		}
+		if (txn_commit(txn) != 0)
+			return -1;
 	}
 	struct wal_stream *xstream =
 		container_of(stream, struct wal_stream, base);
@@ -1956,11 +1971,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	struct memtx_engine *memtx;
 	memtx = (struct memtx_engine *)engine_by_name("memtx");
 	assert(memtx != NULL);
-
-	struct recovery_journal journal;
-	recovery_journal_create(&journal, &recovery->vclock);
-	journal_set(&journal.base);
-
+	vclock_copy(&replicaset.vclock, checkpoint_vclock);
 	/*
 	 * We explicitly request memtx to recover its
 	 * snapshot as a separate phase since it contains
@@ -1970,6 +1981,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	 */
 	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
 
+	vclock_copy(&replicaset.vclock, checkpoint_vclock);
+	struct recovery_journal journal;
+	recovery_journal_create(&journal, &recovery->vclock);
+	journal_set(&journal.base);
+
 	engine_begin_final_recovery_xc();
 	if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
 		diag_raise();
@@ -1995,11 +2011,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		if (recover_remaining_wals(recovery, &wal_stream.base, NULL,
 					   true) != 0)
 			diag_raise();
-		/*
-		 * Advance replica set vclock to reflect records
-		 * applied in hot standby mode.
-		 */
-		vclock_copy(&replicaset.vclock, &recovery->vclock);
 		box_listen();
 		box_sync_replication(false);
 	}
diff --git a/test/xlog-py/big_lsn.result b/test/xlog-py/big_lsn.result
index b370773f2..6c10f6957 100644
--- a/test/xlog-py/big_lsn.result
+++ b/test/xlog-py/big_lsn.result
@@ -5,6 +5,10 @@ box.info.lsn
 box.space._schema:delete('dummy')
 ---
 ...
+box.snapshot()
+---
+- ok
+...
 box.info.lsn
 ---
 - 123456789123
diff --git a/test/xlog-py/big_lsn.test.py b/test/xlog-py/big_lsn.test.py
index c6a31d971..bdc84e012 100644
--- a/test/xlog-py/big_lsn.test.py
+++ b/test/xlog-py/big_lsn.test.py
@@ -9,21 +9,22 @@ server.stop()
 server.deploy()
 server.admin("box.info.lsn")
 server.admin("box.space._schema:delete('dummy')")
+server.admin("box.snapshot()")
 server.stop()
 
-# Bump the instance vclock by tweaking the last xlog.
+# Bump the instance vclock by tweaking the checkpoint.
 old_lsn = 1
 new_lsn = 123456789123
-wal_dir = os.path.join(server.vardir, server.name)
-old_wal = os.path.join(wal_dir, "%020d.xlog" % old_lsn)
-new_wal = os.path.join(wal_dir, "%020d.xlog" % new_lsn)
-with open(old_wal, "r+") as f:
+snap_dir = os.path.join(server.vardir, server.name)
+old_snap = os.path.join(snap_dir, "%020d.snap" % old_lsn)
+new_snap = os.path.join(snap_dir, "%020d.snap" % new_lsn)
+with open(old_snap, "r+") as f:
     s = f.read()
     s = s.replace("VClock: {1: %d}" % old_lsn,
                   "VClock: {1: %d}" % new_lsn)
     f.seek(0)
     f.write(s)
-os.rename(old_wal, new_wal)
+os.rename(old_snap, new_snap)
 
 # Recover and make a snapshot.
 server.start()
diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
index f387e8e89..0d00dfb97 100644
--- a/test/xlog-py/dup_key.result
+++ b/test/xlog-py/dup_key.result
@@ -17,6 +17,14 @@ box.space.test:insert{2, 'second tuple'}
 - [2, 'second tuple']
 ...
 .xlog exists
+box.space.test:insert{1, 'nop'}
+---
+- [1, 'nop']
+...
+box.space.test:delete{1}
+---
+- [1, 'nop']
+...
 box.space.test:insert{1, 'third tuple'}
 ---
 - [1, 'third tuple']
diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
index 7609c9555..cb834747f 100644
--- a/test/xlog-py/dup_key.test.py
+++ b/test/xlog-py/dup_key.test.py
@@ -29,12 +29,19 @@ if os.access(wal, os.F_OK):
     print ".xlog exists"
     os.rename(wal, wal_old)
 
+# Write wal#1-1
+server.start()
+server.admin("box.space.test:insert{1, 'nop'}")
+server.admin("box.space.test:delete{1}")
+server.stop()
+
 # Write wal#2
 server.start()
 server.admin("box.space.test:insert{1, 'third tuple'}")
 server.admin("box.space.test:insert{2, 'fourth tuple'}")
 server.stop()
 
+os.unlink(wal)
 # Restore wal#1
 if not os.access(wal, os.F_OK):
     print ".xlog does not exist"
-- 
2.24.0



More information about the Tarantool-patches mailing list