From: Georgy Kirichenko <georgy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko <georgy@tarantool.org>
Subject: [tarantool-patches] [PATCH 3/3] Transaction support for applier
Date: Sun, 3 Mar 2019 23:26:18 +0300
Message-ID: <f7da57ec765aae5b550842f435d69a56e9585761.1551644303.git.georgy@tarantool.org>
In-Reply-To: <cover.1551644303.git.georgy@tarantool.org>

The applier fetches incoming rows to form a transaction and then applies
it. During replication, all local changes are moved to the tail of a
journal entry to form a separate transaction (like an autonomous
transaction) so that they can be replicated back. The applier therefore
assumes that transactions cannot be interleaved in a replication stream.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc                    | 243 ++++++++++++++++++++------
 src/box/txn.c                         |  21 ++-
 src/box/txn.h                         |   4 +
 test/replication/transaction.result   | 240 +++++++++++++++++++++++++
 test/replication/transaction.test.lua |  86 +++++++++
 5 files changed, 534 insertions(+), 60 deletions(-)
 create mode 100644 test/replication/transaction.result
 create mode 100644 test/replication/transaction.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 3222b041d..dfabbe5ab 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,12 @@
 #include "session.h"
 #include "cfg.h"
 #include "box.h"
+#include "txn.h"
+
+enum {
+	/* Initial capacity of rows array. */
+	APPLIER_TX_INITIAL_ROW_COUNT = 16,
+};
 
 STRS(applier_state, applier_STATE);
 
@@ -380,6 +386,176 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Read one transaction from network using applier's input buffer.
+ * Transaction rows are placed onto fiber gc region.
+ * We could not use applier input buffer for that because rpos is adjusted
+ * after each xrow decoding and corresponding network input space is going
+ * to be reused.
+ *
+ * Return count of transaction rows and put row's header pointers into rows
+ * array.
+ */
+static int
+applier_read_tx(struct applier *applier, struct xrow_header **rows)
+{
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t tsn = 0;
+	int row_capacity = APPLIER_TX_INITIAL_ROW_COUNT;
+	struct xrow_header *first_row, *row;
+	first_row = (struct xrow_header *)region_alloc(&fiber()->gc,
+						       row_capacity *
+						       sizeof(struct xrow_header));
+	if (first_row == NULL) {
+		diag_set(OutOfMemory, sizeof(struct xrow_header) * row_capacity,
+			 "region", "struct xrow_header");
+		goto error;
+	}
+	row = first_row;
+
+	do {
+		if (row == first_row + row_capacity) {
+			/* Realloc rows array. */
+			row = (struct xrow_header *)region_alloc(&fiber()->gc,
+				row_capacity *
+				sizeof(struct xrow_header) << 1);
+			if (row == NULL) {
+				diag_set(OutOfMemory,
+					 sizeof(struct xrow_header) *
+					 row_capacity << 1,
+					 "region", "struct xrow_header");
+				goto error;
+			}
+			memcpy(row, first_row, row_capacity *
+			       sizeof(struct xrow_header) << 1);
+			first_row = row;
+			row = first_row + row_capacity;
+			row_capacity <<= 1;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		/*
+		 * Unfortunately we do not have C-version of coio read xrow
+		 * functions yet so use try-catch guard as workaround.
+		 */
+		try {
+			/*
+			 * Tarantool < 1.7.7 does not send periodic heartbeat
+			 * messages so we can't assume that if we haven't heard
+			 * from the master for quite a while the connection is
+			 * broken - the master might just be idle.
+			 */
+			if (applier->version_id < version_id(1, 7, 7))
+				coio_read_xrow(coio, ibuf, row);
+			else
+				coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (row == first_row) {
+			/*
+			 * First row in a transaction. In order to enforce
+			 * consistency check that first row lsn and replica id
+			 * match with transaction.
+			 */
+			tsn = row->tsn;
+			if (row->lsn != tsn) {
+				/* There is not a first row in the transactions. */
+				diag_set(ClientError, ER_PROTOCOL,
+					 "Not a first row in a transaction");
+				goto error;
+			}
+		}
+		if (tsn != row->tsn) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replications",
+				 "interleaving transactions");
+			goto error;
+		}
+
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL) {
+			/* Save row bodies to gc region. */
+			void *new_base = region_alloc(&fiber()->gc,
+						      row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "slab", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			/* Adjust row body pointers. */
+			row->body->iov_base = new_base;
+		}
+
+	} while (row->is_commit == 0 && ++row);
+
+	*rows = first_row;
+	return row - first_row + 1;
+error:
+	return -1;
+}
+
+static int
+applier_apply_tx(struct xrow_header *first_row, struct xrow_header *last_row)
+{
+	int res = 0;
+	struct txn *txn = NULL;
+	struct xrow_header *row = first_row;
+	if (first_row != last_row)
+		txn = txn_begin(false);
+	while (row <= last_row && res == 0) {
+		res = apply_row(row);
+		struct error *e;
+		if (res != 0 &&
+		    (e = diag_last_error(diag_get()))->type ==
+		    &type_ClientError &&
+		    box_error_code(e) == ER_TUPLE_FOUND &&
+		    replication_skip_conflict) {
+			/*
+			 * In case of ER_TUPLE_FOUND error and enabled
+			 * replication_skip_conflict configuration
+			 * option, skip applying the foreign row and
+			 * replace it with NOP in the local write ahead
+			 * log.
+			 */
+			diag_clear(diag_get());
+			(row)->type = IPROTO_NOP;
+			(row)->bodycnt = 0;
+			res = apply_row(row);
+		}
+		++row;
+	}
+	if (res == 0 && txn != NULL)
+		res = txn_commit(txn);
+	if (res != 0)
+		txn_rollback();
+	return res;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -509,36 +685,14 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
+		struct xrow_header *tx_rows;
+		int row_count = applier_read_tx(applier, &tx_rows);
+		if (row_count < 0)
+			diag_raise();
 
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row); /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
-
-		applier->lag = ev_now(loop()) - row.tm;
+		applier->lag = ev_now(loop()) - (tx_rows + row_count - 1)->tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(tx_rows->replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
 		/*
@@ -548,33 +702,12 @@ applier_subscribe(struct applier *applier)
 		 * that belong to the same server id.
 		 */
 		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = apply_row(&row);
-			if (res != 0) {
-				struct error *e = diag_last_error(diag_get());
-				/*
-				 * In case of ER_TUPLE_FOUND error and enabled
-				 * replication_skip_conflict configuration
-				 * option, skip applying the foreign row and
-				 * replace it with NOP in the local write ahead
-				 * log.
-				 */
-				if (e->type == &type_ClientError &&
-				    box_error_code(e) == ER_TUPLE_FOUND &&
-				    replication_skip_conflict) {
-					diag_clear(diag_get());
-					struct xrow_header nop;
-					nop.type = IPROTO_NOP;
-					nop.bodycnt = 0;
-					nop.replica_id = row.replica_id;
-					nop.lsn = row.lsn;
-					res = apply_row(&nop);
-				}
-			}
-			if (res != 0) {
-				latch_unlock(latch);
-				diag_raise();
-			}
+		if (vclock_get(&replicaset.vclock,
+			       tx_rows->replica_id) < tx_rows->lsn &&
+		    applier_apply_tx(tx_rows, tx_rows + row_count - 1) != 0) {
+			latch_unlock(latch);
+			fiber_gc();
+			diag_raise();
 		}
 		latch_unlock(latch);
 
diff --git a/src/box/txn.c b/src/box/txn.c
index 7900fb3ab..f6bf72d0c 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -34,6 +34,7 @@
 #include "journal.h"
 #include <fiber.h>
 #include "xrow.h"
+#include "replication.h"
 
 double too_long_threshold;
 
@@ -141,6 +142,7 @@ txn_begin(bool is_autocommit)
 	/* Initialize members explicitly to save time on memset() */
 	stailq_create(&txn->stmts);
 	txn->n_rows = 0;
+	txn->n_remote_rows = 0;
 	txn->is_autocommit = is_autocommit;
 	txn->has_triggers = false;
 	txn->is_aborted = false;
@@ -233,6 +235,9 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
 		if (txn_add_redo(stmt, request) != 0)
 			goto fail;
+		if (stmt->row->replica_id != 0 &&
+		    stmt->row->replica_id != instance_id)
+			++txn->n_remote_rows;
 		++txn->n_rows;
 	}
 	/*
@@ -271,14 +276,20 @@ txn_write_to_wal(struct txn *txn)
 		return -1;
 
 	struct txn_stmt *stmt;
-	struct xrow_header **row = req->rows;
+	struct xrow_header **remote_row = req->rows;
+	struct xrow_header **local_row = req->rows + txn->n_remote_rows;
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->row == NULL)
 			continue; /* A read (e.g. select) request */
-		*row++ = stmt->row;
+		if (stmt->row->replica_id != 0 &&
+		    stmt->row->replica_id != instance_id)
+			*remote_row++ = stmt->row;
+		else
+			*local_row++ = stmt->row;
 		req->approx_len += xrow_approx_len(stmt->row);
 	}
-	assert(row == req->rows + req->n_rows);
+	assert(remote_row == req->rows + txn->n_remote_rows);
+	assert(local_row == req->rows + req->n_rows);
 
 	ev_tstamp start = ev_monotonic_now(loop());
 	int64_t res = journal_write(req);
@@ -399,8 +410,6 @@ txn_rollback()
 		txn_stmt_unref_tuples(stmt);
 
 	TRASH(txn);
-	/** Free volatile txn memory. */
-	fiber_gc();
 	fiber_set_txn(fiber(), NULL);
 }
 
@@ -480,6 +489,8 @@ box_txn_rollback()
 		return -1;
 	}
 	txn_rollback(); /* doesn't throw */
+	/** Free volatile txn memory. */
+	fiber_gc();
 	return 0;
 }
 
diff --git a/src/box/txn.h b/src/box/txn.h
index de5cb0de4..2791fdf73 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -142,6 +142,10 @@ struct txn {
 	struct stailq stmts;
 	/** Total number of WAL rows in this txn. */
 	int n_rows;
+	/**
+	 * Count of rows generated on a remote replica.
+	 */
+	int n_remote_rows;
 	/**
 	 * True if this transaction is running in autocommit mode
 	 * (statement end causes an automatic transaction commit).
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..009f84430
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,240 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- set conflict to third transaction
+box.space.test:delete({3})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+---
+- false
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+  - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+  - [6, 'r']
+...
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check last transaction applied without conflicting row
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- vclock should be increased but rows skipped
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+-- check restart does not change something
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..47003c644
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- set conflict to third transaction
+box.space.test:delete({3})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check last transaction applied without conflicting row
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- vclock should be increased but rows skipped
+box.space.test:select()
+
+-- check restart does not change something
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
-- 
2.21.0
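
For readers who want the two ideas of this patch in isolation, here is a minimal standalone sketch in plain C. It is not the Tarantool code: struct row, read_tx() and order_rows() are hypothetical names invented for illustration, malloc() stands in for the fiber gc region, and the input is an in-memory array rather than a network stream. read_tx() groups rows into one transaction: the first row must have lsn equal to tsn, every later row must carry the same tsn, and the row flagged is_commit ends the transaction. When the array is full it is doubled, and only the rows already filled are copied. order_rows() mirrors the ordering that txn_write_to_wal() builds, with rows from other replicas first and locally generated rows at the tail.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified row: only the fields the logic below needs. */
struct row {
	uint32_t replica_id;
	int64_t lsn;
	int64_t tsn;
	bool is_commit;
};

/*
 * Collect one transaction from stream[*pos..n) into a malloc'ed array.
 * Returns the row count, or -1 on a malformed stream or out of memory.
 * On success the caller owns *out and must free() it.
 */
int
read_tx(const struct row *stream, size_t n, size_t *pos, struct row **out)
{
	size_t cap = 2, cnt = 0;
	int64_t tsn = 0;
	struct row *rows = malloc(cap * sizeof(*rows));
	if (rows == NULL)
		return -1;
	while (*pos < n) {
		if (cnt == cap) {
			/* Double the array; copy only the cnt rows filled so far. */
			struct row *grown = malloc(2 * cap * sizeof(*grown));
			if (grown == NULL)
				goto error;
			memcpy(grown, rows, cnt * sizeof(*rows));
			free(rows);
			rows = grown;
			cap *= 2;
		}
		struct row *r = &rows[cnt];
		*r = stream[(*pos)++];
		if (cnt == 0) {
			/* The first row defines the transaction id. */
			tsn = r->tsn;
			if (r->lsn != tsn)
				goto error;
		} else if (r->tsn != tsn) {
			/* Interleaved transactions are rejected. */
			goto error;
		}
		cnt++;
		if (r->is_commit) {
			*out = rows;
			return (int)cnt;
		}
	}
	/* The stream ended before a commit row was seen. */
error:
	free(rows);
	return -1;
}

/*
 * Copy n rows into dst so that rows from other replicas come first and
 * locally generated rows (replica_id 0 or self_id) form the tail.
 * Returns the number of remote rows.
 */
size_t
order_rows(const struct row *src, size_t n, uint32_t self_id, struct row *dst)
{
	size_t n_remote = 0;
	for (size_t i = 0; i < n; i++) {
		if (src[i].replica_id != 0 && src[i].replica_id != self_id)
			n_remote++;
	}
	size_t remote = 0, local = n_remote;
	for (size_t i = 0; i < n; i++) {
		if (src[i].replica_id != 0 && src[i].replica_id != self_id)
			dst[remote++] = src[i];
		else
			dst[local++] = src[i];
	}
	return n_remote;
}
```

Calling read_tx() repeatedly with the same pos walks the stream one transaction at a time. A stream that ends before a commit row, or one that mixes tsn values, yields -1 rather than a partial transaction, which is the all-or-nothing behaviour the test above expects from the replica.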