From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp16.mail.ru (smtp16.mail.ru [94.100.176.153]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 2CDAE469710 for ; Mon, 18 May 2020 15:24:26 +0300 (MSK) From: Serge Petrenko Date: Mon, 18 May 2020 15:24:05 +0300 Message-Id: <5428970e5b1574f456870fd7dd69ef67b46383c5.1589804001.git.sergepetrenko@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: v.shpilevoy@tarantool.org, gorcunov@gmail.com Cc: tarantool-patches@dev.tarantool.org Make relay accumulate all the tx rows coming from recovery into a linked list and send them as a batch. This enables relay to make transaction-wide manipulations before sending rows to the replica. Add one such manipulation right away: when a transaction ends with a local row, the local row has the is_commit flag set to true. However, local rows are not replicated, so the replica never gets an is_commit marker and yields an error `ER_UNSUPPORTED: replication does not support interleaving transactions`, breaking replication. Now relay assigns is_commit = true to the last global row it's about to send. 
Closes #4928 --- src/box/relay.cc | 98 ++++++++++--- test/replication/gh-4928-tx-boundaries.result | 132 ++++++++++++++++++ .../gh-4928-tx-boundaries.test.lua | 61 ++++++++ test/replication/suite.cfg | 1 + 4 files changed, 275 insertions(+), 17 deletions(-) create mode 100644 test/replication/gh-4928-tx-boundaries.result create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua diff --git a/src/box/relay.cc b/src/box/relay.cc index 2ad02cb8a..17b7bc667 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -177,7 +177,7 @@ relay_send(struct relay *relay, struct xrow_header *packet); static void relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row); static void -relay_send_row(struct xstream *stream, struct xrow_header *row); +relay_collect_tx(struct xstream *stream, struct xrow_header *row); struct relay * relay_new(struct replica *replica) @@ -355,7 +355,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, if (relay == NULL) diag_raise(); - relay_start(relay, fd, sync, relay_send_row); + relay_start(relay, fd, sync, relay_collect_tx); auto relay_guard = make_scoped_guard([=] { relay_stop(relay); relay_delete(relay); @@ -705,7 +705,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, diag_raise(); } - relay_start(relay, fd, sync, relay_send_row); + relay_start(relay, fd, sync, relay_collect_tx); auto relay_guard = make_scoped_guard([=] { relay_stop(relay); replica_on_relay_stop(replica); @@ -754,11 +754,39 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row) relay_send(relay, row); } -/** Send a single row to the client.
*/ +struct relay_tx_row { + struct rlist link; + struct xrow_header row; +}; + static void -relay_send_row(struct xstream *stream, struct xrow_header *packet) +relay_send_tx(struct relay *relay, struct rlist *tx_rows) +{ + ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY); + + struct relay_tx_row *tx_row; + rlist_foreach_entry(tx_row, tx_rows, link) { + tx_row->row.sync = relay->sync; + coio_write_xrow(&relay->io, &tx_row->row); + } + relay->last_row_time = ev_monotonic_now(loop()); + + struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); + if (inj != NULL && inj->dparam > 0) + fiber_sleep(inj->dparam); +} + +/** + * Collect all the rows belonging to a single transaction and + * send them at once. The pending-row list is thread_local. + */ +static void +relay_collect_tx(struct xstream *stream, struct xrow_header *packet) { struct relay *relay = container_of(stream, struct relay, stream); + static thread_local RLIST_HEAD(tx_rows); + struct errinj *inj; + struct relay_tx_row *tx_row; assert(iproto_type_is_dml(packet->type)); if (packet->group_id == GROUP_LOCAL) { /* @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) * order to correctly promote the vclock on the * replica. */ - if (packet->replica_id == REPLICA_ID_NIL) + if (packet->replica_id == REPLICA_ID_NIL) { + /* + * Make sure is_commit flag from the + * local row makes it to the replica, + * in case the transaction is not fully + * local. + */ + if (packet->is_commit && !rlist_empty(&tx_rows)) { + rlist_last_entry(&tx_rows, struct relay_tx_row, + link)->row.is_commit = true; + goto write; + } return; + } packet->type = IPROTO_NOP; packet->group_id = GROUP_DEFAULT; packet->bodycnt = 0; @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) * it). In the latter case packet's LSN is less than or equal to * local master's LSN at the moment it received 'SUBSCRIBE' request.
*/ - if (relay->replica == NULL || - packet->replica_id != relay->replica->id || - packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe, - packet->replica_id)) { - struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN, - ERRINJ_INT); - if (inj != NULL && packet->lsn == inj->iparam) { - packet->lsn = inj->iparam - 1; - say_warn("injected broken lsn: %lld", - (long long) packet->lsn); - } + if (relay->replica != NULL && packet->replica_id == relay->replica->id && + packet->lsn > vclock_get(&relay->local_vclock_at_subscribe, + packet->replica_id)) { + return; + } + + inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT); + if (inj != NULL && packet->lsn == inj->iparam) { + packet->lsn = inj->iparam - 1; + say_warn("injected broken lsn: %lld", + (long long) packet->lsn); + } + + /* A short path for single-statement transactions. */ + if (packet->is_commit && rlist_empty(&tx_rows)) { relay_send(relay, packet); + return; + } + + tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc, + sizeof(*tx_row)); + if (tx_row == NULL) { + tnt_raise(OutOfMemory, sizeof(*tx_row), "region", + "struct relay_tx_row"); + } + tx_row->row = *packet; + rlist_add_tail_entry(&tx_rows, tx_row, link); + + if (packet->is_commit) { +write: relay_send_tx(relay, &tx_rows); + rlist_create(&tx_rows); + /* + * Free all the relay_tx_rows allocated on the + * fiber region. + */ + fiber_gc(); } } diff --git a/test/replication/gh-4928-tx-boundaries.result b/test/replication/gh-4928-tx-boundaries.result new file mode 100644 index 000000000..89b569e65 --- /dev/null +++ b/test/replication/gh-4928-tx-boundaries.result @@ -0,0 +1,132 @@ +-- test-run result file version 2 +-- gh-4928. Test that transactions mixing local and global +-- space operations are replicated correctly. +env = require('test_run') + | --- + | ... +test_run = env.new() + | --- + | ... +bit = require('bit') + | --- + | ... + +-- Init. +box.schema.user.grant('guest', 'replication') + | --- + | ... 
+box.schema.space.create('glob') + | --- + | - engine: memtx + | before_replace: 'function: 0x010392fcb0' + | on_replace: 'function: 0x010392fc78' + | ck_constraint: [] + | field_count: 0 + | temporary: false + | index: [] + | is_local: false + | enabled: false + | name: glob + | id: 512 + | - created + | ... +box.schema.space.create('loc', {is_local=true}) + | --- + | - engine: memtx + | before_replace: 'function: 0x010383f3b0' + | on_replace: 'function: 0x01033dcad8' + | ck_constraint: [] + | field_count: 0 + | temporary: false + | index: [] + | is_local: true + | enabled: false + | name: loc + | id: 513 + | - created + | ... +box.space.glob:create_index('pk') + | --- + | - unique: true + | parts: + | - type: unsigned + | is_nullable: false + | fieldno: 1 + | id: 0 + | space_id: 512 + | type: TREE + | name: pk + | ... +box.space.loc:create_index('pk') + | --- + | - unique: true + | parts: + | - type: unsigned + | is_nullable: false + | fieldno: 1 + | id: 0 + | space_id: 513 + | type: TREE + | name: pk + | ... + +function gen_mixed_tx(i)\ + box.begin()\ + if bit.band(i, 1) ~= 0 then\ + box.space.glob:insert{10 * i + 1}\ + else\ + box.space.loc:insert{10 * i + 1}\ + end\ + if bit.band(i, 2) ~= 0 then\ + box.space.glob:insert{10 * i + 2}\ + else\ + box.space.loc:insert{10 * i + 2}\ + end\ + if bit.band(i, 4) ~= 0 then\ + box.space.glob:insert{10 * i + 3}\ + else\ + box.space.loc:insert{10 * i + 3}\ + end\ + box.commit()\ +end + | --- + | ... + +test_run:cmd("create server replica with rpl_master=default,\ + script='replication/replica.lua'") + | --- + | - true + | ... +test_run:cmd('start server replica') + | --- + | - true + | ... +test_run:wait_downstream(2, {status='follow'}) + | --- + | - true + | ... + +for i = 0, 7 do gen_mixed_tx(i) end + | --- + | ... + +box.info.replication[2].status + | --- + | - null + | ... + +vclock = box.info.vclock + | --- + | ... +vclock[0] = nil + | --- + | ... 
+test_run:wait_vclock("replica", vclock) + | + +test_run:cmd('switch replica') + | [Lost current connection] + +box.info.status + | [Lost current connection] +box.info.replication[1].upstream.status diff --git a/test/replication/gh-4928-tx-boundaries.test.lua b/test/replication/gh-4928-tx-boundaries.test.lua new file mode 100644 index 000000000..07599c27f --- /dev/null +++ b/test/replication/gh-4928-tx-boundaries.test.lua @@ -0,0 +1,61 @@ +-- gh-4928. Test that transactions mixing local and global +-- space operations are replicated correctly. +env = require('test_run') +test_run = env.new() +bit = require('bit') + +-- Init. +box.schema.user.grant('guest', 'replication') +_ = box.schema.space.create('glob') +_ = box.schema.space.create('loc', {is_local=true}) +_ = box.space.glob:create_index('pk') +_ = box.space.loc:create_index('pk') + +function gen_mixed_tx(i)\ + box.begin()\ + if bit.band(i, 1) ~= 0 then\ + box.space.glob:insert{10 * i + 1}\ + else\ + box.space.loc:insert{10 * i + 1}\ + end\ + if bit.band(i, 2) ~= 0 then\ + box.space.glob:insert{10 * i + 2}\ + else\ + box.space.loc:insert{10 * i + 2}\ + end\ + if bit.band(i, 4) ~= 0 then\ + box.space.glob:insert{10 * i + 3}\ + else\ + box.space.loc:insert{10 * i + 3}\ + end\ + box.commit()\ +end + +test_run:cmd("create server replica with rpl_master=default,\ + script='replication/replica.lua'") +test_run:cmd('start server replica') +test_run:wait_downstream(2, {status='follow'}) + +for i = 0, 7 do gen_mixed_tx(i) end + +box.info.replication[2].downstream.status + +vclock = box.info.vclock +vclock[0] = nil +test_run:wait_vclock("replica", vclock) + +test_run:cmd('switch replica') + +box.info.status +box.info.replication[1].upstream.status + +box.space.glob:select{} + +test_run:cmd('switch default') + +-- Cleanup.
+test_run:cmd('stop server replica') +test_run:cmd('delete server replica') +box.schema.user.revoke('guest', 'replication') +box.space.loc:drop() +box.space.glob:drop() diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index d2743b5ed..f357b07da 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -18,6 +18,7 @@ "gh-4606-admin-creds.test.lua": {}, "gh-4739-vclock-assert.test.lua": {}, "gh-4730-applier-rollback.test.lua": {}, + "gh-4928-tx-boundaries.test.lua": {}, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"} -- 2.24.2 (Apple Git-127)