[tarantool-patches] Re: [PATCH 3/3] Transaction support for applier
Konstantin Osipov
kostja at tarantool.org
Tue Mar 5 12:25:38 MSK 2019
* Georgy Kirichenko <georgy at tarantool.org> [19/03/03 23:30]:
> Applier fetch incoming rows to form a transaction and then apply it.
> In case of replication all local changes moved to an journal entry
> tail to form a separate transaction (like autonomous transaction)
> to be able to replicate changes back so applier assumes that transactions
> could not be mixed in a replication stream.
>
> Closes: #2798
> Needed for: #980
> ---
> src/box/applier.cc | 243 ++++++++++++++++++++------
> src/box/txn.c | 21 ++-
> src/box/txn.h | 4 +
> test/replication/transaction.result | 240 +++++++++++++++++++++++++
> test/replication/transaction.test.lua | 86 +++++++++
> 5 files changed, 534 insertions(+), 60 deletions(-)
> create mode 100644 test/replication/transaction.result
> create mode 100644 test/replication/transaction.test.lua
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 3222b041d..dfabbe5ab 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,12 @@
> #include "session.h"
> #include "cfg.h"
> #include "box.h"
> +#include "txn.h"
> +
> +enum {
> + /* Initial capacity of rows array. */
> + APPLIER_TX_INITIAL_ROW_COUNT = 16,
> +};
>
> STRS(applier_state, applier_STATE);
>
> @@ -380,6 +386,176 @@ applier_join(struct applier *applier)
> applier_set_state(applier, APPLIER_READY);
> }
>
> +/**
> + * Read one transaction from network using applier's input buffer.
> + * Transaction rows are placed onto fiber gc region.
> + * We could not use applier input buffer for that because rpos is adjusted
> + * after each xrow decoding and corresponding network input space is going
> + * to be reused.
> + *
> + * Return count of transaction rows and put row's header pointers into rows
> + * array.
> + */
> +static int
> +applier_read_tx(struct applier *applier, struct xrow_header **rows)
> +{
> + struct ev_io *coio = &applier->io;
> + struct ibuf *ibuf = &applier->ibuf;
> + int64_t tsn = 0;
> + int row_capacity = APPLIER_TX_INITIAL_ROW_COUNT;
> + struct xrow_header *first_row, *row;
> + first_row = (struct xrow_header *)region_alloc(&fiber()->gc,
> + row_capacity *
> + sizeof(struct xrow_header));
> + if (first_row == NULL) {
> + diag_set(OutOfMemory, sizeof(struct xrow_header) * row_capacity,
> + "region", "struct xrow_header");
> + goto error;
> + }
> + row = first_row;
> +
> + do {
> + if (row == first_row + row_capacity) {
> + /* Realloc rows array. */
> + row = (struct xrow_header *)region_alloc(&fiber()->gc,
> + row_capacity *
> + sizeof(struct xrow_header) << 1);
> + if (row == NULL) {
> + diag_set(OutOfMemory,
> + sizeof(struct xrow_header) *
> + row_capacity << 1,
> + "region", "struct xrow_header");
> + goto error;
> + }
> + memcpy(row, first_row, row_capacity *
> + sizeof(struct xrow_header) << 1);
> + first_row = row;
> + row = first_row + row_capacity;
> + row_capacity <<= 1;
> + }
This looks like inventing a wheel, let's move the resize part to
region_realloc().
> + if (row == first_row) {
> + /*
> + * First row in a transaction. In order to enforce
> + * consistency check that first row lsn and replica id
> + * match with transaction.
> + */
> + tsn = row->tsn;
> + if (row->lsn != tsn) {
> + /* There is not a first row in the transactions. */
/* Transaction id must be derived from the log sequence number of
* the first row in the transaction.
*/
> + diag_set(ClientError, ER_PROTOCOL,
> + "Not a first row in a transaction");
This message would be confusing when it pops up, please use a
message from the suggested comment.
> + goto error;
> + }
> + }
> + if (tsn != row->tsn) {
> + /* We are not able to handle interleaving transactions. */
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "replications",
> + "interleaving transactions");
"replication"
> + if (row->body->iov_base != NULL) {
> + /* Save row bodies to gc region. */
As a courtesy to performance you could only do this for multi-row
transactions. You can see it's a multi-row transaction from xrow
header.
> + *rows = first_row;
> + return row - first_row + 1;
As an alternative to region_realloc(), you could add stailq in_txn
to struct xrow_header. Being able to add xrow to a linked list
won't hurt in other places either.
> +
> +static int
> +applier_apply_tx(struct xrow_header *first_row, struct xrow_header *last_row)
Please add a comment.
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 7900fb3ab..f6bf72d0c 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -34,6 +34,7 @@
> #include "journal.h"
> #include <fiber.h>
> #include "xrow.h"
> +#include "replication.h"
Could this entire thing with remote rows be moved to a separate
patch?
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
More information about the Tarantool-patches
mailing list