[Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries while recovery or join

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Nov 23 16:46:04 MSK 2019


Thanks for the patch!

See 13 comments below.

On 19/11/2019 17:04, Georgy Kirichenko wrote:
> Do not start a transaction for each local journal or final join row
> but follow transaction boundaries instead.

1. Why don't we already have boundaries on join? I thought
that transactional replication was already done.

> 
> Part of #980
> ---
>  src/box/applier.cc                     | 92 +++++++++++++-------------
>  src/box/box.cc                         | 72 ++++++++++++++------
>  test/xlog/panic_on_broken_lsn.result   |  9 ++-
>  test/xlog/panic_on_broken_lsn.test.lua |  7 +-
>  4 files changed, 107 insertions(+), 73 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 294765195..d00b1b04a 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -478,27 +477,29 @@ applier_join(struct applier *applier)
>  	 * Receive final data.
>  	 */
>  	while (true) {
> -		if (coio_read_xrow(coio, ibuf, &row) < 0)
> -			diag_raise();
> -		applier->last_row_time = ev_monotonic_now(loop());
> -		if (iproto_type_is_dml(row.type)) {
> -			vclock_follow_xrow(&replicaset.vclock, &row);
> -			if (apply_final_join_row(&row) != 0)
> -				diag_raise();
> -			if (++row_count % 100000 == 0)
> -				say_info("%.1fM rows received", row_count / 1e6);
> -		} else if (row.type == IPROTO_OK) {
> -			/*
> -			 * Current vclock. This is not used now,
> -			 * ignore.
> -			 */
> +		struct stailq rows;
> +		applier_read_tx(applier, &rows);
> +		struct xrow_header *first_row =
> +			&(stailq_first_entry(&rows, struct applier_tx_row,
> +					    next)->row);
> +		if (first_row->type == IPROTO_OK) {
> +			if (applier->version_id < version_id(1, 7, 0)) {
> +				/*
> +				 * This is the start vclock if the
> +				 * server is 1.6. Since we have
> +				 * not initialized replication
> +				 * vclock yet, do it now. In 1.7+
> +				 * this vclock is not used.
> +				 */
> +				xrow_decode_vclock_xc(first_row, &replicaset.vclock);
> +			}
>  			break; /* end of stream */
> -		} else if (iproto_type_is_error(row.type)) {
> -			xrow_decode_error_xc(&row);  /* rethrow error */
> -		} else {
> -			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
> -				  (uint32_t) row.type);
>  		}
> +		if (applier_apply_tx(&rows) != 0)
> +			diag_raise();
> +		if (ibuf_used(ibuf) == 0)
> +			ibuf_reset(ibuf);

2. I propose moving the ibuf reset and fiber_gc() into
applier_apply_tx(). It looks confusing that the loop does
not touch ibuf or fiber->gc directly anywhere, yet you reset
them on each iteration.

And why do you reset it only when used == 0? It is filled
with transaction's data, right? After applier_apply_tx() the
data should not be needed anymore, regardless of what is in
the ibuf.

> +		fiber_gc();
>  	}
>  	say_info("final data received");
>  
> @@ -532,6 +523,9 @@ applier_read_tx_row(struct applier *applier)
>  	struct xrow_header *row = &tx_row->row;
>  
>  	double timeout = replication_disconnect_timeout();
> +	/* We check timeout only in case of subscribe. */
> +	if (applier->state == APPLIER_FINAL_JOIN)
> +		timeout = TIMEOUT_INFINITY;

3. Why? And how is it related to the transactionality of
recovery and join?

>  	/*
>  	 * Tarantool < 1.7.7 does not send periodic heartbeat
>  	 * messages so we can't assume that if we haven't heard
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 71822551e..9464eee63 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -286,6 +286,8 @@ struct wal_stream {
>  	struct xstream base;
>  	/** How many rows have been recovered so far. */
>  	size_t rows;
> +	/** Current transaction.*/

4. Please put a space before '*/'.

> +	struct txn *txn;
>  };
>  
>  /**
> @@ -321,7 +323,8 @@ recovery_journal_write(struct journal *base,
>  }
>  
>  static inline void
> -recovery_journal_create(struct recovery_journal *journal, const struct vclock *v)
> +recovery_journal_create(struct recovery_journal *journal,
> +			const struct vclock *v)

5. Unnecessary change.

>  {
>  	journal_create(&journal->base, recovery_journal_write, NULL);
>  	vclock_copy(&journal->vclock, v);
> @@ -330,33 +333,44 @@ recovery_journal_create(struct recovery_journal *journal, const struct vclock *v
>  static int
>  apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  {
> +	struct wal_stream *wal_stream =
> +		container_of(stream, struct wal_stream, base);
> +	if (wal_stream->txn == NULL) {
> +		wal_stream->txn = txn_begin();
> +		if (wal_stream->txn == NULL)
> +			return -1;
> +	}
>  	struct request request;
>  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> +	int rc = 0;
>  	if (request.type != IPROTO_NOP) {
>  		struct space *space = space_cache_find_xc(request.space_id);
> -		if (box_process_rw(&request, space, NULL) != 0) {
> +		rc = box_process_rw(&request, space, NULL);
> +		if (rc != 0)
>  			say_error("error applying row: %s", request_str(&request));
> -			return -1;

6. So now you can apply a row, get an error on it,
and then still commit the broken transaction below.
Please, don't.

> -		}
>  	} else {
> -		struct txn *txn = txn_begin();
> -		if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 ||
> -		    txn_commit_stmt(txn, &request) != 0) {
> -			txn_rollback(txn);
> +		struct txn *txn = in_txn();
> +		rc = txn_begin_stmt(txn, NULL);

7. Why do you need this empty statement?

> +		if (rc == 0)
> +			rc = txn_commit_stmt(txn, &request);
> +	}
> +	if (row->is_commit) {
> +		if (txn_commit(wal_stream->txn) != 0) {
> +			wal_stream->txn = NULL;
>  			return -1;
>  		}
> -		if (txn_commit(txn) != 0)
> -			return -1;
> +		wal_stream->txn = NULL;
>  	}
> -	struct wal_stream *xstream =
> -		container_of(stream, struct wal_stream, base);
>  	/**
>  	 * Yield once in a while, but not too often,
>  	 * mostly to allow signal handling to take place.
>  	 */
> -	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
> +	if (++(wal_stream->rows) > WAL_ROWS_PER_YIELD &&
> +	    wal_stream->txn == NULL) {
> +		wal_stream->rows -= WAL_ROWS_PER_YIELD;

8. Why did you change this?

>  		fiber_sleep(0);
> -	return 0;
> +	}
> +	return rc;
>  }
>  
>  static void
> @@ -1917,6 +1946,9 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  
>  	struct wal_stream wal_stream;
>  	wal_stream_create(&wal_stream);
> +	auto wal_stream_guard = make_scoped_guard([&]{
> +		wal_stream_destroy(&wal_stream);

9. You ignore wal_stream_destroy() error here.

> +	});
>  
>  	struct recovery *recovery;
>  	recovery = recovery_new(cfg_gets("wal_dir"),
> @@ -1925,13 +1957,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	if (recovery == NULL)
>  		diag_raise();
>  
> -	/*
> -	 * Make sure we report the actual recovery position
> -	 * in box.info while local recovery is in progress.
> -	 */
> -	box_vclock = &recovery->vclock;

10. So now you don't report it?

>  	auto guard = make_scoped_guard([&]{
> -		box_vclock = &replicaset.vclock;
>  		recovery_stop_local(recovery);
>  		recovery_delete(recovery);
>  	});
> @@ -1981,12 +2007,12 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	 */
>  	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
>  
> -	vclock_copy(&replicaset.vclock, checkpoint_vclock);
>  	struct recovery_journal journal;
>  	recovery_journal_create(&journal, &recovery->vclock);
>  	journal_set(&journal.base);
>  
>  	engine_begin_final_recovery_xc();
> +

11. Unnecessary change.

> diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result
> index 1e62680eb..e209374b6 100644
> --- a/test/xlog/panic_on_broken_lsn.result
> +++ b/test/xlog/panic_on_broken_lsn.result
> @@ -141,8 +141,11 @@ test_run:cmd("setopt delimiter ';'")
>  _ = fiber.create(function()
>      test_run:wait_cond(function() return box.info.replication[2] ~= nil end)
>      lsn = box.info.vclock[1]
> -    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
> +    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 2)
> +    box.begin()
>      box.space.test:auto_increment{'v1'}
> +    box.space.test:auto_increment{'v1'}
> +    box.commit()

12. Why?

If you want to test transactional recovery/join, then please
add a new test case. Try not to change the old ones unless
they get broken or the change is very trivial.

If this is not a test for transactional recovery/join, then
please, add one. I think this is testable, since it is not
just refactoring.

>      box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
>  end);
>  ---
> @@ -164,9 +167,9 @@ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
>  - ok
>  ...
>  -- Check that log contains the mention of broken LSN and the request printout
> -grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn)
> +grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn + 1)

13. Why +1?


More information about the Tarantool-patches mailing list