From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp45.i.mail.ru (smtp45.i.mail.ru [94.100.177.105]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 371F3440F3C for ; Tue, 19 Nov 2019 19:05:03 +0300 (MSK) From: Georgy Kirichenko Date: Tue, 19 Nov 2019 19:04:56 +0300 Message-Id: <2570203f7065f2413bdbfa2e2096dad6f23f784f.1574178520.git.georgy@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH 5/6] box: improve recovery journal List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org Refactoring: make the recovery journal track its own vclock instead of using the recovery's vclock. The replicaset vclock now relies on the content of the recovery stream instead of the content of the WAL directory (xlog file names and metadata). This enables the applier to use this journal and makes it possible to generalize WAL recovery and the applier's final join handling. 
Part of #980 --- src/box/box.cc | 39 +++++++++++++++++++++++------------- test/xlog-py/big_lsn.result | 4 ++++ test/xlog-py/big_lsn.test.py | 13 ++++++------ test/xlog-py/dup_key.result | 8 ++++++++ test/xlog-py/dup_key.test.py | 7 +++++++ 5 files changed, 51 insertions(+), 20 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index f41ef9ce8..71822551e 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -296,7 +296,7 @@ struct wal_stream { */ struct recovery_journal { struct journal base; - struct vclock *vclock; + struct vclock vclock; }; /** @@ -309,16 +309,22 @@ recovery_journal_write(struct journal *base, struct journal_entry *entry) { struct recovery_journal *journal = (struct recovery_journal *) base; - entry->res = vclock_sum(journal->vclock); + for (struct xrow_header **row = entry->rows; + row < entry->rows + entry->n_rows; ++row) { + vclock_follow_xrow(&journal->vclock, *row); + } + entry->res = vclock_sum(&journal->vclock); + /* Assume the entry was committed and adjust replicaset vclock. 
*/ + vclock_copy(&replicaset.vclock, &journal->vclock); journal_entry_complete(entry); return 0; } static inline void -recovery_journal_create(struct recovery_journal *journal, struct vclock *v) +recovery_journal_create(struct recovery_journal *journal, const struct vclock *v) { journal_create(&journal->base, recovery_journal_write, NULL); - journal->vclock = v; + vclock_copy(&journal->vclock, v); } static int @@ -332,6 +338,15 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) say_error("error applying row: %s", request_str(&request)); return -1; } + } else { + struct txn *txn = txn_begin(); + if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 || + txn_commit_stmt(txn, &request) != 0) { + txn_rollback(txn); + return -1; + } + if (txn_commit(txn) != 0) + return -1; } struct wal_stream *xstream = container_of(stream, struct wal_stream, base); @@ -1956,11 +1971,7 @@ local_recovery(const struct tt_uuid *instance_uuid, struct memtx_engine *memtx; memtx = (struct memtx_engine *)engine_by_name("memtx"); assert(memtx != NULL); - - struct recovery_journal journal; - recovery_journal_create(&journal, &recovery->vclock); - journal_set(&journal.base); - + vclock_copy(&replicaset.vclock, checkpoint_vclock); /* * We explicitly request memtx to recover its * snapshot as a separate phase since it contains @@ -1970,6 +1981,11 @@ local_recovery(const struct tt_uuid *instance_uuid, */ memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock); + vclock_copy(&replicaset.vclock, checkpoint_vclock); + struct recovery_journal journal; + recovery_journal_create(&journal, &recovery->vclock); + journal_set(&journal.base); + engine_begin_final_recovery_xc(); if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0) diag_raise(); @@ -1995,11 +2011,6 @@ local_recovery(const struct tt_uuid *instance_uuid, if (recover_remaining_wals(recovery, &wal_stream.base, NULL, true) != 0) diag_raise(); - /* - * Advance replica set vclock to reflect records - * applied in hot 
standby mode. - */ - vclock_copy(&replicaset.vclock, &recovery->vclock); box_listen(); box_sync_replication(false); } diff --git a/test/xlog-py/big_lsn.result b/test/xlog-py/big_lsn.result index b370773f2..6c10f6957 100644 --- a/test/xlog-py/big_lsn.result +++ b/test/xlog-py/big_lsn.result @@ -5,6 +5,10 @@ box.info.lsn box.space._schema:delete('dummy') --- ... +box.snapshot() +--- +- ok +... box.info.lsn --- - 123456789123 diff --git a/test/xlog-py/big_lsn.test.py b/test/xlog-py/big_lsn.test.py index c6a31d971..bdc84e012 100644 --- a/test/xlog-py/big_lsn.test.py +++ b/test/xlog-py/big_lsn.test.py @@ -9,21 +9,22 @@ server.stop() server.deploy() server.admin("box.info.lsn") server.admin("box.space._schema:delete('dummy')") +server.admin("box.snapshot()") server.stop() -# Bump the instance vclock by tweaking the last xlog. +# Bump the instance vclock by tweaking the checkpoint. old_lsn = 1 new_lsn = 123456789123 -wal_dir = os.path.join(server.vardir, server.name) -old_wal = os.path.join(wal_dir, "%020d.xlog" % old_lsn) -new_wal = os.path.join(wal_dir, "%020d.xlog" % new_lsn) -with open(old_wal, "r+") as f: +snap_dir = os.path.join(server.vardir, server.name) +old_snap = os.path.join(snap_dir, "%020d.snap" % old_lsn) +new_snap = os.path.join(snap_dir, "%020d.snap" % new_lsn) +with open(old_snap, "r+") as f: s = f.read() s = s.replace("VClock: {1: %d}" % old_lsn, "VClock: {1: %d}" % new_lsn) f.seek(0) f.write(s) -os.rename(old_wal, new_wal) +os.rename(old_snap, new_snap) # Recover and make a snapshot. server.start() diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result index f387e8e89..0d00dfb97 100644 --- a/test/xlog-py/dup_key.result +++ b/test/xlog-py/dup_key.result @@ -17,6 +17,14 @@ box.space.test:insert{2, 'second tuple'} - [2, 'second tuple'] ... .xlog exists +box.space.test:insert{1, 'nop'} +--- +- [1, 'nop'] +... +box.space.test:delete{1} +--- +- [1, 'nop'] +... 
box.space.test:insert{1, 'third tuple'} --- - [1, 'third tuple'] diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py index 7609c9555..cb834747f 100644 --- a/test/xlog-py/dup_key.test.py +++ b/test/xlog-py/dup_key.test.py @@ -29,12 +29,19 @@ if os.access(wal, os.F_OK): print ".xlog exists" os.rename(wal, wal_old) +# Write wal#1-1 +server.start() +server.admin("box.space.test:insert{1, 'nop'}") +server.admin("box.space.test:delete{1}") +server.stop() + # Write wal#2 server.start() server.admin("box.space.test:insert{1, 'third tuple'}") server.admin("box.space.test:insert{2, 'fourth tuple'}") server.stop() +os.unlink(wal) # Restore wal#1 if not os.access(wal, os.F_OK): print ".xlog does not exist" -- 2.24.0