From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Date: Mon, 18 Feb 2019 12:36:12 +0300 From: Vladimir Davydov Subject: Re: [tarantool-patches] [PATCH v3 2/2] Transaction support for applier Message-ID: <20190218093612.yl4vzppfsoqqxq2q@esperanza> References: <1951a4dc40cb56f1c7a1526d7d9d40229863cce1.1550001848.git.georgy@tarantool.org> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <1951a4dc40cb56f1c7a1526d7d9d40229863cce1.1550001848.git.georgy@tarantool.org> To: Georgy Kirichenko Cc: tarantool-patches@freelists.org List-ID: On Tue, Feb 12, 2019 at 11:04:32PM +0300, Georgy Kirichenko wrote: > Applier fetch incoming rows to form a transaction and then apply it. > Implementation assumes that transaction could not mix in a > replication stream. Also distributed transaction are not supported yet. > > Closes: #2798 > Needed for: #980 > --- > src/box/applier.cc | 185 +++++++++++++++----- > test/replication/transaction.result | 240 ++++++++++++++++++++++++++ > test/replication/transaction.test.lua | 86 +++++++++ > 3 files changed, 471 insertions(+), 40 deletions(-) > create mode 100644 test/replication/transaction.result > create mode 100644 test/replication/transaction.test.lua > > diff --git a/src/box/applier.cc b/src/box/applier.cc > index 7f37fe2ee..59c33bb84 100644 > --- a/src/box/applier.cc > +++ b/src/box/applier.cc > @@ -48,6 +48,7 @@ > #include "error.h" > #include "session.h" > #include "cfg.h" > +#include "txn.h" > > STRS(applier_state, applier_STATE); > > @@ -378,6 +379,105 @@ applier_join(struct applier *applier) > applier_set_state(applier, APPLIER_READY); > } > > +/** > + * Read one transaction from network. > + * Transaction rows are placed into row_buf as an array, row's bodies are > + * placed into obuf because it is not allowed to relocate row's bodies. > + * Also we could not use applier input buffer because rpos adjusted after xrow > + * decoding and corresponding space going to reuse. 
> + * > + * Note: current implementation grants that transaction could not be mixed, so > + * we read each transaction from first xrow until xrow with txn_last = true. > + */ > +static int64_t > +applier_read_tx(struct applier *applier, struct ibuf *row_buf, > + struct obuf *data_buf) > +{ > + struct xrow_header *row; > + struct ev_io *coio = &applier->io; > + struct ibuf *ibuf = &applier->ibuf; > + int64_t txn_id = 0; > + > + do { > + row = (struct xrow_header *)ibuf_alloc(row_buf, > + sizeof(struct xrow_header)); Nit: the line's too long for no reason, and there are more lines like that. Please fix them where you can without making the code look ugly. > + if (row == NULL) { > + diag_set(OutOfMemory, sizeof(struct xrow_header), > + "slab", "struct xrow_header"); > + goto error; > + } > + > + double timeout = replication_disconnect_timeout(); > + try { > + /* TODO: we should have a C version of this function. */ > + coio_read_xrow_timeout_xc(coio, ibuf, row, timeout); Nit: IMO it's better to use guards to free resources, if any. > + } catch (...) { > + goto error; > + } > + > + if (iproto_type_is_error(row->type)) { > + xrow_decode_error(row); > + goto error; > + } > + > + /* Replication request. */ > + if (row->replica_id == REPLICA_ID_NIL || > + row->replica_id >= VCLOCK_MAX) { > + /* > + * A safety net, this can only occur > + * if we're fed a strangely broken xlog. > + */ > + diag_set(ClientError, ER_UNKNOWN_REPLICA, > + int2str(row->replica_id), > + tt_uuid_str(&REPLICASET_UUID)); > + goto error; > + } > + if (ibuf_used(row_buf) == sizeof(struct xrow_header)) { > + /* > + * First row in a transaction. In order to enforce > + * consistency check that first row lsn and replica id > + * match with transaction. > + */ > + txn_id = row->txn_id; > + if (row->lsn != txn_id) { > + /* There is not a first row in the transactions.
*/ > + diag_set(ClientError, ER_PROTOCOL, > + "Not a first row in a transaction"); > + goto error; > + } > + } > + if (txn_id != row->txn_id) { > + /* We are not able to handle interleaving transactions. */ > + diag_set(ClientError, ER_UNSUPPORTED, > + "replications", > + "interleaving transactions"); > + goto error; > + } > + > + > + applier->lag = ev_now(loop()) - row->tm; > + applier->last_row_time = ev_monotonic_now(loop()); > + > + if (row->body->iov_base != NULL) { > + void *new_base = obuf_alloc(data_buf, row->body->iov_len); > + if (new_base == NULL) { > + diag_set(OutOfMemory, row->body->iov_len, > + "slab", "xrow_data"); > + goto error; > + } > + memcpy(new_base, row->body->iov_base, row->body->iov_len); > + row->body->iov_base = new_base; So we first read a row to an ibuf, then copy it to an obuf. I understand that you do this because xrow_header::body has pointers in it. Still it looks rather awkward. Maybe we'd better temporarily fix up those pointers to store relative offsets instead? > + } > + > + } while (row->txn_commit == 0); > + > + return 0; > +error: > + ibuf_reset(row_buf); > + obuf_reset(data_buf); obuf_reset, ibuf_reset don't free up memory. You must use obuf_destroy, ibuf_destroy. > + return -1; > +} > + > /** > * Execute and process SUBSCRIBE request (follow updates from a master). > */ > @@ -392,6 +492,10 @@ applier_subscribe(struct applier *applier) > struct xrow_header row; > struct vclock remote_vclock_at_subscribe; > struct tt_uuid cluster_id = uuid_nil; > + struct ibuf row_buf; > + struct obuf data_buf; IMO the buffers should be part of the applier struct. Then you wouldn't need to use a label to free them up in applier_read_tx and hence could simply let the exception thrown by coio_read_xrow_timeout_xc travel up the stack without wrapping it in that awkward try-catch block.
> + ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header)); > + obuf_create(&data_buf, &cord()->slabc, 0x10000); This constant looks like magic to me. > > xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, > &replicaset.vclock); > @@ -501,36 +605,16 @@ applier_subscribe(struct applier *applier) > applier_set_state(applier, APPLIER_FOLLOW); > } > > - /* > - * Tarantool < 1.7.7 does not send periodic heartbeat > - * messages so we can't assume that if we haven't heard > - * from the master for quite a while the connection is > - * broken - the master might just be idle. > - */ > - if (applier->version_id < version_id(1, 7, 7)) { > - coio_read_xrow(coio, ibuf, &row); You silently dropped this branch. Please leave it be. > - } else { > - double timeout = replication_disconnect_timeout(); > - coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout); > - } > + if (applier_read_tx(applier, &row_buf, &data_buf) != 0) > + diag_raise(); > > - if (iproto_type_is_error(row.type)) > - xrow_decode_error_xc(&row); /* error */ > - /* Replication request. */ > - if (row.replica_id == REPLICA_ID_NIL || > - row.replica_id >= VCLOCK_MAX) { > - /* > - * A safety net, this can only occur > - * if we're fed a strangely broken xlog. > - */ > - tnt_raise(ClientError, ER_UNKNOWN_REPLICA, > - int2str(row.replica_id), > - tt_uuid_str(&REPLICASET_UUID)); > - } > + struct txn *txn = NULL; > + struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos; > + struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1; > > - applier->lag = ev_now(loop()) - row.tm; > + applier->lag = ev_now(loop()) - last_row->tm; > applier->last_row_time = ev_monotonic_now(loop()); > - struct replica *replica = replica_by_id(row.replica_id); > + struct replica *replica = replica_by_id(first_row->replica_id); > struct latch *latch = (replica ?
&replica->order_latch : > &replicaset.applier.order_latch); > /* > @@ -540,24 +624,45 @@ applier_subscribe(struct applier *applier) > * that belong to the same server id. > */ > latch_lock(latch); > - if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) { > - int res = xstream_write(applier->subscribe_stream, &row); > - if (res != 0) { > - struct error *e = diag_last_error(diag_get()); > - /** > - * Silently skip ER_TUPLE_FOUND error if such > - * option is set in config. > - */ > - if (e->type == &type_ClientError && > + if (vclock_get(&replicaset.vclock, > + first_row->replica_id) < first_row->lsn) { > + struct xrow_header *row = first_row; > + if (first_row != last_row) > + txn = txn_begin(false); We have a nice level of abstraction implemented by xstream, but now you bluntly break it by calling txn_begin/commit directly. Please come up with a better solution, e.g. you could extend the xstream interface with a write_tx method. > + int res = 0; > + while (row <= last_row && res == 0) { > + res = xstream_write(applier->subscribe_stream, row); > + struct error *e; > + if (res != 0 && > + (e = diag_last_error(diag_get()))->type == > + &type_ClientError && > box_error_code(e) == ER_TUPLE_FOUND && > - replication_skip_conflict) > + replication_skip_conflict) { > + /** > + * Silently skip ER_TUPLE_FOUND error > + * if such option is set in config.
> + */ > diag_clear(diag_get()); > - else { > - latch_unlock(latch); > - diag_raise(); > + row->type = IPROTO_NOP; > + row->bodycnt = 0; > + res = xstream_write(applier->subscribe_stream, > + row); > } > + ++row; > + } > + if (res == 0 && txn != NULL) > + res = txn_commit(txn); > + > + if (res != 0) { > + txn_rollback(); > + obuf_reset(&data_buf); > + ibuf_reset(&row_buf); > + latch_unlock(latch); > + diag_raise(); > } > } > + obuf_reset(&data_buf); > + ibuf_reset(&row_buf); > latch_unlock(latch); > > if (applier->state == APPLIER_SYNC || > diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua > new file mode 100644 > index 000000000..47003c644 > --- /dev/null > +++ b/test/replication/transaction.test.lua > @@ -0,0 +1,86 @@ > +env = require('test_run') > +test_run = env.new() > +box.schema.user.grant('guest', 'replication') > + > +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')}) > +_ = s:create_index('pk') > + > +-- transaction w/o conflict > +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit() Whenever you add a test, please write a few words about what it does and the resolution of which ticket it is supposed to test.