[tarantool-patches] Re: [PATCH 3/3] Transaction support for applier

Konstantin Osipov kostja at tarantool.org
Tue Mar 5 12:25:38 MSK 2019


* Georgy Kirichenko <georgy at tarantool.org> [19/03/03 23:30]:
> Applier fetches incoming rows to form a transaction and then applies it.
> In case of replication, all local changes are moved to the tail of a
> journal entry to form a separate transaction (like an autonomous
> transaction) so that the changes can be replicated back, so the applier
> assumes that transactions cannot be mixed in a replication stream.
> 
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc                    | 243 ++++++++++++++++++++------
>  src/box/txn.c                         |  21 ++-
>  src/box/txn.h                         |   4 +
>  test/replication/transaction.result   | 240 +++++++++++++++++++++++++
>  test/replication/transaction.test.lua |  86 +++++++++
>  5 files changed, 534 insertions(+), 60 deletions(-)
>  create mode 100644 test/replication/transaction.result
>  create mode 100644 test/replication/transaction.test.lua
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 3222b041d..dfabbe5ab 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,12 @@
>  #include "session.h"
>  #include "cfg.h"
>  #include "box.h"
> +#include "txn.h"
> +
> +enum {
> +	/* Initial capacity of rows array. */
> +	APPLIER_TX_INITIAL_ROW_COUNT = 16,
> +};
>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -380,6 +386,176 @@ applier_join(struct applier *applier)
>  	applier_set_state(applier, APPLIER_READY);
>  }
>  
> +/**
> + * Read one transaction from network using applier's input buffer.
> + * Transaction rows are placed onto fiber gc region.
> + * We could not use applier input buffer for that because rpos is adjusted
> + * after each xrow decoding and corresponding network input space is going
> + * to be reused.
> + *
> + * Return count of transaction rows and put row's header pointers into rows
> + * array.
> + */
> +static int
> +applier_read_tx(struct applier *applier, struct xrow_header **rows)
> +{
> +	struct ev_io *coio = &applier->io;
> +	struct ibuf *ibuf = &applier->ibuf;
> +	int64_t tsn = 0;
> +	int row_capacity = APPLIER_TX_INITIAL_ROW_COUNT;
> +	struct xrow_header *first_row, *row;
> +	first_row = (struct xrow_header *)region_alloc(&fiber()->gc,
> +						       row_capacity *
> +						       sizeof(struct xrow_header));
> +	if (first_row == NULL) {
> +		diag_set(OutOfMemory, sizeof(struct xrow_header) * row_capacity,
> +			 "region", "struct xrow_header");
> +		goto error;
> +	}
> +	row = first_row;
> +
> +	do {
> +		if (row == first_row + row_capacity) {
> +			/* Realloc rows array. */
> +			row = (struct xrow_header *)region_alloc(&fiber()->gc,
> +								 row_capacity *
> +								 sizeof(struct xrow_header) << 1);
> +			if (row == NULL) {
> +				diag_set(OutOfMemory,
> +					 sizeof(struct xrow_header) *
> +					 row_capacity << 1,
> +					 "region", "struct xrow_header");
> +				goto error;
> +			}
> +			memcpy(row, first_row, row_capacity *
> +					       sizeof(struct xrow_header) << 1);
> +			first_row = row;
> +			row = first_row + row_capacity;
> +			row_capacity <<= 1;
> +		}

This looks like reinventing the wheel; let's move the resize part into
region_realloc().
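
A minimal sketch of what such a helper could look like, assuming a toy
bump allocator in place of the fiber gc region. The names toy_region,
toy_region_alloc() and toy_region_realloc() are hypothetical and are not
the small library's API. The point is that the helper copies only the
old_size bytes that are actually valid:

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Toy bump allocator standing in for the fiber gc region (hypothetical). */
struct toy_region {
	char *buf;
	size_t used;
	size_t size;
};

static void *
toy_region_alloc(struct toy_region *r, size_t size)
{
	assert(r != NULL && r->buf != NULL);
	/* Round up so that every allocation stays 16-byte aligned. */
	size_t aligned = (size + 15) & ~(size_t)15;
	if (aligned < size || r->size - r->used < aligned)
		return NULL;
	void *p = r->buf + r->used;
	r->used += aligned;
	return p;
}

/*
 * Hypothetical region_realloc(): allocate new_size bytes and copy the
 * valid prefix of the old block. The old block is not freed, since a
 * region is released as a whole.
 */
static void *
toy_region_realloc(struct toy_region *r, void *ptr, size_t old_size,
		   size_t new_size)
{
	void *p = toy_region_alloc(r, new_size);
	if (p == NULL)
		return NULL;
	if (ptr != NULL && old_size > 0)
		memcpy(p, ptr, old_size < new_size ? old_size : new_size);
	return p;
}
```

With a helper of this shape the loop body would reduce to one call that
doubles the capacity, plus the out-of-memory check.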

> +		if (row == first_row) {
> +			/*
> +			 * First row in a transaction. In order to enforce
> +			 * consistency check that first row lsn and replica id
> +			 * match with transaction.
> +			 */
> +			tsn = row->tsn;
> +			if (row->lsn != tsn) {
> +				/* There is not a first row in the transactions. */

/* Transaction id must be derived from the log sequence number of
 * the first row in the transaction.
 */

> +				diag_set(ClientError, ER_PROTOCOL,
> +					 "Not a first row in a transaction");

This message would be confusing when it pops up; please use the
wording from the comment suggested above.
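
A minimal sketch of the two consistency checks in this part of the patch
(the first-row check above and the tsn comparison in the next hunk),
written against a reduced stand-in for struct xrow_header. The names
row_hdr, tx_check and tx_check_rows() are hypothetical and only serve
the illustration:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Reduced stand-in for struct xrow_header: only the fields used here. */
struct row_hdr {
	int64_t lsn;
	int64_t tsn;
};

enum tx_check {
	TX_OK = 0,
	/* Transaction id differs from the lsn of the first row. */
	TX_BAD_FIRST_ROW,
	/* A row carries the id of another transaction. */
	TX_INTERLEAVED,
};

static enum tx_check
tx_check_rows(const struct row_hdr *rows, size_t count)
{
	assert(rows != NULL || count == 0);
	if (count == 0)
		return TX_OK;
	/* The transaction id must equal the lsn of the first row. */
	if (rows[0].lsn != rows[0].tsn)
		return TX_BAD_FIRST_ROW;
	/* Every following row must belong to the same transaction. */
	for (size_t i = 1; i < count; i++) {
		if (rows[i].tsn != rows[0].tsn)
			return TX_INTERLEAVED;
	}
	return TX_OK;
}
```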

> +				goto error;
> +			}
> +		}
> +		if (tsn != row->tsn) {
> +			/* We are not able to handle interleaving transactions. */
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "replications",
> +				 "interleaving transactions");

"replication"

> +		if (row->body->iov_base != NULL) {
> +			/* Save row bodies to gc region. */

For the sake of performance you could do this only for multi-row
transactions. You can tell from the xrow header whether a transaction
is multi-row.

> +	*rows = first_row;
> +	return row - first_row + 1;

As an alternative to region_realloc(), you could add a stailq link,
in_txn, to struct xrow_header. Being able to add an xrow to a linked
list won't hurt in other places either.
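
A minimal sketch of the linked-list variant, using a hand-rolled
singly-linked tail queue rather than the real stailq macros. The names
tx_row, tx_rows and next_in_txn are hypothetical stand-ins for struct
xrow_header and the proposed in_txn link:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for struct xrow_header with an embedded list link. */
struct tx_row {
	int64_t lsn;
	/* Hypothetical counterpart of the proposed in_txn link. */
	struct tx_row *next_in_txn;
};

/* Tail queue of the rows of one transaction. */
struct tx_rows {
	struct tx_row *first;
	/* Address of the pointer that the next added row is stored in. */
	struct tx_row **last;
	size_t count;
};

static void
tx_rows_create(struct tx_rows *q)
{
	assert(q != NULL);
	q->first = NULL;
	q->last = &q->first;
	q->count = 0;
}

/* Append a row in O(1); no array has to be resized or copied. */
static void
tx_rows_add_tail(struct tx_rows *q, struct tx_row *row)
{
	assert(q != NULL && row != NULL);
	row->next_in_txn = NULL;
	*q->last = row;
	q->last = &row->next_in_txn;
	q->count++;
}
```

Each row can then be allocated on the region individually, and the
caller walks the list from first instead of indexing an array.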

> +
> +static int
> +applier_apply_tx(struct xrow_header *first_row, struct xrow_header *last_row)

Please add a comment.

> diff --git a/src/box/txn.c b/src/box/txn.c
> index 7900fb3ab..f6bf72d0c 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -34,6 +34,7 @@
>  #include "journal.h"
>  #include <fiber.h>
>  #include "xrow.h"
> +#include "replication.h"

Could this entire thing with remote rows be moved to a separate
patch?


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov



