[tarantool-patches] [PATCH v3 2/2] Transaction support for applier

Vladimir Davydov vdavydov.dev at gmail.com
Mon Feb 18 12:36:12 MSK 2019


On Tue, Feb 12, 2019 at 11:04:32PM +0300, Georgy Kirichenko wrote:
> Applier fetches incoming rows to form a transaction and then applies it.
> The implementation assumes that transactions cannot be interleaved in a
> replication stream. Distributed transactions are not supported yet.
> 
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc                    | 185 +++++++++++++++-----
>  test/replication/transaction.result   | 240 ++++++++++++++++++++++++++
>  test/replication/transaction.test.lua |  86 +++++++++
>  3 files changed, 471 insertions(+), 40 deletions(-)
>  create mode 100644 test/replication/transaction.result
>  create mode 100644 test/replication/transaction.test.lua
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 7f37fe2ee..59c33bb84 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,7 @@
>  #include "error.h"
>  #include "session.h"
>  #include "cfg.h"
> +#include "txn.h"
>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -378,6 +379,105 @@ applier_join(struct applier *applier)
>  	applier_set_state(applier, APPLIER_READY);
>  }
>  
> +/**
> + * Read one transaction from the network.
> + * Transaction rows are placed into row_buf as an array; row bodies are
> + * copied to data_buf because it is not allowed to relocate them. We cannot
> + * keep them in the applier input buffer either, because rpos is adjusted
> + * after xrow decoding and the corresponding space gets reused.
> + *
> + * Note: the current implementation assumes that transactions cannot be
> + * interleaved, so we read each transaction from its first xrow up to the
> + * xrow with txn_commit set.
> + */
> +static int64_t
> +applier_read_tx(struct applier *applier, struct ibuf *row_buf,
> +		struct obuf *data_buf)
> +{
> +	struct xrow_header *row;
> +	struct ev_io *coio = &applier->io;
> +	struct ibuf *ibuf = &applier->ibuf;
> +	int64_t txn_id = 0;
> +
> +	do {
> +		row = (struct xrow_header *)ibuf_alloc(row_buf,
> +						       sizeof(struct xrow_header));

Nit: the line's too long for no reason, and there are more lines like that.
Please fix them where you can without making the code look ugly.
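E.g. this one could be wrapped like:

	row = (struct xrow_header *)
		ibuf_alloc(row_buf, sizeof(struct xrow_header));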

> +		if (row == NULL) {
> +			diag_set(OutOfMemory, sizeof(struct xrow_header),
> +				 "slab", "struct xrow_header");
> +			goto error;
> +		}
> +
> +		double timeout = replication_disconnect_timeout();
> +		try {
> +			/* TODO: we should have a C version of this function. */
> +			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);

Nit: IMO it'd be better to use guards to free resources, if any.
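Something along these lines would do, assuming scoped_guard.h can be used
here (untested sketch, the guard name is mine):

	auto buf_guard = make_scoped_guard([&] {
		/* See the note below about reset vs. destroy. */
		ibuf_reset(row_buf);
		obuf_reset(data_buf);
	});
	/* ... read and validate the rows ... */
	/* The whole transaction has been read successfully. */
	buf_guard.is_active = false;
	return 0;

Then coio_read_xrow_timeout_xc() could be left to throw and the error
label would go away.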

> +		} catch (...) {
> +			goto error;
> +		}
> +
> +		if (iproto_type_is_error(row->type)) {
> +			xrow_decode_error(row);
> +			goto error;
> +		}
> +
> +		/* Replication request. */
> +		if (row->replica_id == REPLICA_ID_NIL ||
> +		    row->replica_id >= VCLOCK_MAX) {
> +			/*
> +			 * A safety net, this can only occur
> +			 * if we're fed a strangely broken xlog.
> +			 */
> +			diag_set(ClientError, ER_UNKNOWN_REPLICA,
> +				 int2str(row->replica_id),
> +				 tt_uuid_str(&REPLICASET_UUID));
> +			goto error;
> +		}
> +		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
> +			/*
> +			 * First row of a transaction. To enforce
> +			 * consistency, check that the first row's lsn
> +			 * matches the transaction id.
> +			 */
> +			txn_id = row->txn_id;
> +			if (row->lsn != txn_id) {
> +				/* This is not the first row of a transaction. */
> +				diag_set(ClientError, ER_PROTOCOL,
> +					 "Not a first row in a transaction");
> +				goto error;
> +			}
> +		}
> +		if (txn_id != row->txn_id) {
> +			/* We are not able to handle interleaving transactions. */
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "replications",
> +				 "interleaving transactions");
> +			goto error;
> +		}
> +
> +
> +		applier->lag = ev_now(loop()) - row->tm;
> +		applier->last_row_time = ev_monotonic_now(loop());
> +
> +		if (row->body->iov_base != NULL) {
> +			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
> +			if (new_base == NULL) {
> +				diag_set(OutOfMemory, row->body->iov_len,
> +					 "slab", "xrow_data");
> +				goto error;
> +			}
> +			memcpy(new_base, row->body->iov_base, row->body->iov_len);
> +			row->body->iov_base = new_base;

So we first read a row to an ibuf, then copy it to an obuf. I understand
that you do this because xrow_header::body has pointers in it. Still, it
looks rather awkward. Maybe we'd better temporarily fix up those pointers
to store relative offsets instead?
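I mean something along these lines (a rough sketch of the idea only: the
helper names are made up, and one would have to make sure the base stays
valid until the pointers are restored):

	/* Right after decoding: replace the body pointer with an offset. */
	static inline void
	xrow_body_to_offset(struct xrow_header *row, const char *base)
	{
		if (row->bodycnt > 0)
			row->body[0].iov_base = (void *)
				((char *)row->body[0].iov_base - base);
	}

	/* Right before applying: convert the offset back to a pointer. */
	static inline void
	xrow_body_to_pointer(struct xrow_header *row, char *base)
	{
		if (row->bodycnt > 0)
			row->body[0].iov_base =
				base + (uintptr_t)row->body[0].iov_base;
	}

That would save the extra copy into data_buf.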

> +		}
> +
> +	} while (row->txn_commit == 0);
> +
> +	return 0;
> +error:
> +	ibuf_reset(row_buf);
> +	obuf_reset(data_buf);

obuf_reset, ibuf_reset don't free up memory. You must use obuf_destroy,
ibuf_destroy.
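I.e., as long as the buffers stay local to this function:

	error:
		ibuf_destroy(row_buf);
		obuf_destroy(data_buf);
		return -1;

(If they are moved to struct applier as suggested below, reset in the loop
plus destroy in applier_delete() would do.)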

> +	return -1;
> +}
> +
>  /**
>   * Execute and process SUBSCRIBE request (follow updates from a master).
>   */
> @@ -392,6 +492,10 @@ applier_subscribe(struct applier *applier)
>  	struct xrow_header row;
>  	struct vclock remote_vclock_at_subscribe;
>  	struct tt_uuid cluster_id = uuid_nil;
> +	struct ibuf row_buf;
> +	struct obuf data_buf;

IMO the buffers should be part of the applier struct. Then you wouldn't
need a label to free them up in applier_read_tx and hence could simply let
the exception thrown by coio_read_xrow_timeout_xc travel up the stack
without wrapping it in that awkward try-catch block.
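Roughly (a sketch only; whether to create them in applier_new() or right
before the subscribe loop is up to you):

	/* applier.h */
	struct applier {
		/* ... existing members ... */
		/** Rows of the transaction currently being read. */
		struct ibuf row_buf;
		/** Bodies of those rows. */
		struct obuf data_buf;
	};

	/* applier_new() */
	ibuf_create(&applier->row_buf, &cord()->slabc,
		    32 * sizeof(struct xrow_header));
	obuf_create(&applier->data_buf, &cord()->slabc, 0x10000);

	/* applier_delete() */
	ibuf_destroy(&applier->row_buf);
	obuf_destroy(&applier->data_buf);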

> +	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
> +	obuf_create(&data_buf, &cord()->slabc, 0x10000);

This constant looks like magic to me.
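At least give it a name and a comment saying where it comes from, e.g.
(the name is mine, 0x10000 is just 64 KB):

	enum { APPLIER_DATA_BUF_INITIAL_SIZE = 64 * 1024 };

	obuf_create(&data_buf, &cord()->slabc, APPLIER_DATA_BUF_INITIAL_SIZE);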

>  
>  	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>  				 &replicaset.vclock);
> @@ -501,36 +605,16 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>  
> -		/*
> -		 * Tarantool < 1.7.7 does not send periodic heartbeat
> -		 * messages so we can't assume that if we haven't heard
> -		 * from the master for quite a while the connection is
> -		 * broken - the master might just be idle.
> -		 */
> -		if (applier->version_id < version_id(1, 7, 7)) {
> -			coio_read_xrow(coio, ibuf, &row);

You silently dropped this branch. Please leave it be.
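I.e. keep this check inside applier_read_tx(), something like:

	/*
	 * Tarantool < 1.7.7 does not send periodic heartbeat
	 * messages so we can't assume that if we haven't heard
	 * from the master for quite a while the connection is
	 * broken - the master might just be idle.
	 */
	if (applier->version_id < version_id(1, 7, 7)) {
		coio_read_xrow(coio, ibuf, row);
	} else {
		double timeout = replication_disconnect_timeout();
		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
	}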

> -		} else {
> -			double timeout = replication_disconnect_timeout();
> -			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
> -		}
> +		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
> +			diag_raise();
>  
> -		if (iproto_type_is_error(row.type))
> -			xrow_decode_error_xc(&row);  /* error */
> -		/* Replication request. */
> -		if (row.replica_id == REPLICA_ID_NIL ||
> -		    row.replica_id >= VCLOCK_MAX) {
> -			/*
> -			 * A safety net, this can only occur
> -			 * if we're fed a strangely broken xlog.
> -			 */
> -			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> -				  int2str(row.replica_id),
> -				  tt_uuid_str(&REPLICASET_UUID));
> -		}
> +		struct txn *txn = NULL;
> +		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
> +		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
>  
> -		applier->lag = ev_now(loop()) - row.tm;
> +		applier->lag = ev_now(loop()) - last_row->tm;
>  		applier->last_row_time = ev_monotonic_now(loop());
> -		struct replica *replica = replica_by_id(row.replica_id);
> +		struct replica *replica = replica_by_id(first_row->replica_id);
>  		struct latch *latch = (replica ? &replica->order_latch :
>  				       &replicaset.applier.order_latch);
>  		/*
> @@ -540,24 +624,45 @@ applier_subscribe(struct applier *applier)
>  		 * that belong to the same server id.
>  		 */
>  		latch_lock(latch);
> -		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> -			int res = xstream_write(applier->subscribe_stream, &row);
> -			if (res != 0) {
> -				struct error *e = diag_last_error(diag_get());
> -				/**
> -				 * Silently skip ER_TUPLE_FOUND error if such
> -				 * option is set in config.
> -				 */
> -				if (e->type == &type_ClientError &&
> +		if (vclock_get(&replicaset.vclock,
> +			       first_row->replica_id) < first_row->lsn) {
> +			struct xrow_header *row = first_row;
> +			if (first_row != last_row)
> +				txn = txn_begin(false);

We have a nice level of abstraction implemented by xstream, but now you
bluntly break it by calling txn_begin/commit directly. Please come up
with a better solution, e.g. you could extend the xstream interface with
a write_tx method.
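Roughly (just an illustration, the exact signature is up to you):

	/* xstream.h */
	typedef int (*xstream_write_tx_f)(struct xstream *stream,
					  struct xrow_header *begin,
					  struct xrow_header *end);

	struct xstream {
		xstream_write_f write;
		/** Apply a batch of rows as a single transaction. */
		xstream_write_tx_f write_tx;
	};

	static inline int
	xstream_write_tx(struct xstream *stream, struct xrow_header *begin,
			 struct xrow_header *end)
	{
		return stream->write_tx(stream, begin, end);
	}

Then txn_begin/txn_commit would live in the stream callback on the box
side, and the applier would stay unaware of the transaction machinery.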

> +			int res = 0;
> +			while (row <= last_row && res == 0) {
> +				res = xstream_write(applier->subscribe_stream, row);
> +				struct error *e;
> +				if (res != 0 &&
> +				    (e = diag_last_error(diag_get()))->type ==
> +				    &type_ClientError &&
>  				    box_error_code(e) == ER_TUPLE_FOUND &&
> -				    replication_skip_conflict)
> +				    replication_skip_conflict) {
> +					/**
> +					 * Silently skip ER_TUPLE_FOUND error
> +					 * if such option is set in config.
> +					 */
>  					diag_clear(diag_get());
> -				else {
> -					latch_unlock(latch);
> -					diag_raise();
> +					row->type = IPROTO_NOP;
> +					row->bodycnt = 0;
> +					res = xstream_write(applier->subscribe_stream,
> +							    row);
>  				}
> +				++row;
> +			}
> +			if (res == 0 && txn != NULL)
> +				res = txn_commit(txn);
> +
> +			if (res != 0) {
> +				txn_rollback();
> +				obuf_reset(&data_buf);
> +				ibuf_reset(&row_buf);
> +				latch_unlock(latch);
> +				diag_raise();
>  			}
>  		}
> +		obuf_reset(&data_buf);
> +		ibuf_reset(&row_buf);
>  		latch_unlock(latch);
>  
>  		if (applier->state == APPLIER_SYNC ||
> diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
> new file mode 100644
> index 000000000..47003c644
> --- /dev/null
> +++ b/test/replication/transaction.test.lua
> @@ -0,0 +1,86 @@
> +env = require('test_run')
> +test_run = env.new()
> +box.schema.user.grant('guest', 'replication')
> +
> +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
> +_ = s:create_index('pk')
> +
> +-- transaction w/o conflict
> +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()

Whenever you add a test, please write a few words about what it does and
which ticket it is supposed to cover.


