[tarantool-patches] [PATCH 3/3] Transaction support for applier

Vladimir Davydov vdavydov.dev at gmail.com
Tue Mar 5 14:59:02 MSK 2019


On Sun, Mar 03, 2019 at 11:26:18PM +0300, Georgy Kirichenko wrote:
> Applier fetch incoming rows to form a transaction and then apply it.
> In case of replication all local changes moved to an journal entry
> tail to form a separate transaction (like autonomous transaction)
> to be able to replicate changes back so applier assumes that transactions
> could not be mixed in a replication stream.
> 
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc                    | 243 ++++++++++++++++++++------
>  src/box/txn.c                         |  21 ++-
>  src/box/txn.h                         |   4 +
>  test/replication/transaction.result   | 240 +++++++++++++++++++++++++
>  test/replication/transaction.test.lua |  86 +++++++++
>  5 files changed, 534 insertions(+), 60 deletions(-)
>  create mode 100644 test/replication/transaction.result
>  create mode 100644 test/replication/transaction.test.lua
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 3222b041d..dfabbe5ab 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,12 @@
>  #include "session.h"
>  #include "cfg.h"
>  #include "box.h"
> +#include "txn.h"
> +
> +enum {
> +	/* Initial capacity of rows array. */
> +	APPLIER_TX_INITIAL_ROW_COUNT = 16,
> +};

I don't think it makes sense to make this a file-wide constant. After
all, it's only used in applier_read_tx so let's please move it there or,
even better, use 16 directly in region_alloc - it should be clear what
it means.

>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -380,6 +386,176 @@ applier_join(struct applier *applier)
>  	applier_set_state(applier, APPLIER_READY);
>  }
>  
> +/**
> + * Read one transaction from network using applier's input buffer.
> + * Transaction rows are placed onto fiber gc region.
> + * We could not use applier input buffer for that because rpos is adjusted
> + * after each xrow decoding and corresponding network input space is going
> + * to be reused.
> + *
> + * Return count of transaction rows and put row's header pointers into rows
> + * array.
> + */
> +static int
> +applier_read_tx(struct applier *applier, struct xrow_header **rows)
> +{
> +	struct ev_io *coio = &applier->io;
> +	struct ibuf *ibuf = &applier->ibuf;
> +	int64_t tsn = 0;
> +	int row_capacity = APPLIER_TX_INITIAL_ROW_COUNT;
> +	struct xrow_header *first_row, *row;
> +	first_row = (struct xrow_header *)region_alloc(&fiber()->gc,
> +						       row_capacity *
> +						       sizeof(struct xrow_header));
> +	if (first_row == NULL) {
> +		diag_set(OutOfMemory, sizeof(struct xrow_header) * row_capacity,
> +			 "region", "struct xrow_header");
> +		goto error;
> +	}
> +	row = first_row;
> +
> +	do {
> +		if (row == first_row + row_capacity) {
> +			/* Realloc rows array. */
> +			row = (struct xrow_header *)region_alloc(&fiber()->gc,
> +								 row_capacity *
> +								 sizeof(struct xrow_header) << 1);

This like is too long. Please make sure your code fits in ~80
characters.

> +			if (row == NULL) {
> +				diag_set(OutOfMemory,
> +					 sizeof(struct xrow_header) *
> +					 row_capacity << 1,
> +					 "region", "struct xrow_header");
> +				goto error;
> +			}
> +			memcpy(row, first_row, row_capacity *
> +					       sizeof(struct xrow_header) << 1);

I agree with Kostja - it's worth introducing region_realloc for this.
Please do in a separate patch. I bet there are other places in the code
that do exactly the same thing. It would be nice if you patched those as
well.

> +			first_row = row;
> +			row = first_row + row_capacity;
> +			row_capacity <<= 1;
> +		}

Please move the code below this point to a new helper function,
applier_read_row. It will make the code easier to follow and also
reduce the indentation level.

> +
> +		double timeout = replication_disconnect_timeout();
> +		/*
> +		 * Unfortunately we do not have C-version of coio read xrow
> +		 * functions yet so use try-catch guard as workaround.
> +		 */

Why return -1 at all in this function when you can simply pass the
exception to the caller. Let's please use exceptions in this code for
now, otherwise it looks inconsistent - sometimes we return -1, sometimes
we use exceptions for error propagation. We'll remove all exceptions at
once when we convert applier.cc to C.

> +		try {
> +			/*
> +			 * Tarantool < 1.7.7 does not send periodic heartbeat
> +			 * messages so we can't assume that if we haven't heard
> +			 * from the master for quite a while the connection is
> +			 * broken - the master might just be idle.
> +			 */
> +			if (applier->version_id < version_id(1, 7, 7))
> +				coio_read_xrow(coio, ibuf, row);
> +			else
> +				coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
> +		} catch (...) {
> +			goto error;
> +		}
> +
> +		if (iproto_type_is_error(row->type)) {
> +			xrow_decode_error(row);
> +			goto error;
> +		}
> +
> +		/* Replication request. */
> +		if (row->replica_id == REPLICA_ID_NIL ||
> +		    row->replica_id >= VCLOCK_MAX) {
> +			/*
> +			 * A safety net, this can only occur
> +			 * if we're fed a strangely broken xlog.
> +			 */
> +			diag_set(ClientError, ER_UNKNOWN_REPLICA,
> +				 int2str(row->replica_id),
> +				 tt_uuid_str(&REPLICASET_UUID));
> +			goto error;
> +		}
> +		if (row == first_row) {
> +			/*
> +			 * First row in a transaction. In order to enforce
> +			 * consistency check that first row lsn and replica id
> +			 * match with transaction.
> +			 */
> +			tsn = row->tsn;
> +			if (row->lsn != tsn) {
> +				/* There is not a first row in the transactions. */

This comment is useless as it simply reiterates the error message.

> +				diag_set(ClientError, ER_PROTOCOL,
> +					 "Not a first row in a transaction");

Don't very much like the error message. It's kinda too specific and
doesn't say much what happened at the same time. May be, we'd better
rewrite it as

	"Inconsistent transaction stream"

?

> +				goto error;
> +			}
> +		}
> +		if (tsn != row->tsn) {
> +			/* We are not able to handle interleaving transactions. */

Again, a useless comment.

> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "replications",

s/replications/replication

> +				 "interleaving transactions");
> +			goto error;
> +		}
> +
> +
> +		applier->lag = ev_now(loop()) - row->tm;
> +		applier->last_row_time = ev_monotonic_now(loop());
> +
> +		if (row->body->iov_base != NULL) {
> +			/* Save row bodies to gc region. */
> +			void *new_base = region_alloc(&fiber()->gc,
> +						      row->body->iov_len);
> +			if (new_base == NULL) {
> +				diag_set(OutOfMemory, row->body->iov_len,
> +					 "slab", "xrow_data");
> +				goto error;
> +			}
> +			memcpy(new_base, row->body->iov_base, row->body->iov_len);

I assume there can't be more than one iov in the body in thise case, but
still, assuming this implicitly in the code looks fragile. Let's please
either add an assertion or loop over all iovs.

Also, I think it's worth factoring out this function to xrow.c
(xrow_dup_body or something).

> +			/* Adjust row body pointers. */
> +			row->body->iov_base = new_base;
> +		}
> +
> +	} while (row->is_commit == 0 && ++row);

Nit: is_commit is a bool so s/row->is_commit == 0/!row->is_commit/

> +
> +	*rows = first_row;
> +	return row - first_row + 1;
> +error:
> +	return -1;
> +}
> +
> +static int
> +applier_apply_tx(struct xrow_header *first_row, struct xrow_header *last_row)
> +{
> +	int res = 0;
> +	struct txn *txn = NULL;
> +	struct xrow_header *row = first_row;
> +	if (first_row != last_row)
> +		txn = txn_begin(false);

Shouldn't we return immediately with an error if txn_begin returned
NULL?

> +	while (row <= last_row && res == 0) {
> +		res = apply_row(row);
> +		struct error *e;
> +		if (res != 0 &&
> +		    (e = diag_last_error(diag_get()))->type ==
> +			    &type_ClientError &&

Ouch, this hurts my eyes. Please rewrite it as it original was:

	if (res != 0) {
		struct error *e = ...
		if (e->type == ...) {
		}
	}

> +		    box_error_code(e) == ER_TUPLE_FOUND &&
> +		    replication_skip_conflict) {
> +			/*
> +			 * In case of ER_TUPLE_FOUND error and enabled
> +			 * replication_skip_conflict configuration
> +			 * option, skip applying the foreign row and
> +			 * replace it with NOP in the local write ahead
> +			 * log.
> +			 */

I wonder if we should skip the whole conflicting transaction rather than
just one row in this case...

> +			diag_clear(diag_get());
> +			(row)->type = IPROTO_NOP;
> +			(row)->bodycnt = 0;

Nit: pointless parentheses around 'row'.

What about row->tsn? Shouldn't we set it, too.

> +			res = apply_row(row);
> +		}
> +		++row;
> +	}
> +	if (res == 0 && txn != NULL)
> +		res = txn_commit(txn);
> +	if (res != 0)
> +		txn_rollback();
> +	return res;
> +}
> +
>  /**
>   * Execute and process SUBSCRIBE request (follow updates from a master).
>   */
> @@ -509,36 +685,14 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>  
> -		/*
> -		 * Tarantool < 1.7.7 does not send periodic heartbeat
> -		 * messages so we can't assume that if we haven't heard
> -		 * from the master for quite a while the connection is
> -		 * broken - the master might just be idle.
> -		 */
> -		if (applier->version_id < version_id(1, 7, 7)) {
> -			coio_read_xrow(coio, ibuf, &row);
> -		} else {
> -			double timeout = replication_disconnect_timeout();
> -			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
> -		}
> +		struct xrow_header *tx_rows;
> +		int row_count = applier_read_tx(applier, &tx_rows);
> +		if (row_count < 0)
> +			diag_raise();
>  
> -		if (iproto_type_is_error(row.type))
> -			xrow_decode_error_xc(&row);  /* error */
> -		/* Replication request. */
> -		if (row.replica_id == REPLICA_ID_NIL ||
> -		    row.replica_id >= VCLOCK_MAX) {
> -			/*
> -			 * A safety net, this can only occur
> -			 * if we're fed a strangely broken xlog.
> -			 */
> -			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> -				  int2str(row.replica_id),
> -				  tt_uuid_str(&REPLICASET_UUID));
> -		}
> -
> -		applier->lag = ev_now(loop()) - row.tm;
> +		applier->lag = ev_now(loop()) - (tx_rows + row_count - 1)->tm;
>  		applier->last_row_time = ev_monotonic_now(loop());

Hmm, why? Isn't it enough to update those in applier_read_tx?

> -		struct replica *replica = replica_by_id(row.replica_id);
> +		struct replica *replica = replica_by_id(tx_rows->replica_id);
>  		struct latch *latch = (replica ? &replica->order_latch :
>  				       &replicaset.applier.order_latch);
>  		/*
> @@ -548,33 +702,12 @@ applier_subscribe(struct applier *applier)
>  		 * that belong to the same server id.
>  		 */
>  		latch_lock(latch);
> -		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> -			int res = apply_row(&row);
> -			if (res != 0) {
> -				struct error *e = diag_last_error(diag_get());
> -				/*
> -				 * In case of ER_TUPLE_FOUND error and enabled
> -				 * replication_skip_conflict configuration
> -				 * option, skip applying the foreign row and
> -				 * replace it with NOP in the local write ahead
> -				 * log.
> -				 */
> -				if (e->type == &type_ClientError &&
> -				    box_error_code(e) == ER_TUPLE_FOUND &&
> -				    replication_skip_conflict) {
> -					diag_clear(diag_get());
> -					struct xrow_header nop;
> -					nop.type = IPROTO_NOP;
> -					nop.bodycnt = 0;
> -					nop.replica_id = row.replica_id;
> -					nop.lsn = row.lsn;
> -					res = apply_row(&nop);
> -				}
> -			}
> -			if (res != 0) {
> -				latch_unlock(latch);
> -				diag_raise();
> -			}
> +		if (vclock_get(&replicaset.vclock,
> +			       tx_rows->replica_id) < tx_rows->lsn &&
> +		    applier_apply_tx(tx_rows, tx_rows + row_count - 1) != 0) {
> +			latch_unlock(latch);
> +			fiber_gc();
> +			diag_raise();

Why do you call fiber_gc() on this particular error, but don't in other
cases? Let's please instead add a guard to the loop body that would call
fiber_gc both on exception and when an iteration is complete.

>  		}
>  		latch_unlock(latch);
>  
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 7900fb3ab..f6bf72d0c 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -34,6 +34,7 @@
>  #include "journal.h"
>  #include <fiber.h>
>  #include "xrow.h"
> +#include "replication.h"
>  
>  double too_long_threshold;
>  
> @@ -141,6 +142,7 @@ txn_begin(bool is_autocommit)
>  	/* Initialize members explicitly to save time on memset() */
>  	stailq_create(&txn->stmts);
>  	txn->n_rows = 0;
> +	txn->n_remote_rows = 0;
>  	txn->is_autocommit = is_autocommit;
>  	txn->has_triggers  = false;
>  	txn->is_aborted = false;
> @@ -233,6 +235,9 @@ txn_commit_stmt(struct txn *txn, struct request *request)
>  	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
>  		if (txn_add_redo(stmt, request) != 0)
>  			goto fail;
> +		if (stmt->row->replica_id != 0 &&
> +		    stmt->row->replica_id != instance_id)
> +			++txn->n_remote_rows;
>  		++txn->n_rows;
>  	}
>  	/*
> @@ -271,14 +276,20 @@ txn_write_to_wal(struct txn *txn)
>  		return -1;
>  
>  	struct txn_stmt *stmt;
> -	struct xrow_header **row = req->rows;
> +	struct xrow_header **remote_row = req->rows;
> +	struct xrow_header **local_row = req->rows + txn->n_remote_rows;
>  	stailq_foreach_entry(stmt, &txn->stmts, next) {
>  		if (stmt->row == NULL)
>  			continue; /* A read (e.g. select) request */
> -		*row++ = stmt->row;
> +		if (stmt->row->replica_id != 0 &&
> +		    stmt->row->replica_id != instance_id)
> +			*remote_row++ = stmt->row;
> +		else
> +			*local_row++ = stmt->row;

Please write a comment explaining what are you doing here and why it's
necessary.

>  		req->approx_len += xrow_approx_len(stmt->row);
>  	}
> -	assert(row == req->rows + req->n_rows);
> +	assert(remote_row == req->rows + txn->n_remote_rows);
> +	assert(local_row == req->rows + req->n_rows);
>  
>  	ev_tstamp start = ev_monotonic_now(loop());
>  	int64_t res = journal_write(req);
> @@ -399,8 +410,6 @@ txn_rollback()
>  		txn_stmt_unref_tuples(stmt);
>  
>  	TRASH(txn);
> -	/** Free volatile txn memory. */
> -	fiber_gc();
>  	fiber_set_txn(fiber(), NULL);
>  }
>  
> @@ -480,6 +489,8 @@ box_txn_rollback()
>  		return -1;
>  	}
>  	txn_rollback(); /* doesn't throw */
> +	/** Free volatile txn memory. */
> +	fiber_gc();

I assume this is a follow-up for 77fa1736dbb9 ("box: factor fiber_gc
out of txn_commit"). Please do this in a separate patch with a proper
justification.

>  	return 0;
>  }
>  
> diff --git a/src/box/txn.h b/src/box/txn.h
> index de5cb0de4..2791fdf73 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -142,6 +142,10 @@ struct txn {
>  	struct stailq stmts;
>  	/** Total number of WAL rows in this txn. */
>  	int n_rows;
> +	/**
> +	 * Count of rows generated on a remote replica.
> +	 */
> +	int n_remote_rows;
>  	/**
>  	 * True if this transaction is running in autocommit mode
>  	 * (statement end causes an automatic transaction commit).
> diff --git a/test/replication/transaction.result b/test/replication/transaction.result
> new file mode 100644
> index 000000000..009f84430
> --- /dev/null
> +++ b/test/replication/transaction.result
> @@ -0,0 +1,240 @@
> +env = require('test_run')
> +---
> +...
> +test_run = env.new()
> +---
> +...
> +box.schema.user.grant('guest', 'replication')
> +---
> +...
> +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
> +---
> +...
> +_ = s:create_index('pk')
> +---
> +...
> +-- transaction w/o conflict
> +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
> +---
> +...
> +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
> +---
> +- true
> +...
> +test_run:cmd("start server replica")
> +---
> +- true
> +...
> +test_run:cmd("switch replica")
> +---
> +- true
> +...
> +-- insert a conflicting row
> +box.space.test:replace({4, 'r'})
> +---
> +- [4, 'r']
> +...
> +v1 = box.info.vclock
> +---
> +...
> +test_run:cmd("switch default")
> +---
> +- true
> +...
> +-- create a two-row transaction with conflicting second
> +box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
> +---
> +...
> +-- create a third transaction
> +box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
> +---
> +...
> +test_run:cmd("switch replica")
> +---
> +- true
> +...
> +-- nothing was applied
> +v1[1] == box.info.vclock[1]
> +---
> +- true
> +...
> +box.space.test:select()
> +---
> +- - [1, 'm']
> +  - [2, 'm']
> +  - [4, 'r']
> +...
> +-- check replication status
> +box.info.replication[1].upstream.status
> +---
> +- stopped
> +...
> +box.info.replication[1].upstream.message
> +---
> +- Duplicate key exists in unique index 'pk' in space 'test'
> +...
> +-- set conflict to third transaction
> +box.space.test:delete({3})
> +---
> +...
> +box.space.test:replace({6, 'r'})
> +---
> +- [6, 'r']
> +...
> +-- restart replication
> +replication = box.cfg.replication
> +---
> +...
> +box.cfg{replication = {}}
> +---
> +...
> +box.cfg{replication = replication}
> +---
> +...
> +-- replication stopped of third transaction
> +v1[1] + 2 == box.info.vclock[1]
> +---
> +- false

false?

> diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
> new file mode 100644
> index 000000000..47003c644
> --- /dev/null
> +++ b/test/replication/transaction.test.lua
> @@ -0,0 +1,86 @@
> +env = require('test_run')
> +test_run = env.new()
> +box.schema.user.grant('guest', 'replication')
> +
> +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
> +_ = s:create_index('pk')
> +
> +-- transaction w/o conflict
> +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
> +
> +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
> +test_run:cmd("start server replica")
> +test_run:cmd("switch replica")
> +
> +-- insert a conflicting row
> +box.space.test:replace({4, 'r'})
> +v1 = box.info.vclock
> +
> +test_run:cmd("switch default")
> +-- create a two-row transaction with conflicting second
> +box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
> +-- create a third transaction
> +box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
> +
> +test_run:cmd("switch replica")
> +-- nothing was applied
> +v1[1] == box.info.vclock[1]
> +box.space.test:select()
> +-- check replication status
> +box.info.replication[1].upstream.status
> +box.info.replication[1].upstream.message
> +-- set conflict to third transaction
> +box.space.test:delete({3})
> +box.space.test:replace({6, 'r'})
> +-- restart replication
> +replication = box.cfg.replication
> +box.cfg{replication = {}}
> +box.cfg{replication = replication}
> +-- replication stopped of third transaction
> +v1[1] + 2 == box.info.vclock[1]
> +box.space.test:select()
> +-- check replication status
> +box.info.replication[1].upstream.status
> +box.info.replication[1].upstream.message
> +
> +-- check restart does not help
> +test_run:cmd("switch default")
> +test_run:cmd("restart server replica")
> +test_run:cmd("switch replica")
> +
> +box.space.test:select()
> +-- set skip conflict rows and check that non-conflicting were applied
> +replication = box.cfg.replication
> +box.cfg{replication = {}, replication_skip_conflict = true}
> +box.cfg{replication = replication}
> +
> +-- check last transaction applied without conflicting row
> +box.space.test:select()
> +box.info.replication[1].upstream.status
> +
> +-- make some new conflicting rows with skip-conflicts
> +box.space.test:replace({8, 'r'})
> +box.space.test:replace({9, 'r'})
> +
> +-- issue a conflicting tx
> +test_run:cmd("switch default")
> +box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
> +
> +test_run:cmd("switch replica")
> +-- vclock should be increased but rows skipped
> +box.space.test:select()
> +
> +-- check restart does not change something
> +test_run:cmd("switch default")
> +test_run:cmd("restart server replica")
> +test_run:cmd("switch replica")
> +
> +box.space.test:select()
> +box.info.replication[1].upstream.status
> +
> +test_run:cmd("switch default")
> +test_run:cmd("stop server replica")
> +test_run:cmd("cleanup server replica")

You need to delete the replica here and call cleanup_cluster on the
default instance. Please take a look at how other replication tests do
cleanup.

> +
> +box.schema.user.revoke('guest', 'replication')
> +s:drop()



More information about the Tarantool-patches mailing list