Date: Tue, 5 Mar 2019 12:25:38 +0300
From: Konstantin Osipov
Subject: [tarantool-patches] Re: [PATCH 3/3] Transaction support for applier
Message-ID: <20190305092538.GA21955@chai>
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko

* Georgy Kirichenko [19/03/03 23:30]:
> Applier fetch incoming rows to form a transaction and then apply it.
> In case of replication all local changes moved to an journal entry
> tail to form a separate transaction (like autonomous transaction)
> to be able to replicate changes back so applier assumes that transactions
> could not be mixed in a replication stream.
>
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc                    | 243 ++++++++++++++++++++------
>  src/box/txn.c                         |  21 ++-
>  src/box/txn.h                         |   4 +
>  test/replication/transaction.result   | 240 +++++++++++++++++++++++++
>  test/replication/transaction.test.lua |  86 +++++++++
>  5 files changed, 534 insertions(+), 60 deletions(-)
>  create mode 100644 test/replication/transaction.result
>  create mode 100644 test/replication/transaction.test.lua
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 3222b041d..dfabbe5ab 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,12 @@
>  #include "session.h"
>  #include "cfg.h"
>  #include "box.h"
> +#include "txn.h"
> +
> +enum {
> +	/* Initial capacity of rows array. */
> +	APPLIER_TX_INITIAL_ROW_COUNT = 16,
> +};
>
>  STRS(applier_state, applier_STATE);
>
> @@ -380,6 +386,176 @@ applier_join(struct applier *applier)
>  	applier_set_state(applier, APPLIER_READY);
>  }
>
> +/**
> + * Read one transaction from network using applier's input buffer.
> + * Transaction rows are placed onto fiber gc region.
> + * We could not use applier input buffer for that because rpos is adjusted
> + * after each xrow decoding and corresponding network input space is going
> + * to be reused.
> + *
> + * Return count of transaction rows and put row's header pointers into rows
> + * array.
> + */
> +static int
> +applier_read_tx(struct applier *applier, struct xrow_header **rows)
> +{
> +	struct ev_io *coio = &applier->io;
> +	struct ibuf *ibuf = &applier->ibuf;
> +	int64_t tsn = 0;
> +	int row_capacity = APPLIER_TX_INITIAL_ROW_COUNT;
> +	struct xrow_header *first_row, *row;
> +	first_row = (struct xrow_header *)region_alloc(&fiber()->gc,
> +						      row_capacity *
> +						      sizeof(struct xrow_header));
> +	if (first_row == NULL) {
> +		diag_set(OutOfMemory, sizeof(struct xrow_header) * row_capacity,
> +			 "region", "struct xrow_header");
> +		goto error;
> +	}
> +	row = first_row;
> +
> +	do {
> +		if (row == first_row + row_capacity) {
> +			/* Realloc rows array. */
> +			row = (struct xrow_header *)region_alloc(&fiber()->gc,
> +								 row_capacity *
> +								 sizeof(struct xrow_header) << 1);
> +			if (row == NULL) {
> +				diag_set(OutOfMemory,
> +					 sizeof(struct xrow_header) *
> +					 row_capacity << 1,
> +					 "region", "struct xrow_header");
> +				goto error;
> +			}
> +			memcpy(row, first_row, row_capacity *
> +			       sizeof(struct xrow_header) << 1);
> +			first_row = row;
> +			row = first_row + row_capacity;
> +			row_capacity <<= 1;
> +		}

This looks like reinventing the wheel, let's move the resize part to
region_realloc().

> +		if (row == first_row) {
> +			/*
> +			 * First row in a transaction. In order to enforce
> +			 * consistency check that first row lsn and replica id
> +			 * match with transaction.
> +			 */
> +			tsn = row->tsn;
> +			if (row->lsn != tsn) {
> +				/* There is not a first row in the transactions. */

/* Transaction id must be derived from the log sequence number of
 * the first row in the transaction. */

> +				diag_set(ClientError, ER_PROTOCOL,
> +					 "Not a first row in a transaction");

This message would be confusing when it pops up, please use a
message from the suggested comment.

> +				goto error;
> +			}
> +		}
> +		if (tsn != row->tsn) {
> +			/* We are not able to handle interleaving transactions. */
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "replications",
> +				 "interleaving transactions");

"replication"

> +		if (row->body->iov_base != NULL) {
> +			/* Save row bodies to gc region. */

As a courtesy to performance you could only do this for multi-row
transactions. You can see it's a multi-row transaction from xrow
header.

> +	*rows = first_row;
> +	return row - first_row + 1;

As an alternative to region_realloc(), you could add stailq in_txn
to struct xrow_header. Being able to add xrow to a linked list
won't hurt in other places either.

> +
> +static int
> +applier_apply_tx(struct xrow_header *first_row, struct xrow_header *last_row)

Please add a comment.

> diff --git a/src/box/txn.c b/src/box/txn.c
> index 7900fb3ab..f6bf72d0c 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -34,6 +34,7 @@
>  #include "journal.h"
>  #include
>  #include "xrow.h"
> +#include "replication.h"

Could this entire thing with remote rows be moved to a separate patch?

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
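
To illustrate the region_realloc() suggestion, here is a minimal sketch
of what such a helper could look like. Everything in it is an assumption
made for illustration: toy_region, toy_region_alloc() and
toy_region_realloc() are hypothetical stand-ins, not the tarantool region
API, and the real helper may well take different arguments. The point is
only that the grow-and-copy step lives in one place and copies just the
bytes that are valid in the old block.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical bump allocator standing in for a fiber gc region. */
enum { TOY_ALIGN = 16 };

struct toy_region {
	char *buf;
	size_t used;
	size_t size;
};

static int
toy_region_create(struct toy_region *r, size_t size)
{
	r->buf = malloc(size);
	r->used = 0;
	r->size = size;
	return r->buf == NULL ? -1 : 0;
}

static void
toy_region_destroy(struct toy_region *r)
{
	free(r->buf);
	r->buf = NULL;
	r->used = r->size = 0;
}

/* Bump-allocate size bytes, or return NULL if the region is exhausted. */
static void *
toy_region_alloc(struct toy_region *r, size_t size)
{
	/* Round up so that every block stays suitably aligned. */
	size_t aligned = (size + TOY_ALIGN - 1) & ~(size_t)(TOY_ALIGN - 1);
	if (aligned < size || aligned > r->size - r->used)
		return NULL;
	void *ptr = r->buf + r->used;
	r->used += aligned;
	return ptr;
}

/*
 * Hypothetical helper: allocate new_size bytes and copy the old_size
 * bytes that are valid in the old block. The old block is not freed,
 * because a region releases all of its memory at once.
 */
static void *
toy_region_realloc(struct toy_region *r, void *old, size_t old_size,
		   size_t new_size)
{
	void *ptr = toy_region_alloc(r, new_size);
	if (ptr == NULL)
		return NULL;
	if (old != NULL)
		memcpy(ptr, old, old_size < new_size ? old_size : new_size);
	return ptr;
}

/* Append count integers, doubling capacity on demand; -1 on failure. */
static long
toy_fill_and_sum(int count)
{
	struct toy_region region;
	size_t capacity = 16;
	long sum = -1;
	int *items;
	int i;

	if (toy_region_create(&region, 1 << 16) != 0)
		return -1;
	items = toy_region_alloc(&region, capacity * sizeof(*items));
	if (items == NULL)
		goto out;
	for (i = 0; i < count; i++) {
		if ((size_t)i == capacity) {
			int *grown = toy_region_realloc(&region, items,
						capacity * sizeof(*items),
						2 * capacity * sizeof(*items));
			if (grown == NULL)
				goto out;
			items = grown;
			capacity *= 2;
		}
		items[i] = i;
	}
	sum = 0;
	for (i = 0; i < count; i++)
		sum += items[i];
out:
	toy_region_destroy(&region);
	return sum;
}
```

With a helper of this shape the caller keeps only the capacity check and
the pointer fix-up; the allocation, the error path and the copy size are
decided in one function instead of being repeated at every call site.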