From mboxrd@z Thu Jan 1 00:00:00 1970
From: Georgy Kirichenko
Subject: [tarantool-patches] [PATCH v3 4/5] Transaction support for applier
Date: Fri, 22 Mar 2019 15:06:09 +0300
Message-Id: <2d06385bd5d62551ce1132d3d5c937d417ea9e66.1553255718.git.georgy@tarantool.org>
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Sender: tarantool-patches-bounce@freelists.org
Errors-to: tarantool-patches-bounce@freelists.org
Reply-To: tarantool-patches@freelists.org
List-Id: tarantool-patches
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko

The applier now fetches incoming rows to form a complete transaction
and then applies it as a whole. Rows are fetched and stored on the
fiber gc region until the last transaction row, the one with the
is_commit flag set, is received. A fetched multi-row transaction is
then applied within txn_begin/txn_commit/txn_rollback boundaries.
A single-row transaction can not yet be applied within such
boundaries because DDL does not support non-autocommit transactions.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc                    | 218 +++++++++++++++++------
 test/replication/transaction.result  | 242 ++++++++++++++++++++++++++
 test/replication/transaction.test.lua |  86 +++++++++
 3 files changed, 491 insertions(+), 55 deletions(-)
 create mode 100644 test/replication/transaction.result
 create mode 100644 test/replication/transaction.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 94c07aac7..08ad4a6a8 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -426,6 +426,158 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Helper struct to bind rows in a list.
+ */
+struct applier_tx_row {
+	/* Next transaction row. */
+	struct stailq_entry next;
+	/* xrow_header struct for the current transaction row. */
+	struct xrow_header row;
+};
+
+static struct applier_tx_row *
+applier_read_tx_row(struct applier *applier)
+{
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+
+	struct applier_tx_row *tx_row = (struct applier_tx_row *)
+		region_alloc(&fiber()->gc, sizeof(struct applier_tx_row));
+
+	if (tx_row == NULL)
+		tnt_raise(OutOfMemory, sizeof(struct applier_tx_row),
+			  "region", "struct applier_tx_row");
+
+	struct xrow_header *row = &tx_row->row;
+
+	double timeout = replication_disconnect_timeout();
+	/*
+	 * Tarantool < 1.7.7 does not send periodic heartbeat
+	 * messages so we can't assume that if we haven't heard
+	 * from the master for quite a while the connection is
+	 * broken - the master might just be idle.
+	 */
+	if (applier->version_id < version_id(1, 7, 7))
+		coio_read_xrow(coio, ibuf, row);
+	else
+		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+
+	applier->lag = ev_now(loop()) - row->tm;
+	applier->last_row_time = ev_monotonic_now(loop());
+	return tx_row;
+}
+
+/**
+ * Read one transaction from the network using the applier's input
+ * buffer. Transaction rows are placed onto the fiber gc region.
+ * We can not keep them in the applier input buffer because rpos is
+ * adjusted after each xrow decoding and the corresponding network
+ * input space is going to be reused.
+ */
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows)
+{
+	int64_t tsn = 0;
+
+	stailq_create(rows);
+	do {
+		struct applier_tx_row *tx_row = applier_read_tx_row(applier);
+		struct xrow_header *row = &tx_row->row;
+
+		if (iproto_type_is_error(row->type))
+			xrow_decode_error_xc(row);
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
+				  int2str(row->replica_id),
+				  tt_uuid_str(&REPLICASET_UUID));
+		}
+		if (tsn == 0) {
+			/*
+			 * The transaction id must be derived from the
+			 * log sequence number of the first row in the
+			 * transaction.
+			 */
+			tsn = row->tsn;
+			if (row->lsn != tsn)
+				tnt_raise(ClientError, ER_PROTOCOL,
+					  "Transaction id must be derived from "
+					  "the lsn of the first row in the "
+					  "transaction.");
+		}
+		if (tsn != row->tsn)
+			tnt_raise(ClientError, ER_UNSUPPORTED,
+				  "replication",
+				  "interleaving transactions");
+
+		assert(row->bodycnt <= 1);
+		if (row->bodycnt == 1 && !row->is_commit) {
+			/* Save the row body to the gc region. */
+			void *new_base = region_alloc(&fiber()->gc,
+						      row->body->iov_len);
+			if (new_base == NULL)
+				tnt_raise(OutOfMemory, row->body->iov_len,
+					  "region", "xrow body");
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			/* Adjust the row body pointer. */
+			row->body->iov_base = new_base;
+		}
+		stailq_add_tail(rows, &tx_row->next);
+
+	} while (!stailq_last_entry(rows, struct applier_tx_row,
+				    next)->row.is_commit);
+}
+
+/**
+ * Apply all rows in the rows queue as a single transaction.
+ *
+ * Return 0 for success or -1 in case of an error.
+ */
+static int
+applier_apply_tx(struct stailq *rows)
+{
+	int res = 0;
+	struct txn *txn = txn_begin(false);
+	struct applier_tx_row *item;
+	if (txn == NULL)
+		diag_raise();
+	stailq_foreach_entry(item, rows, next) {
+		struct xrow_header *row = &item->row;
+		res = apply_row(row);
+		if (res != 0) {
+			struct error *e = diag_last_error(diag_get());
+			/*
+			 * In case of ER_TUPLE_FOUND error and enabled
+			 * replication_skip_conflict configuration
+			 * option, skip applying the foreign row and
+			 * replace it with NOP in the local write ahead
+			 * log.
+			 */
+			if (e->type == &type_ClientError &&
+			    box_error_code(e) == ER_TUPLE_FOUND &&
+			    replication_skip_conflict) {
+				diag_clear(diag_get());
+				row->type = IPROTO_NOP;
+				row->bodycnt = 0;
+				res = apply_row(row);
+			}
+		}
+		if (res != 0)
+			break;
+	}
+	if (res == 0)
+		res = txn_commit(txn);
+	else
+		txn_rollback();
+	return res;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
 */
@@ -555,36 +707,14 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
-
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row); /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct stailq rows;
+		applier_read_tx(applier, &rows);
 
-		applier->lag = ev_now(loop()) - row.tm;
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
 		/*
@@ -594,33 +724,11 @@ applier_subscribe(struct applier *applier)
 		 * that belong to the same server id.
 		 */
 		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = apply_row(&row);
-			if (res != 0) {
-				struct error *e = diag_last_error(diag_get());
-				/*
-				 * In case of ER_TUPLE_FOUND error and enabled
-				 * replication_skip_conflict configuration
-				 * option, skip applying the foreign row and
-				 * replace it with NOP in the local write ahead
-				 * log.
-				 */
-				if (e->type == &type_ClientError &&
-				    box_error_code(e) == ER_TUPLE_FOUND &&
-				    replication_skip_conflict) {
-					diag_clear(diag_get());
-					struct xrow_header nop;
-					nop.type = IPROTO_NOP;
-					nop.bodycnt = 0;
-					nop.replica_id = row.replica_id;
-					nop.lsn = row.lsn;
-					res = apply_row(&nop);
-				}
-			}
-			if (res != 0) {
-				latch_unlock(latch);
-				diag_raise();
-			}
+		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
+		    first_row->lsn &&
+		    applier_apply_tx(&rows) != 0) {
+			latch_unlock(latch);
+			diag_raise();
 		}
 		latch_unlock(latch);
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..8c2ac6ee4
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,242 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with a conflicting second row
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- make the third transaction conflict
+_ = box.space.test:delete({4})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- replication is stopped by the third transaction
+v1[1] + 2 == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'm']
+  - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check that restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'm']
+  - [6, 'r']
+...
+-- enable skipping of conflicting rows and check that non-conflicting rows were applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check that the last transaction was applied without the conflicting row
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'm']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- insert some new conflicting rows with skip-conflict enabled
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- the vclock should be advanced but the rows skipped
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'm']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+-- check that restart does not change anything
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'm']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..f25a4737d
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with a conflicting second row
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- make the third transaction conflict
+_ = box.space.test:delete({4})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- replication is stopped by the third transaction
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check that restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- enable skipping of conflicting rows and check that non-conflicting rows were applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check that the last transaction was applied without the conflicting row
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- insert some new conflicting rows with skip-conflict enabled
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- the vclock should be advanced but the rows skipped
+box.space.test:select()
+
+-- check that restart does not change anything
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
-- 
2.21.0
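
P.S. For readers following the test above, here is the operator-level recovery
sequence it exercises, extracted into a minimal Lua sketch. This is not part of
the patch: it assumes a replica on which box.cfg{} was already called with a
valid replication source and whose applier has stopped on a duplicate-key
conflict. With replication_skip_conflict enabled, applier_apply_tx() replaces
the conflicting row with an IPROTO_NOP instead of stopping the applier.

-- Minimal sketch (assumes an already configured replica whose
-- upstream.status is 'stopped' due to ER_TUPLE_FOUND).
local replication = box.cfg.replication      -- remember the upstream list
box.cfg{replication = {}}                    -- detach from the master
box.cfg{replication_skip_conflict = true}    -- conflicting rows become NOPs
box.cfg{replication = replication}           -- re-attach; the applier resumes
box.info.replication[1].upstream.status      -- expected: 'follow'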