* [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol
@ 2019-01-06 13:05 Georgy Kirichenko
  2019-01-06 13:05 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
  2019-01-11 13:30 ` [tarantool-patches] Re: [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
  0 siblings, 2 replies; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-06 13:05 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

This patchset introduces transactional replication and consists of two
commits:
 * the first one forms transaction boundaries in an xstream
 * the second one forms transactions in applier buffers and then
   applies them with correct begin/commit boundaries.

Note: distributed transactions are not supported, so the journal forms a
separate transaction for all local trigger effects.

Changes in v2:
 - Fixed local transaction extraction

Georgy Kirichenko (2):
  Journal transaction boundaries
  Transaction support for applier

 src/box/applier.cc         | 202 ++++++++++++++++++++++++++-----------
 src/box/iproto_constants.h |   3 +
 src/box/wal.c              |  36 ++++++-
 src/box/xrow.c             |  38 +++++++
 src/box/xrow.h             |   5 +-
 test/unit/xrow.cc          |   3 +
 test/vinyl/errinj.result   |   8 +-
 test/vinyl/info.result     |  38 +++----
 test/vinyl/layout.result   |  24 ++---
 9 files changed, 263 insertions(+), 94 deletions(-)

-- 
2.20.1

* [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
  2019-01-06 13:05 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
@ 2019-01-06 13:05 ` Georgy Kirichenko
  2019-01-11 13:30 ` [tarantool-patches] Re: [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
  1 sibling, 0 replies; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-06 13:05 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

The applier fetches incoming rows to form a transaction and then applies
it. The implementation assumes that transactions cannot interleave in a
replication stream. Also, distributed transactions are not supported yet.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc | 202 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 145 insertions(+), 57 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6c0eb45d5..7e208aaa2 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,7 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "txn.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -380,6 +381,102 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Read one transaction from network.
+ * Transaction rows are placed into row_buf as an array, row's bodies are
+ * placed into obuf because it is not allowed to relocate row's bodies.
+ * Also we could not use applier input buffer because rpos adjusted after xrow
+ * decoding and corresponding space going to reuse.
+ *
+ * Note: current implementation grants that transaction could not be mixed, so
+ * we read each transaction from first xrow until xrow with txn_last = true.
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+		struct obuf *data_buf)
+{
+	struct xrow_header *row;
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t txn_id = 0;
+	uint32_t txn_replica_id = 0;
+
+	do {
+		row = (struct xrow_header *)ibuf_alloc(row_buf,
+						       sizeof(struct xrow_header));
+		if (row == NULL) {
+			diag_set(OutOfMemory, sizeof(struct xrow_header),
+				 "slab", "struct xrow_header");
+			goto error;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		try {
+			/* TODO: we should have a C version of this function. */
+			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+			/*
+			 * First row in a transaction. In order to enforce
+			 * consistency check that first row lsn and replica id
+			 * match with transaction.
+			 */
+			txn_id = row->lsn;
+			txn_replica_id = row->replica_id;
+		}
+		if (txn_id != row->txn_id ||
+		    txn_replica_id != row->txn_replica_id) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replications",
+				 "interleaving transactions");
+			goto error;
+		}
+
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL) {
+			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "slab", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			row->body->iov_base = new_base;
+		}
+
+	} while (row->txn_last == 0);
+
+	return 0;
+error:
+	ibuf_reset(row_buf);
+	obuf_reset(data_buf);
+	return -1;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
 	struct ibuf *ibuf = &applier->ibuf;
 	struct xrow_header row;
 	struct vclock remote_vclock_at_subscribe;
+	struct ibuf row_buf;
+	struct obuf data_buf;
+	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
+	obuf_create(&data_buf, &cord()->slabc, 0x10000);
 
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &replicaset.vclock);
@@ -475,80 +576,67 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
+		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
+			diag_raise();
 
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row); /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct txn *txn = NULL;
+		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
+		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
 
-		applier->lag = ev_now(loop()) - row.tm;
+		applier->lag = ev_now(loop()) - last_row->tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->txn_replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
-		/*
-		 * In a full mesh topology, the same set
-		 * of changes may arrive via two
-		 * concurrently running appliers. Thanks
-		 * to vclock_follow() above, the first row
-		 * in the set will be skipped - but the
-		 * remaining may execute out of order,
-		 * when the following xstream_write()
-		 * yields on WAL. Hence we need a latch to
-		 * strictly order all changes which belong
-		 * to the same server id.
-		 */
 		latch_lock(latch);
+		/* First row identifies a transaction. */
+		assert(first_row->lsn == first_row->txn_id);
+		assert(first_row->replica_id == first_row->txn_replica_id);
 		if (vclock_get(&replicaset.applier.vclock,
-			       row.replica_id) < row.lsn) {
+			       first_row->replica_id) < first_row->lsn) {
 			/* Preserve old lsn value. */
 			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
-						     row.replica_id);
-			vclock_follow_xrow(&replicaset.applier.vclock, &row);
-			int res = xstream_write(applier->subscribe_stream, &row);
-			struct error *e = diag_last_error(diag_get());
-			if (res != 0 && e->type == &type_ClientError &&
-			    box_error_code(e) == ER_TUPLE_FOUND &&
-			    replication_skip_conflict) {
-				/**
-				 * Silently skip ER_TUPLE_FOUND error if such
-				 * option is set in config.
-				 */
-				diag_clear(diag_get());
-				row.type = IPROTO_NOP;
-				row.bodycnt = 0;
-				res = xstream_write(applier->subscribe_stream,
-						    &row);
+						     first_row->replica_id);
+
+			struct xrow_header *row = first_row;
+			if (first_row != last_row)
+				txn = txn_begin(false);
+			int res = 0;
+			while (row <= last_row && res == 0) {
+				vclock_follow_xrow(&replicaset.applier.vclock, row);
+				res = xstream_write(applier->subscribe_stream, row);
+				struct error *e;
+				if (res != 0 &&
+				    (e = diag_last_error(diag_get()))->type ==
+				    &type_ClientError &&
+				    box_error_code(e) == ER_TUPLE_FOUND &&
+				    replication_skip_conflict) {
+					/**
+					 * Silently skip ER_TUPLE_FOUND error
+					 * if such option is set in config.
+					 */
+					diag_clear(diag_get());
+					row->type = IPROTO_NOP;
+					row->bodycnt = 0;
+					res = xstream_write(applier->subscribe_stream,
+							    row);
+				}
+				++row;
 			}
+			if (res == 0 && txn != NULL)
+				res = txn_commit(txn);
 			if (res != 0) {
 				/* Rollback lsn to have a chance for a retry. */
 				vclock_set(&replicaset.applier.vclock,
-					   row.replica_id, old_lsn);
+					   first_row->replica_id, old_lsn);
+				obuf_reset(&data_buf);
+				ibuf_reset(&row_buf);
 				latch_unlock(latch);
 				diag_raise();
 			}
 		}
+		obuf_reset(&data_buf);
+		ibuf_reset(&row_buf);
 		latch_unlock(latch);
 		/*
 		 * Stay 'orphan' until appliers catch up with
-- 
2.20.1

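For readers following the thread, the boundary rule used by
applier_read_tx() can be illustrated in isolation. The sketch below is not
Tarantool code: struct row_hdr is a hypothetical, simplified stand-in for
struct xrow_header that keeps only the fields the patch compares, and
first_txn_len() is an illustrative helper name. It assumes, as the patch
does, that the first row identifies the transaction by its lsn and
replica_id, and that a row with txn_last set closes it.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for struct xrow_header. */
struct row_hdr {
	uint32_t replica_id;
	int64_t lsn;
	uint32_t txn_replica_id;
	int64_t txn_id;
	bool txn_last;
};

/*
 * Scan rows[0..count) and return the number of rows that make up the
 * first complete transaction, 0 if the input ends before a row with
 * txn_last set, or -1 if a row of another transaction is interleaved.
 */
static int
first_txn_len(const struct row_hdr *rows, size_t count)
{
	if (count == 0)
		return 0;
	/* The first row identifies the transaction. */
	int64_t txn_id = rows[0].lsn;
	uint32_t txn_replica_id = rows[0].replica_id;
	for (size_t i = 0; i < count; i++) {
		if (rows[i].txn_id != txn_id ||
		    rows[i].txn_replica_id != txn_replica_id)
			return -1;
		if (rows[i].txn_last)
			return (int)(i + 1);
	}
	return 0;
}
```

A single-row transaction is simply a row whose txn_id equals its own lsn
and whose txn_last is set, which is why the loop handles it without a
special case.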
* [tarantool-patches] Re: [PATCH v2 0/2] Transaction boundaries in replication protocol
  2019-01-06 13:05 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
  2019-01-06 13:05 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
@ 2019-01-11 13:30 ` Georgy Kirichenko
  1 sibling, 0 replies; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-11 13:30 UTC
To: tarantool-patches

Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-2798-transaction-boundaries
Issue: https://github.com/tarantool/tarantool/issues/2798

On Sunday, January 6, 2019 4:05:51 PM MSK Georgy Kirichenko wrote:
> This patchset introduces transactional replication and consist of two
> commits:
>  * the first one forms transaction boundaries in a xstream
>  * the second one forms transactions in applier buffers and then
>    applies them with correct begin/commit boundaries.
>
> Note: distributed transaction are not supported so journal forms a
> separate transaction for all local triggers effects.
>
> Changes in v2:
>  - Fixed local transaction extraction
>
> Georgy Kirichenko (2):
>   Journal transaction boundaries
>   Transaction support for applier
>
>  src/box/applier.cc         | 202 ++++++++++++++++++++++++++-----------
>  src/box/iproto_constants.h |   3 +
>  src/box/wal.c              |  36 ++++++-
>  src/box/xrow.c             |  38 +++++++
>  src/box/xrow.h             |   5 +-
>  test/unit/xrow.cc          |   3 +
>  test/vinyl/errinj.result   |   8 +-
>  test/vinyl/info.result     |  38 +++----
>  test/vinyl/layout.result   |  24 ++---
>  9 files changed, 263 insertions(+), 94 deletions(-)

* [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol
@ 2019-01-22 10:57 Georgy Kirichenko
  2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
  0 siblings, 1 reply; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:57 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

This patchset introduces transactional replication and consists of two
commits:
 * the first one forms transaction boundaries in an xstream
 * the second one forms transactions in applier buffers and then
   applies them with correct begin/commit boundaries.

Note: this patchset is based on g.kirichenko/gh-980-disable-lsn-gaps

Note: distributed transactions are not supported, so the journal forms a
separate transaction for all local trigger effects.

Changes in v2:
 - Rebased against latest 2.1
 - Fixed local transaction extraction

Issue: https://github.com/tarantool/tarantool/issues/2798
Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-2798-transaction-boundaries

Georgy Kirichenko (2):
  Journal transaction boundaries
  Transaction support for applier

 src/box/applier.cc            | 207 ++++++++++++++++++++++++----------
 src/box/iproto_constants.h    |   3 +
 src/box/wal.c                 |  36 +++++-
 src/box/xrow.c                |  38 +++++++
 src/box/xrow.h                |   5 +-
 test/unit/xrow.cc             |   3 +
 test/vinyl/errinj_stat.result |   8 +-
 test/vinyl/layout.result      |  24 ++--
 test/vinyl/stat.result        |  78 ++++++-------
 9 files changed, 286 insertions(+), 116 deletions(-)

-- 
2.20.1

* [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
  2019-01-22 10:57 [tarantool-patches] " Georgy Kirichenko
@ 2019-01-22 10:57 ` Georgy Kirichenko
  2019-01-28 13:35   ` Vladimir Davydov
  0 siblings, 1 reply; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:57 UTC
To: tarantool-patches; +Cc: Georgy Kirichenko

The applier fetches incoming rows to form a transaction and then applies
it. The implementation assumes that transactions cannot interleave in a
replication stream. Also, distributed transactions are not supported yet.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc | 207 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 148 insertions(+), 59 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index adbe88679..0e3832ad8 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,7 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "txn.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -380,6 +381,102 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Read one transaction from network.
+ * Transaction rows are placed into row_buf as an array, row's bodies are
+ * placed into obuf because it is not allowed to relocate row's bodies.
+ * Also we could not use applier input buffer because rpos adjusted after xrow
+ * decoding and corresponding space going to reuse.
+ *
+ * Note: current implementation grants that transaction could not be mixed, so
+ * we read each transaction from first xrow until xrow with txn_last = true.
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+		struct obuf *data_buf)
+{
+	struct xrow_header *row;
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t txn_id = 0;
+	uint32_t txn_replica_id = 0;
+
+	do {
+		row = (struct xrow_header *)ibuf_alloc(row_buf,
+						       sizeof(struct xrow_header));
+		if (row == NULL) {
+			diag_set(OutOfMemory, sizeof(struct xrow_header),
+				 "slab", "struct xrow_header");
+			goto error;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		try {
+			/* TODO: we should have a C version of this function. */
+			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+			/*
+			 * First row in a transaction. In order to enforce
+			 * consistency check that first row lsn and replica id
+			 * match with transaction.
+			 */
+			txn_id = row->lsn;
+			txn_replica_id = row->replica_id;
+		}
+		if (txn_id != row->txn_id ||
+		    txn_replica_id != row->txn_replica_id) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replications",
+				 "interleaving transactions");
+			goto error;
+		}
+
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL) {
+			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "slab", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			row->body->iov_base = new_base;
+		}
+
+	} while (row->txn_last == 0);
+
+	return 0;
+error:
+	ibuf_reset(row_buf);
+	obuf_reset(data_buf);
+	return -1;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
 	struct ibuf *ibuf = &applier->ibuf;
 	struct xrow_header row;
 	struct vclock remote_vclock_at_subscribe;
+	struct ibuf row_buf;
+	struct obuf data_buf;
+	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
+	obuf_create(&data_buf, &cord()->slabc, 0x10000);
 
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &replicaset.vclock);
@@ -475,87 +576,75 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
+		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
+			diag_raise();
 
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row); /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct txn *txn = NULL;
+		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
+		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
 
-		applier->lag = ev_now(loop()) - row.tm;
+		applier->lag = ev_now(loop()) - last_row->tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->txn_replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
-		/*
-		 * In a full mesh topology, the same set
-		 * of changes may arrive via two
-		 * concurrently running appliers. Thanks
-		 * to vclock_follow() above, the first row
-		 * in the set will be skipped - but the
-		 * remaining may execute out of order,
-		 * when the following xstream_write()
-		 * yields on WAL. Hence we need a latch to
-		 * strictly order all changes which belong
-		 * to the same server id.
-		 */
 		latch_lock(latch);
+		/* First row identifies a transaction. */
+		assert(first_row->lsn == first_row->txn_id);
+		assert(first_row->replica_id == first_row->txn_replica_id);
 		if (vclock_get(&replicaset.applier.vclock,
-			       row.replica_id) < row.lsn) {
-			if (row.replica_id == instance_id &&
+			       first_row->replica_id) < first_row->lsn) {
+			if (first_row->replica_id == instance_id &&
 			    vclock_get(&replicaset.vclock, instance_id) <
-			    row.lsn) {
+			    first_row->lsn) {
 				/* Local row returned back. */
 				goto done;
 			}
 			/* Preserve old lsn value. */
 			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
-						     row.replica_id);
-			vclock_follow_xrow(&replicaset.applier.vclock, &row);
-			int res = xstream_write(applier->subscribe_stream, &row);
-			struct error *e = diag_last_error(diag_get());
-			if (res != 0 && e->type == &type_ClientError &&
-			    box_error_code(e) == ER_TUPLE_FOUND &&
-			    replication_skip_conflict) {
-				/**
-				 * Silently skip ER_TUPLE_FOUND error if such
-				 * option is set in config.
-				 */
-				diag_clear(diag_get());
-				row.type = IPROTO_NOP;
-				row.bodycnt = 0;
-				res = xstream_write(applier->subscribe_stream,
-						    &row);
+						     first_row->replica_id);
+
+			struct xrow_header *row = first_row;
+			if (first_row != last_row)
+				txn = txn_begin(false);
+			int res = 0;
+			while (row <= last_row && res == 0) {
+				vclock_follow_xrow(&replicaset.applier.vclock, row);
+				res = xstream_write(applier->subscribe_stream, row);
+				struct error *e;
+				if (res != 0 &&
+				    (e = diag_last_error(diag_get()))->type ==
+				    &type_ClientError &&
+				    box_error_code(e) == ER_TUPLE_FOUND &&
+				    replication_skip_conflict) {
+					/**
+					 * Silently skip ER_TUPLE_FOUND error
+					 * if such option is set in config.
+					 */
+					diag_clear(diag_get());
+					row->type = IPROTO_NOP;
+					row->bodycnt = 0;
+					res = xstream_write(applier->subscribe_stream,
+							    row);
+				}
+				++row;
 			}
+			if (res == 0 && txn != NULL)
+				res = txn_commit(txn);
+
 			if (res != 0) {
 				/* Rollback lsn to have a chance for a retry. */
 				vclock_set(&replicaset.applier.vclock,
-					   row.replica_id, old_lsn);
+					   first_row->replica_id, old_lsn);
+				obuf_reset(&data_buf);
+				ibuf_reset(&row_buf);
 				latch_unlock(latch);
 				diag_raise();
 			}
 		}
 done:
+		obuf_reset(&data_buf);
+		ibuf_reset(&row_buf);
 		latch_unlock(latch);
 		/*
 		 * Stay 'orphan' until appliers catch up with
-- 
2.20.1

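The "preserve old lsn, apply, restore on failure" pattern in the hunk
above can be shown as a standalone sketch. Nothing here is Tarantool API:
apply_row_f is a hypothetical callback standing in for xstream_write(),
*applied_lsn stands in for a single vclock component, and apply_txn() and
fail_at() are illustrative names. The sketch only models the LSN
bookkeeping; undoing the data changes themselves is the job of the
transaction rollback, which is not modeled.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical callback standing in for xstream_write(): 0 on success. */
typedef int (*apply_row_f)(int64_t lsn, void *ctx);

/*
 * Apply the rows of one transaction, given by their LSNs in
 * lsns[0..count). If the first LSN is not ahead of *applied_lsn the
 * transaction has been seen already and is skipped. If any row fails,
 * *applied_lsn is restored so that the whole transaction can be retried.
 */
static int
apply_txn(int64_t *applied_lsn, const int64_t *lsns, size_t count,
	  apply_row_f apply_row, void *ctx)
{
	if (count == 0 || *applied_lsn >= lsns[0])
		return 0;
	/* Preserve the old LSN value for a possible rollback. */
	int64_t old_lsn = *applied_lsn;
	for (size_t i = 0; i < count; i++) {
		*applied_lsn = lsns[i];
		if (apply_row(lsns[i], ctx) != 0) {
			*applied_lsn = old_lsn;
			return -1;
		}
	}
	return 0;
}

/* Example callback: fails on the LSN stored in *ctx, succeeds otherwise. */
static int
fail_at(int64_t lsn, void *ctx)
{
	return lsn == *(const int64_t *)ctx ? -1 : 0;
}
```

Restoring the counter to the value it had before the first row, rather
than before the failed row, is what lets a retry re-apply the transaction
as a unit.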
* Re: [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
  2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
@ 2019-01-28 13:35   ` Vladimir Davydov
  0 siblings, 0 replies; 5+ messages in thread
From: Vladimir Davydov @ 2019-01-28 13:35 UTC
To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:57:37PM +0300, Georgy Kirichenko wrote:
> Applier fetch incoming rows to form a transaction and then apply it.
> Implementation assumes that transaction could not mix in a
> replication stream. Also distributed transaction are not supported yet.
>
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc | 207 ++++++++++++++++++++++++++++++++-------------
>  1 file changed, 148 insertions(+), 59 deletions(-)

Without a test, this patch is inadmissible. Vlad mentioned that he has
some tests left from his old implementation. Please salvage those.

>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index adbe88679..0e3832ad8 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,7 @@
>  #include "error.h"
>  #include "session.h"
>  #include "cfg.h"
> +#include "txn.h"
>
>  STRS(applier_state, applier_STATE);
>
> @@ -380,6 +381,102 @@ applier_join(struct applier *applier)
>  	applier_set_state(applier, APPLIER_READY);
>  }
>
> +/**
> + * Read one transaction from network.
> + * Transaction rows are placed into row_buf as an array, row's bodies are
> + * placed into obuf because it is not allowed to relocate row's bodies.
> + * Also we could not use applier input buffer because rpos adjusted after xrow
> + * decoding and corresponding space going to reuse.
> + *
> + * Note: current implementation grants that transaction could not be mixed, so
> + * we read each transaction from first xrow until xrow with txn_last = true.
> + */
> +static int64_t
> +applier_read_tx(struct applier *applier, struct ibuf *row_buf,
> +		struct obuf *data_buf)
> +{
> +	struct xrow_header *row;
> +	struct ev_io *coio = &applier->io;
> +	struct ibuf *ibuf = &applier->ibuf;
> +	int64_t txn_id = 0;
> +	uint32_t txn_replica_id = 0;
> +
> +	do {
> +		row = (struct xrow_header *)ibuf_alloc(row_buf,
> +						       sizeof(struct xrow_header));
> +		if (row == NULL) {
> +			diag_set(OutOfMemory, sizeof(struct xrow_header),
> +				 "slab", "struct xrow_header");
> +			goto error;
> +		}
> +
> +		double timeout = replication_disconnect_timeout();
> +		try {
> +			/* TODO: we should have a C version of this function. */
> +			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
> +		} catch (...) {
> +			goto error;
> +		}
> +
> +		if (iproto_type_is_error(row->type)) {
> +			xrow_decode_error(row);
> +			goto error;
> +		}
> +
> +		/* Replication request. */
> +		if (row->replica_id == REPLICA_ID_NIL ||
> +		    row->replica_id >= VCLOCK_MAX) {
> +			/*
> +			 * A safety net, this can only occur
> +			 * if we're fed a strangely broken xlog.
> +			 */
> +			diag_set(ClientError, ER_UNKNOWN_REPLICA,
> +				 int2str(row->replica_id),
> +				 tt_uuid_str(&REPLICASET_UUID));
> +			goto error;
> +		}
> +		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
> +			/*
> +			 * First row in a transaction. In order to enforce
> +			 * consistency check that first row lsn and replica id
> +			 * match with transaction.
> +			 */
> +			txn_id = row->lsn;
> +			txn_replica_id = row->replica_id;
> +		}
> +		if (txn_id != row->txn_id ||
> +		    txn_replica_id != row->txn_replica_id) {
> +			/* We are not able to handle interleaving transactions. */
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "replications",
> +				 "interleaving transactions");
> +			goto error;
> +		}

Accumulating rows feels like the iproto realm. I don't think that it's
a good idea to implement a dirty ad-hoc solution for this. Instead we
should move applier to iproto IMO. This would probably allow us to reuse
the code for interactive iproto transactions - the two issues look very
similar to me and I think we should use the same protocol and code to
get them both working.

> +
> +
> +		applier->lag = ev_now(loop()) - row->tm;
> +		applier->last_row_time = ev_monotonic_now(loop());
> +
> +		if (row->body->iov_base != NULL) {
> +			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
> +			if (new_base == NULL) {
> +				diag_set(OutOfMemory, row->body->iov_len,
> +					 "slab", "xrow_data");
> +				goto error;
> +			}
> +			memcpy(new_base, row->body->iov_base, row->body->iov_len);
> +			row->body->iov_base = new_base;
> +		}
> +
> +	} while (row->txn_last == 0);
> +
> +	return 0;
> +error:
> +	ibuf_reset(row_buf);
> +	obuf_reset(data_buf);
> +	return -1;
> +}
> +
>  /**
>   * Execute and process SUBSCRIBE request (follow updates from a master).
>   */
> @@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
>  	struct ibuf *ibuf = &applier->ibuf;
>  	struct xrow_header row;
>  	struct vclock remote_vclock_at_subscribe;
> +	struct ibuf row_buf;
> +	struct obuf data_buf;
> +	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
> +	obuf_create(&data_buf, &cord()->slabc, 0x10000);
>
>  	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>  				 &replicaset.vclock);
> @@ -475,87 +576,75 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>
> -		/*
> -		 * Tarantool < 1.7.7 does not send periodic heartbeat
> -		 * messages so we can't assume that if we haven't heard
> -		 * from the master for quite a while the connection is
> -		 * broken - the master might just be idle.
> -		 */
> -		if (applier->version_id < version_id(1, 7, 7)) {
> -			coio_read_xrow(coio, ibuf, &row);
> -		} else {
> -			double timeout = replication_disconnect_timeout();
> -			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
> -		}
> +		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
> +			diag_raise();
>
> -		if (iproto_type_is_error(row.type))
> -			xrow_decode_error_xc(&row); /* error */
> -		/* Replication request. */
> -		if (row.replica_id == REPLICA_ID_NIL ||
> -		    row.replica_id >= VCLOCK_MAX) {
> -			/*
> -			 * A safety net, this can only occur
> -			 * if we're fed a strangely broken xlog.
> -			 */
> -			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> -				  int2str(row.replica_id),
> -				  tt_uuid_str(&REPLICASET_UUID));
> -		}
> +		struct txn *txn = NULL;
> +		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
> +		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
>
> -		applier->lag = ev_now(loop()) - row.tm;
> +		applier->lag = ev_now(loop()) - last_row->tm;
>  		applier->last_row_time = ev_monotonic_now(loop());
> -		struct replica *replica = replica_by_id(row.replica_id);
> +		struct replica *replica = replica_by_id(first_row->txn_replica_id);
>  		struct latch *latch = (replica ? &replica->order_latch :
>  				       &replicaset.applier.order_latch);
> -		/*
> -		 * In a full mesh topology, the same set
> -		 * of changes may arrive via two
> -		 * concurrently running appliers. Thanks
> -		 * to vclock_follow() above, the first row
> -		 * in the set will be skipped - but the
> -		 * remaining may execute out of order,
> -		 * when the following xstream_write()
> -		 * yields on WAL. Hence we need a latch to
> -		 * strictly order all changes which belong
> -		 * to the same server id.
> -		 */
>  		latch_lock(latch);
> +		/* First row identifies a transaction. */
> +		assert(first_row->lsn == first_row->txn_id);
> +		assert(first_row->replica_id == first_row->txn_replica_id);
>  		if (vclock_get(&replicaset.applier.vclock,
> -			       row.replica_id) < row.lsn) {
> -			if (row.replica_id == instance_id &&
> +			       first_row->replica_id) < first_row->lsn) {
> +			if (first_row->replica_id == instance_id &&
>  			    vclock_get(&replicaset.vclock, instance_id) <
> -			    row.lsn) {
> +			    first_row->lsn) {
>  				/* Local row returned back. */
>  				goto done;
>  			}
>  			/* Preserve old lsn value. */
>  			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
> -						     row.replica_id);
> -			vclock_follow_xrow(&replicaset.applier.vclock, &row);
> -			int res = xstream_write(applier->subscribe_stream, &row);
> -			struct error *e = diag_last_error(diag_get());
> -			if (res != 0 && e->type == &type_ClientError &&
> -			    box_error_code(e) == ER_TUPLE_FOUND &&
> -			    replication_skip_conflict) {
> -				/**
> -				 * Silently skip ER_TUPLE_FOUND error if such
> -				 * option is set in config.
> -				 */
> -				diag_clear(diag_get());
> -				row.type = IPROTO_NOP;
> -				row.bodycnt = 0;
> -				res = xstream_write(applier->subscribe_stream,
> -						    &row);
> +						     first_row->replica_id);
> +
> +			struct xrow_header *row = first_row;
> +			if (first_row != last_row)
> +				txn = txn_begin(false);

So we have xstream_write to hide box internals, but we still use
txn_begin/commit. This looks ugly. We should encapsulate those somehow
as well, I guess.

> +			int res = 0;
> +			while (row <= last_row && res == 0) {
> +				vclock_follow_xrow(&replicaset.applier.vclock, row);
> +				res = xstream_write(applier->subscribe_stream, row);
> +				struct error *e;
> +				if (res != 0 &&
> +				    (e = diag_last_error(diag_get()))->type ==
> +				    &type_ClientError &&
> +				    box_error_code(e) == ER_TUPLE_FOUND &&
> +				    replication_skip_conflict) {
> +					/**
> +					 * Silently skip ER_TUPLE_FOUND error
> +					 * if such option is set in config.
> +					 */
> +					diag_clear(diag_get());
> +					row->type = IPROTO_NOP;
> +					row->bodycnt = 0;
> +					res = xstream_write(applier->subscribe_stream,
> +							    row);
> +				}
> +				++row;
>  			}
> +			if (res == 0 && txn != NULL)
> +				res = txn_commit(txn);
> +
>  			if (res != 0) {
>  				/* Rollback lsn to have a chance for a retry. */
>  				vclock_set(&replicaset.applier.vclock,
> -					   row.replica_id, old_lsn);
> +					   first_row->replica_id, old_lsn);
> +				obuf_reset(&data_buf);
> +				ibuf_reset(&row_buf);
>  				latch_unlock(latch);
>  				diag_raise();
>  			}
>  		}
>  done:
> +		obuf_reset(&data_buf);
> +		ibuf_reset(&row_buf);
>  		latch_unlock(latch);
>  		/*
>  		 * Stay 'orphan' until appliers catch up with
