From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp41.i.mail.ru (smtp41.i.mail.ru [94.100.177.101]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 52DA0469710 for ; Mon, 18 May 2020 15:28:47 +0300 (MSK) References: <5428970e5b1574f456870fd7dd69ef67b46383c5.1589804001.git.sergepetrenko@tarantool.org> From: Serge Petrenko Message-ID: <1b20e028-c6a8-a717-adfb-1737af36cf65@tarantool.org> Date: Mon, 18 May 2020 15:28:46 +0300 MIME-Version: 1.0 In-Reply-To: <5428970e5b1574f456870fd7dd69ef67b46383c5.1589804001.git.sergepetrenko@tarantool.org> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-GB Subject: Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: v.shpilevoy@tarantool.org, gorcunov@gmail.com Cc: tarantool-patches@dev.tarantool.org 18.05.2020 15:24, Serge Petrenko пишет: > Make relay accumulate all the tx rows coming from recovery into a linked > list and send them as a batch. > > This enables relay to make transaction-wide manipulations before sending > rows to the replica. > > Add one such manipulation right away: when a transaction ends with a > local row, the local row has the is_commit flag set to true. However, local > rows are not replicated, so the replica never gets an is_commit marker > and fails with the error `ER_UNSUPPORTED: replication does not support > interleaving transactions`, breaking replication. > > Now relay assigns is_commit = true to the last global row it's > about to send.
> > Closes #4928 > --- > src/box/relay.cc | 98 ++++++++++--- > test/replication/gh-4928-tx-boundaries.result | 132 ++++++++++++++++++ > .../gh-4928-tx-boundaries.test.lua | 61 ++++++++ > test/replication/suite.cfg | 1 + > 4 files changed, 275 insertions(+), 17 deletions(-) > create mode 100644 test/replication/gh-4928-tx-boundaries.result > create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua Sorry for the mess, the correct patch is below:  src/box/relay.cc                              |  98 ++++++++++---  test/replication/gh-4928-tx-boundaries.result | 138 ++++++++++++++++++  .../gh-4928-tx-boundaries.test.lua            |  61 ++++++++  test/replication/suite.cfg                    |   1 +  4 files changed, 281 insertions(+), 17 deletions(-)  create mode 100644 test/replication/gh-4928-tx-boundaries.result  create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua diff --git a/src/box/relay.cc b/src/box/relay.cc index 2ad02cb8a..17b7bc667 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -177,7 +177,7 @@ relay_send(struct relay *relay, struct xrow_header *packet);  static void  relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row);  static void -relay_send_row(struct xstream *stream, struct xrow_header *row); +relay_collect_tx(struct xstream *stream, struct xrow_header *row);  struct relay *  relay_new(struct replica *replica) @@ -355,7 +355,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,      if (relay == NULL)          diag_raise(); -    relay_start(relay, fd, sync, relay_send_row); +    relay_start(relay, fd, sync, relay_collect_tx);      auto relay_guard = make_scoped_guard([=] {          relay_stop(relay);          relay_delete(relay); @@ -705,7 +705,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,              diag_raise();      } -    relay_start(relay, fd, sync, relay_send_row); +    relay_start(relay, fd, sync, relay_collect_tx);      auto relay_guard = 
make_scoped_guard([=] {          relay_stop(relay);          replica_on_relay_stop(replica); @@ -754,11 +754,39 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)          relay_send(relay, row);  } -/** Send a single row to the client. */ +struct relay_tx_row { +    struct rlist link; +    struct xrow_header row; +}; +  static void -relay_send_row(struct xstream *stream, struct xrow_header *packet) +relay_send_tx(struct relay *relay, struct rlist *tx_rows) +{ +    ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY); + +    struct relay_tx_row *tx_row; +    rlist_foreach_entry(tx_row, tx_rows, link) { +        tx_row->row.sync = relay->sync; +        coio_write_xrow(&relay->io, &tx_row->row); +    } +    relay->last_row_time = ev_monotonic_now(loop()); + +    struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); +    if (inj != NULL && inj->dparam > 0) +        fiber_sleep(inj->dparam); +} + +/** + * Collect all the rows belonging to a single transaction and + * send them at once. + */ +static void +relay_collect_tx(struct xstream *stream, struct xrow_header *packet)  {      struct relay *relay = container_of(stream, struct relay, stream); +    static RLIST_HEAD(tx_rows); +    struct errinj *inj; +    struct relay_tx_row *tx_row;      assert(iproto_type_is_dml(packet->type));      if (packet->group_id == GROUP_LOCAL) {          /* @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)           * order to correctly promote the vclock on the           * replica.           */ -        if (packet->replica_id == REPLICA_ID_NIL) +        if (packet->replica_id == REPLICA_ID_NIL) { +            /* +             * Make sure is_commit flag from the +             * local row makes it to the replica, +             * in case the transaction is not fully +             * local. 
+             */ +            if (packet->is_commit && !rlist_empty(&tx_rows)) { +                rlist_last_entry(&tx_rows, struct relay_tx_row, +                         link)->row.is_commit = true; +                goto write; +            }              return; +        }          packet->type = IPROTO_NOP;          packet->group_id = GROUP_DEFAULT;          packet->bodycnt = 0; @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)       * it). In the latter case packet's LSN is less than or equal to       * local master's LSN at the moment it received 'SUBSCRIBE' request.       */ -    if (relay->replica == NULL || -        packet->replica_id != relay->replica->id || -        packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe, -                      packet->replica_id)) { -        struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN, -                        ERRINJ_INT); -        if (inj != NULL && packet->lsn == inj->iparam) { -            packet->lsn = inj->iparam - 1; -            say_warn("injected broken lsn: %lld", -                 (long long) packet->lsn); -        } +    if (relay->replica != NULL && packet->replica_id == relay->replica->id && +        packet->lsn > vclock_get(&relay->local_vclock_at_subscribe, +                     packet->replica_id)) { +        return; +    } + +    inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT); +    if (inj != NULL && packet->lsn == inj->iparam) { +        packet->lsn = inj->iparam - 1; +        say_warn("injected broken lsn: %lld", +             (long long) packet->lsn); +    } + +    /* A short path for single-statement transactions. 
*/ +    if (packet->is_commit && rlist_empty(&tx_rows)) {          relay_send(relay, packet); +        return; +    } + +    tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc, +                             sizeof(*tx_row)); +    if (tx_row == NULL) { +        tnt_raise(OutOfMemory, sizeof(*tx_row), "region", +              "struct relay_tx_row"); +    } +    tx_row->row = *packet; +    rlist_add_tail_entry(&tx_rows, tx_row, link); + +    if (packet->is_commit) { +write:        relay_send_tx(relay, &tx_rows); +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows); +        /* +         * Free all the relay_tx_rows allocated on the +         * fiber region. +         */ +        fiber_gc();      }  } diff --git a/test/replication/gh-4928-tx-boundaries.result b/test/replication/gh-4928-tx-boundaries.result new file mode 100644 index 000000000..969bd8438 --- /dev/null +++ b/test/replication/gh-4928-tx-boundaries.result @@ -0,0 +1,138 @@ +-- test-run result file version 2 +-- gh-4928. Test that transactions mixing local and global +-- space operations are replicated correctly. +env = require('test_run') + | --- + | ... +test_run = env.new() + | --- + | ... +bit = require('bit') + | --- + | ... + +-- Init. +box.schema.user.grant('guest', 'replication') + | --- + | ... +_ = box.schema.space.create('glob') + | --- + | ... +_ = box.schema.space.create('loc', {is_local=true}) + | --- + | ... +_ = box.space.glob:create_index('pk') + | --- + | ... +_ = box.space.loc:create_index('pk') + | --- + | ... 
+ +function gen_mixed_tx(i)\ +    box.begin()\ +    if bit.band(i, 1) ~= 0 then\ +        box.space.glob:insert{10 * i + 1}\ +    else\ +        box.space.loc:insert{10 * i + 1}\ +    end\ +    if bit.band(i, 2) ~= 0 then\ +        box.space.glob:insert{10 * i + 2}\ +    else\ +        box.space.loc:insert{10 * i + 2}\ +    end\ +    if bit.band(i, 4) ~= 0 then\ +        box.space.glob:insert{10 * i + 3}\ +    else\ +        box.space.loc:insert{10 * i + 3}\ +    end\ +    box.commit()\ +end + | --- + | ... + +test_run:cmd("create server replica with rpl_master=default,\ +             script='replication/replica.lua'") + | --- + | - true + | ... +test_run:cmd('start server replica') + | --- + | - true + | ... +test_run:wait_downstream(2, {status='follow'}) + | --- + | - true + | ... + +for i = 0, 7 do gen_mixed_tx(i) end + | --- + | ... + +box.info.replication[2].status + | --- + | - null + | ... + +vclock = box.info.vclock + | --- + | ... +vclock[0] = nil + | --- + | ... +test_run:wait_vclock("replica", vclock) + | --- + | ... + +test_run:cmd('switch replica') + | --- + | - true + | ... + +box.info.status + | --- + | - running + | ... +box.info.replication[1].upstream.status + | --- + | - follow + | ... + +box.space.glob:select{} + | --- + | - - [11] + |   - [22] + |   - [31] + |   - [32] + |   - [43] + |   - [51] + |   - [53] + |   - [62] + |   - [63] + |   - [71] + |   - [72] + |   - [73] + | ... + +test_run:cmd('switch default') + | --- + | - true + | ... + +-- Cleanup. +test_run:cmd('stop server replica') + | --- + | - true + | ... +test_run:cmd('delete server replica') + | --- + | - true + | ... +box.schema.user.revoke('guest', 'replication') + | --- + | ... +box.space.loc:drop() + | --- + | ... +box.space.glob:drop() + | --- + | ... 
diff --git a/test/replication/gh-4928-tx-boundaries.test.lua b/test/replication/gh-4928-tx-boundaries.test.lua new file mode 100644 index 000000000..92526fc51 --- /dev/null +++ b/test/replication/gh-4928-tx-boundaries.test.lua @@ -0,0 +1,61 @@ +-- gh-4928. Test that transactions mixing local and global +-- space operations are replicated correctly. +env = require('test_run') +test_run = env.new() +bit = require('bit') + +-- Init. +box.schema.user.grant('guest', 'replication') +_ = box.schema.space.create('glob') +_ = box.schema.space.create('loc', {is_local=true}) +_ = box.space.glob:create_index('pk') +_ = box.space.loc:create_index('pk') + +function gen_mixed_tx(i)\ +    box.begin()\ +    if bit.band(i, 1) ~= 0 then\ +        box.space.glob:insert{10 * i + 1}\ +    else\ +        box.space.loc:insert{10 * i + 1}\ +    end\ +    if bit.band(i, 2) ~= 0 then\ +        box.space.glob:insert{10 * i + 2}\ +    else\ +        box.space.loc:insert{10 * i + 2}\ +    end\ +    if bit.band(i, 4) ~= 0 then\ +        box.space.glob:insert{10 * i + 3}\ +    else\ +        box.space.loc:insert{10 * i + 3}\ +    end\ +    box.commit()\ +end + +test_run:cmd("create server replica with rpl_master=default,\ +             script='replication/replica.lua'") +test_run:cmd('start server replica') +test_run:wait_downstream(2, {status='follow'}) + +for i = 0, 7 do gen_mixed_tx(i) end + +box.info.replication[2].status + +vclock = box.info.vclock +vclock[0] = nil +test_run:wait_vclock("replica", vclock) + +test_run:cmd('switch replica') + +box.info.status +box.info.replication[1].upstream.status + +box.space.glob:select{} + +test_run:cmd('switch default') + +-- Cleanup. 
+test_run:cmd('stop server replica') +test_run:cmd('delete server replica') +box.schema.user.revoke('guest', 'replication') +box.space.loc:drop() +box.space.glob:drop() diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index d2743b5ed..f357b07da 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -18,6 +18,7 @@      "gh-4606-admin-creds.test.lua": {},      "gh-4739-vclock-assert.test.lua": {},      "gh-4730-applier-rollback.test.lua": {}, +    "gh-4928-tx-boundaries.test.lua": {},      "*": {          "memtx": {"engine": "memtx"},          "vinyl": {"engine": "vinyl"} -- 2.24.2 (Apple Git-127) -- Serge Petrenko