[Tarantool-patches] [PATCH 5/6] box: improve recovery journal

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Nov 23 16:46:01 MSK 2019


Hi!

I still don't understand replication and recovery
code at all, so most of my comments are actually
questions.

See 5 of them below.

On 19/11/2019 17:04, Georgy Kirichenko wrote:
> Refactoring: track the recovery journal vclock instead of using
> the recovery one. Now the replicaset vclock relies on recovery stream
> content instead of wal directory content (xlog names and meta). This
> enables the applier to use this journal and generalizes wal recovery and
> applier final join handling.
> 
> Part of #980
> ---
>  src/box/box.cc               | 39 +++++++++++++++++++++++-------------
>  test/xlog-py/big_lsn.result  |  4 ++++
>  test/xlog-py/big_lsn.test.py | 13 ++++++------
>  test/xlog-py/dup_key.result  |  8 ++++++++
>  test/xlog-py/dup_key.test.py |  7 +++++++
>  5 files changed, 51 insertions(+), 20 deletions(-)
> 
> diff --git a/src/box/box.cc b/src/box/box.cc
> index f41ef9ce8..71822551e 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -309,16 +309,22 @@ recovery_journal_write(struct journal *base,
>  		       struct journal_entry *entry)
>  {
>  	struct recovery_journal *journal = (struct recovery_journal *) base;
> -	entry->res = vclock_sum(journal->vclock);
> +	for (struct xrow_header **row = entry->rows;
> +	     row < entry->rows + entry->n_rows; ++row) {
> +		vclock_follow_xrow(&journal->vclock, *row);
> +	}
> +	entry->res = vclock_sum(&journal->vclock);
> +	/* Assume the entry was committed and adjust replicaset vclock. */

1. What if it was not committed?
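
To make the question concrete, here is a toy model of what I mean. It is
not Tarantool code: the toy_* names, the fixed-size clock and the
'committed' flag are all made up for illustration. The rows advance a
staged copy of the journal clock, and the shared clock is touched only
when the entry is known to be committed. I am not claiming recovery
needs exactly this, only that the patch publishes unconditionally.

```c
#include <assert.h>
#include <stdint.h>

/* Toy model only; none of these names exist in Tarantool. */
enum { TOY_VCLOCK_MAX = 4 };

struct toy_vclock {
	int64_t lsn[TOY_VCLOCK_MAX];
};

struct toy_row {
	uint32_t replica_id;
	int64_t lsn;
};

/* Advance one component; the new LSN must be strictly greater. */
void
toy_vclock_follow(struct toy_vclock *vc, const struct toy_row *row)
{
	assert(row->replica_id < TOY_VCLOCK_MAX);
	assert(row->lsn > vc->lsn[row->replica_id]);
	vc->lsn[row->replica_id] = row->lsn;
}

/* Sum of all components, used as the entry result in this model. */
int64_t
toy_vclock_sum(const struct toy_vclock *vc)
{
	int64_t sum = 0;
	for (int i = 0; i < TOY_VCLOCK_MAX; i++)
		sum += vc->lsn[i];
	return sum;
}

/*
 * Follow the rows on a staged copy. Publish to the journal clock and
 * to the shared clock only if the entry is committed; otherwise leave
 * both untouched and return -1.
 */
int64_t
toy_journal_write(struct toy_vclock *journal, struct toy_vclock *shared,
		  const struct toy_row *rows, int n_rows, int committed)
{
	struct toy_vclock staged = *journal;
	for (int i = 0; i < n_rows; i++)
		toy_vclock_follow(&staged, &rows[i]);
	if (!committed)
		return -1;
	*journal = staged;
	*shared = staged;
	return toy_vclock_sum(journal);
}
```

In this model a failed entry leaves the shared clock where it was, so
the same rows can be replayed later without tripping the follow check.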

> +	vclock_copy(&replicaset.vclock, &journal->vclock);
>  	journal_entry_complete(entry);
>  	return 0;
>  }
> @@ -332,6 +338,15 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  			say_error("error applying row: %s", request_str(&request));
>  			return -1;
>  		}
> +	} else {
> +		struct txn *txn = txn_begin();
> +		if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 ||
> +		    txn_commit_stmt(txn, &request) != 0) {
> +			txn_rollback(txn);

2. If txn == NULL, txn_rollback() will crash.
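
A possible shape for the error handling, as a self-contained sketch. The
toy_* functions and the failure flags below are stand-ins I made up so
the snippet compiles on its own; they only mimic the control flow of the
txn_* calls in the patch. The point is to return early when begin fails,
so that rollback never receives NULL.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-ins for the transaction calls; not Tarantool code. */
struct toy_txn {
	int in_progress;
};

struct toy_txn toy_txn_storage;
int toy_fail_begin;	/* 1 simulates begin returning NULL */
int toy_fail_stmt;	/* 1 simulates a statement failure */
int toy_rollback_calls;	/* counts rollbacks, for checking */

struct toy_txn *
toy_txn_begin(void)
{
	if (toy_fail_begin)
		return NULL;
	toy_txn_storage.in_progress = 1;
	return &toy_txn_storage;
}

int
toy_txn_stmt(struct toy_txn *txn)
{
	(void) txn;
	return toy_fail_stmt ? -1 : 0;
}

void
toy_txn_rollback(struct toy_txn *txn)
{
	/* Dereferences txn, so a NULL argument would crash here. */
	txn->in_progress = 0;
	toy_rollback_calls++;
}

int
toy_txn_commit(struct toy_txn *txn)
{
	txn->in_progress = 0;
	return 0;
}

/* Handle a failed begin separately from a failed statement. */
int
toy_apply_nop_row(void)
{
	struct toy_txn *txn = toy_txn_begin();
	if (txn == NULL)
		return -1;
	if (toy_txn_stmt(txn) != 0) {
		toy_txn_rollback(txn);
		return -1;
	}
	return toy_txn_commit(txn);
}
```

With the early return, the rollback path is reached only with a valid
transaction pointer.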

> +			return -1;
> +		}
> +		if (txn_commit(txn) != 0)
> +			return -1;
>  	}

3. What is it? It does not look like refactoring. Why did you add
this whole code block?

>  	struct wal_stream *xstream =
>  		container_of(stream, struct wal_stream, base);
> @@ -1970,6 +1981,11 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	 */
>  	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
>  
> +	vclock_copy(&replicaset.vclock, checkpoint_vclock);
> +	struct recovery_journal journal;
> +	recovery_journal_create(&journal, &recovery->vclock);
> +	journal_set(&journal.base);

4. Why did you move it? Why is vclock_copy() called 2 times?

> +
>  	engine_begin_final_recovery_xc();
>  	if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
>  		diag_raise();
> diff --git a/test/xlog-py/big_lsn.test.py b/test/xlog-py/big_lsn.test.py
> index c6a31d971..bdc84e012 100644
> --- a/test/xlog-py/big_lsn.test.py
> +++ b/test/xlog-py/big_lsn.test.py
> @@ -9,21 +9,22 @@ server.stop()
>  server.deploy()
>  server.admin("box.info.lsn")
>  server.admin("box.space._schema:delete('dummy')")
> +server.admin("box.snapshot()")
>  server.stop()
>  
> -# Bump the instance vclock by tweaking the last xlog.
> +# Bump the instance vclock by tweaking the checkpoint.
>  old_lsn = 1
>  new_lsn = 123456789123
> -wal_dir = os.path.join(server.vardir, server.name)
> -old_wal = os.path.join(wal_dir, "%020d.xlog" % old_lsn)
> -new_wal = os.path.join(wal_dir, "%020d.xlog" % new_lsn)
> -with open(old_wal, "r+") as f:
> +snap_dir = os.path.join(server.vardir, server.name)
> +old_snap = os.path.join(snap_dir, "%020d.snap" % old_lsn)
> +new_snap = os.path.join(snap_dir, "%020d.snap" % new_lsn)
> +with open(old_snap, "r+") as f:
>      s = f.read()
>      s = s.replace("VClock: {1: %d}" % old_lsn,
>                    "VClock: {1: %d}" % new_lsn)
>      f.seek(0)
>      f.write(s)
> -os.rename(old_wal, new_wal)
> +os.rename(old_snap, new_snap)

5. Why did you change the tests, if this commit is just
refactoring?

>  
>  # Recover and make a snapshot.
>  server.start()

