* [Tarantool-patches] [PATCH 0/2] fix replication tx boundaries after local space rework @ 2020-05-18 12:24 Serge Petrenko 2020-05-18 12:24 ` [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries Serge Petrenko 2020-05-18 12:24 ` [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches Serge Petrenko 0 siblings, 2 replies; 11+ messages in thread From: Serge Petrenko @ 2020-05-18 12:24 UTC (permalink / raw) To: v.shpilevoy, gorcunov; +Cc: tarantool-patches https://github.com/tarantool/tarantool/issues/4928 https://github.com/tarantool/tarantool/tree/sp/gh-4928-tx-boundary-fix The patchset fixes two replication errors that show up when transactions mixing local and global space requests are replicated. The first patch fixes the case when a local row is the first row of a transaction, and the second patch fixes the case when a local row is the last one. Serge Petrenko (2): wal: fix tx boundaries replication: make relay send txs in batches src/box/relay.cc | 98 ++++++++++--- src/box/wal.c | 26 +++- test/replication/gh-4928-tx-boundaries.result | 132 ++++++++++++++++++ .../gh-4928-tx-boundaries.test.lua | 61 ++++++++ test/replication/suite.cfg | 1 + 5 files changed, 297 insertions(+), 21 deletions(-) create mode 100644 test/replication/gh-4928-tx-boundaries.result create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua -- 2.24.2 (Apple Git-127) ^ permalink raw reply [flat|nested] 11+ messages in thread
* [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries 2020-05-18 12:24 [Tarantool-patches] [PATCH 0/2] fix replication tx boundaries after local space rework Serge Petrenko @ 2020-05-18 12:24 ` Serge Petrenko 2020-05-19 9:08 ` Cyrill Gorcunov 2020-05-18 12:24 ` [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches Serge Petrenko 1 sibling, 1 reply; 11+ messages in thread From: Serge Petrenko @ 2020-05-18 12:24 UTC (permalink / raw) To: v.shpilevoy, gorcunov; +Cc: tarantool-patches In order to preserve transaction boundaries in replication protocol, wal assigns each tx row a transaction sequence number (tsn). Tsn is equal to the lsn of the first transaction row. Starting with commit 7eb4650eecf1ac382119d0038076c19b2708f4a1, local space requests are assigned a special replica id, 0, and have their own lsns. These operations are not replicated. If a transaction starting with a local space operation ends up in the WAL, it gets a tsn equal to the lsn of the local space request. Then, during replication, when such a transaction is replicated, the local space request is omitted, and replica receives a global part of the transaction with a seemingly random tsn, yielding an ER_PROTOCOL error: "Transaction id must be equal to LSN of the first row in the transaction". Assign tsn as equal to the lsn of the first global row in the transaction to fix the problem, and assign tsn as before for fully local transactions. Follow-up #4114 Part-of #4928 --- src/box/wal.c | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/src/box/wal.c b/src/box/wal.c index b979244e3..d36d4259e 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -956,24 +956,33 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, struct xrow_header **end) { int64_t tsn = 0; + struct xrow_header **start = row; + struct xrow_header **first_glob_row = end; /** Assign LSN to all local rows. */ for ( ; row < end; row++) { if ((*row)->replica_id == 0) { /* * All rows representing local space data - * manipulations are signed wth a zero + * manipulations are signed with a zero * instance id. This is also true for * anonymous replicas, since they are * only capable of writing to local and * temporary spaces. */ - if ((*row)->group_id != GROUP_LOCAL) + if ((*row)->group_id != GROUP_LOCAL) { (*row)->replica_id = instance_id; + } (*row)->lsn = vclock_inc(vclock_diff, (*row)->replica_id) + vclock_get(base, (*row)->replica_id); - /* Use lsn of the first local row as transaction id. */ - tsn = tsn == 0 ? (*row)->lsn : tsn; + /* + * Use lsn of the first global row as + * transaction id. + */ + if ((*row)->group_id != GROUP_LOCAL && tsn == 0) { + tsn = (*row)->lsn; + first_glob_row = row; + } (*row)->tsn = tsn; (*row)->is_commit = row == end - 1; } else { @@ -993,6 +1002,15 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, } } } + if (tsn == 0) + tsn = (*start)->lsn; + /* + * Fill transaction id for all the local rows preceding + * the first global row. tsn was yet unknown when those + * rows were processed. + */ + for (row = start; row < first_glob_row; row++) + (*row)->tsn = tsn; } static void -- 2.24.2 (Apple Git-127) ^ permalink raw reply [flat|nested] 11+ messages in thread
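The rule the patch establishes is easier to see in isolation. Below is a minimal standalone sketch, not the real wal.c code: struct row and the two ad-hoc lsn counters are simplified stand-ins for xrow_header and the vclock. It shows that a transaction mixing local and global rows gets the lsn of its first global row as its tsn (with the preceding local rows backfilled), while a fully local transaction falls back to the lsn of its first row.

/*
 * Simplified standalone model of the new tsn rule; struct row and
 * the two ad-hoc lsn counters stand in for xrow_header and the
 * vclock, they are not the real wal.c data structures.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

struct row {
	bool is_local;		/* GROUP_LOCAL in the real code */
	int64_t lsn;
	int64_t tsn;
};

static void
assign_tsn(struct row *rows, int n)
{
	int64_t tsn = 0;
	int first_glob = n;
	int64_t lsn_local = 100, lsn_global = 200;	/* arbitrary starts */

	for (int i = 0; i < n; i++) {
		rows[i].lsn = rows[i].is_local ? ++lsn_local : ++lsn_global;
		/* tsn is the lsn of the first *global* row. */
		if (!rows[i].is_local && tsn == 0) {
			tsn = rows[i].lsn;
			first_glob = i;
		}
		rows[i].tsn = tsn;
	}
	/* Fully local transaction: fall back to the first row's lsn. */
	if (tsn == 0)
		tsn = rows[0].lsn;
	/* Backfill the local rows preceding the first global row. */
	for (int i = 0; i < first_glob; i++)
		rows[i].tsn = tsn;
}

int
main(void)
{
	struct row mixed[3] = { {true}, {false}, {false} };
	struct row local[2] = { {true}, {true} };
	assign_tsn(mixed, 3);
	assign_tsn(local, 2);
	for (int i = 0; i < 3; i++)
		printf("mixed[%d] lsn=%lld tsn=%lld\n", i,
		       (long long)mixed[i].lsn, (long long)mixed[i].tsn);
	for (int i = 0; i < 2; i++)
		printf("local[%d] lsn=%lld tsn=%lld\n", i,
		       (long long)local[i].lsn, (long long)local[i].tsn);
	return 0;
}

Running it, every row of the mixed transaction ends up with tsn 201, the lsn of its first global row, which is exactly what a replica that never sees the local row expects to check against.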
* Re: [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries 2020-05-18 12:24 ` [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries Serge Petrenko @ 2020-05-19 9:08 ` Cyrill Gorcunov 2020-05-19 10:30 ` Serge Petrenko 0 siblings, 1 reply; 11+ messages in thread From: Cyrill Gorcunov @ 2020-05-19 9:08 UTC (permalink / raw) To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy On Mon, May 18, 2020 at 03:24:04PM +0300, Serge Petrenko wrote: ... > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -956,24 +956,33 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, > struct xrow_header **end) > { > int64_t tsn = 0; > + struct xrow_header **start = row; > + struct xrow_header **first_glob_row = end; > /** Assign LSN to all local rows. */ > for ( ; row < end; row++) { > if ((*row)->replica_id == 0) { > /* > * All rows representing local space data > - * manipulations are signed wth a zero > + * manipulations are signed with a zero > * instance id. This is also true for > * anonymous replicas, since they are > * only capable of writing to local and > * temporary spaces. > */ > - if ((*row)->group_id != GROUP_LOCAL) > + if ((*row)->group_id != GROUP_LOCAL) { > (*row)->replica_id = instance_id; > + } > > (*row)->lsn = vclock_inc(vclock_diff, (*row)->replica_id) + > vclock_get(base, (*row)->replica_id); > - /* Use lsn of the first local row as transaction id. */ > - tsn = tsn == 0 ? (*row)->lsn : tsn; > + /* > + * Use lsn of the first global row as > + * transaction id. > + */ > + if ((*row)->group_id != GROUP_LOCAL && tsn == 0) { > + tsn = (*row)->lsn; > + first_glob_row = row; > + } > (*row)->tsn = tsn; 1) ^^^ > (*row)->is_commit = row == end - 1; > } else { > @@ -993,6 +1002,15 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, > } > } > } > + if (tsn == 0) > + tsn = (*start)->lsn; > + /* > + * Fill transaction id for all the local rows preceding > + * the first global row. tsn was yet unknown when those > + * rows were processed. > + */ > + for (row = start; row < first_glob_row; row++) > + (*row)->tsn = tsn; > } Wait, the chunk above -- lets assume we've all rows in GROUP_LOCAL thus in (1) we already set this member to 0 and now we re-walk the entries again and assign lsn to the first row's lsn. This is ugly as hell, if only I didn't miss something obvious. Can't we do something like below to eliminate needless rewalk over all rows? Do I make a mistake? --- diff --git a/src/box/wal.c b/src/box/wal.c index d36d4259e..ef4d84920 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -957,7 +957,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, { int64_t tsn = 0; struct xrow_header **start = row; - struct xrow_header **first_glob_row = end; + struct xrow_header **first_glob_row = row; /** Assign LSN to all local rows. */ for ( ; row < end; row++) { if ((*row)->replica_id == 0) { @@ -981,9 +981,12 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, */ if ((*row)->group_id != GROUP_LOCAL && tsn == 0) { tsn = (*row)->lsn; + /* + * Remember the tail being processed. + */ first_glob_row = row; } - (*row)->tsn = tsn; + (*row)->tsn = tsn == 0 ? (*start)->lsn : tsn; (*row)->is_commit = row == end - 1; } else { int64_t diff = (*row)->lsn - vclock_get(base, (*row)->replica_id); @@ -1002,8 +1005,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, } } } - if (tsn == 0) - tsn = (*start)->lsn; + /* * Fill transaction id for all the local rows preceding * the first global row. 
tsn was yet unknown when those ^ permalink raw reply [flat|nested] 11+ messages in thread
* Re: [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries 2020-05-19 9:08 ` Cyrill Gorcunov @ 2020-05-19 10:30 ` Serge Petrenko 0 siblings, 0 replies; 11+ messages in thread From: Serge Petrenko @ 2020-05-19 10:30 UTC (permalink / raw) To: Cyrill Gorcunov; +Cc: tarantool-patches, v.shpilevoy 19.05.2020 12:08, Cyrill Gorcunov пишет: > On Mon, May 18, 2020 at 03:24:04PM +0300, Serge Petrenko wrote: > ... >> --- a/src/box/wal.c >> +++ b/src/box/wal.c >> @@ -956,24 +956,33 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, >> struct xrow_header **end) >> { >> int64_t tsn = 0; >> + struct xrow_header **start = row; >> + struct xrow_header **first_glob_row = end; >> /** Assign LSN to all local rows. */ >> for ( ; row < end; row++) { >> if ((*row)->replica_id == 0) { >> /* >> * All rows representing local space data >> - * manipulations are signed wth a zero >> + * manipulations are signed with a zero >> * instance id. This is also true for >> * anonymous replicas, since they are >> * only capable of writing to local and >> * temporary spaces. >> */ >> - if ((*row)->group_id != GROUP_LOCAL) >> + if ((*row)->group_id != GROUP_LOCAL) { >> (*row)->replica_id = instance_id; >> + } >> >> (*row)->lsn = vclock_inc(vclock_diff, (*row)->replica_id) + >> vclock_get(base, (*row)->replica_id); >> - /* Use lsn of the first local row as transaction id. */ >> - tsn = tsn == 0 ? (*row)->lsn : tsn; >> + /* >> + * Use lsn of the first global row as >> + * transaction id. >> + */ >> + if ((*row)->group_id != GROUP_LOCAL && tsn == 0) { >> + tsn = (*row)->lsn; >> + first_glob_row = row; >> + } >> (*row)->tsn = tsn; > 1) ^^^ > >> (*row)->is_commit = row == end - 1; >> } else { >> @@ -993,6 +1002,15 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, >> } >> } >> } >> + if (tsn == 0) >> + tsn = (*start)->lsn; >> + /* >> + * Fill transaction id for all the local rows preceding >> + * the first global row. tsn was yet unknown when those >> + * rows were processed. >> + */ >> + for (row = start; row < first_glob_row; row++) >> + (*row)->tsn = tsn; >> } > Wait, the chunk above -- lets assume we've all rows in GROUP_LOCAL thus > in (1) we already set this member to 0 and now we re-walk the entries > again and assign lsn to the first row's lsn. This is ugly as hell, > if only I didn't miss something obvious. > > Can't we do something like below to eliminate needless rewalk over > all rows? Do I make a mistake? > --- > diff --git a/src/box/wal.c b/src/box/wal.c > index d36d4259e..ef4d84920 100644 > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -957,7 +957,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, > { > int64_t tsn = 0; > struct xrow_header **start = row; > - struct xrow_header **first_glob_row = end; > + struct xrow_header **first_glob_row = row; > /** Assign LSN to all local rows. */ > for ( ; row < end; row++) { > if ((*row)->replica_id == 0) { > @@ -981,9 +981,12 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, > */ > if ((*row)->group_id != GROUP_LOCAL && tsn == 0) { > tsn = (*row)->lsn; > + /* > + * Remember the tail being processed. > + */ > first_glob_row = row; > } > - (*row)->tsn = tsn; > + (*row)->tsn = tsn == 0 ? 
(*start)->lsn : tsn; > (*row)->is_commit = row == end - 1; > } else { > int64_t diff = (*row)->lsn - vclock_get(base, (*row)->replica_id); > @@ -1002,8 +1005,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, > } > } > } > - if (tsn == 0) > - tsn = (*start)->lsn; > + > /* > * Fill transaction id for all the local rows preceding > * the first global row. tsn was yet unknown when those Hi! Thanks for the review! You're right, I applied your diff. -- Serge Petrenko ^ permalink raw reply [flat|nested] 11+ messages in thread
* [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches 2020-05-18 12:24 [Tarantool-patches] [PATCH 0/2] fix replication tx boundaries after local space rework Serge Petrenko 2020-05-18 12:24 ` [Tarantool-patches] [PATCH 1/2] wal: fix tx boundaries Serge Petrenko @ 2020-05-18 12:24 ` Serge Petrenko 2020-05-18 12:28 ` Serge Petrenko 1 sibling, 1 reply; 11+ messages in thread From: Serge Petrenko @ 2020-05-18 12:24 UTC (permalink / raw) To: v.shpilevoy, gorcunov; +Cc: tarantool-patches Make relay accumulate all the tx rows coming from recovery into a linked list and send them as a batch. This enables relay to make transaction-wide manipulations before sending rows to replica. Add one such manipulation right away: when a transaction ends with a local row, the local row has is_commit flag set to true. However, local rows are not replicated, so the replica never gets an is_commit marker and yields an error `ER_UNSUPPORTED: replication does not support interleaving transactions`, breaking replication. Now relay is assigns is_commit = true to the last global row it's about to send. Closes #4928 --- src/box/relay.cc | 98 ++++++++++--- test/replication/gh-4928-tx-boundaries.result | 132 ++++++++++++++++++ .../gh-4928-tx-boundaries.test.lua | 61 ++++++++ test/replication/suite.cfg | 1 + 4 files changed, 275 insertions(+), 17 deletions(-) create mode 100644 test/replication/gh-4928-tx-boundaries.result create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua diff --git a/src/box/relay.cc b/src/box/relay.cc index 2ad02cb8a..17b7bc667 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -177,7 +177,7 @@ relay_send(struct relay *relay, struct xrow_header *packet); static void relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row); static void -relay_send_row(struct xstream *stream, struct xrow_header *row); +relay_collect_tx(struct xstream *stream, struct xrow_header *row); struct relay * relay_new(struct replica *replica) @@ -355,7 +355,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, if (relay == NULL) diag_raise(); - relay_start(relay, fd, sync, relay_send_row); + relay_start(relay, fd, sync, relay_collect_tx); auto relay_guard = make_scoped_guard([=] { relay_stop(relay); relay_delete(relay); @@ -705,7 +705,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, diag_raise(); } - relay_start(relay, fd, sync, relay_send_row); + relay_start(relay, fd, sync, relay_collect_tx); auto relay_guard = make_scoped_guard([=] { relay_stop(relay); replica_on_relay_stop(replica); @@ -754,11 +754,39 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row) relay_send(relay, row); } -/** Send a single row to the client. */ +struct relay_tx_row { + struct rlist link; + struct xrow_header row; +}; + static void -relay_send_row(struct xstream *stream, struct xrow_header *packet) +relay_send_tx(struct relay *relay, struct rlist *tx_rows) +{ + ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY); + + struct relay_tx_row *tx_row; + rlist_foreach_entry(tx_row, tx_rows, link) { + tx_row->row.sync = relay->sync; + coio_write_xrow(&relay->io, &tx_row->row); + } + relay->last_row_time = ev_monotonic_now(loop()); + + struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); + if (inj != NULL && inj->dparam > 0) + fiber_sleep(inj->dparam); +} + +/** + * Collect all the rows belonging to a single transaction and + * send them at once. 
+ */ +static void +relay_collect_tx(struct xstream *stream, struct xrow_header *packet) { struct relay *relay = container_of(stream, struct relay, stream); + static RLIST_HEAD(tx_rows); + struct errinj *inj; + struct relay_tx_row *tx_row; assert(iproto_type_is_dml(packet->type)); if (packet->group_id == GROUP_LOCAL) { /* @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) * order to correctly promote the vclock on the * replica. */ - if (packet->replica_id == REPLICA_ID_NIL) + if (packet->replica_id == REPLICA_ID_NIL) { + /* + * Make sure is_commit flag from the + * local row makes it to the replica, + * in case the transaction is not fully + * local. + */ + if (packet->is_commit && !rlist_empty(&tx_rows)) { + rlist_last_entry(&tx_rows, struct relay_tx_row, + link)->row.is_commit = true; + goto write; + } return; + } packet->type = IPROTO_NOP; packet->group_id = GROUP_DEFAULT; packet->bodycnt = 0; @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) * it). In the latter case packet's LSN is less than or equal to * local master's LSN at the moment it received 'SUBSCRIBE' request. */ - if (relay->replica == NULL || - packet->replica_id != relay->replica->id || - packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe, - packet->replica_id)) { - struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN, - ERRINJ_INT); - if (inj != NULL && packet->lsn == inj->iparam) { - packet->lsn = inj->iparam - 1; - say_warn("injected broken lsn: %lld", - (long long) packet->lsn); - } + if (relay->replica != NULL && packet->replica_id == relay->replica->id && + packet->lsn > vclock_get(&relay->local_vclock_at_subscribe, + packet->replica_id)) { + return; + } + + inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT); + if (inj != NULL && packet->lsn == inj->iparam) { + packet->lsn = inj->iparam - 1; + say_warn("injected broken lsn: %lld", + (long long) packet->lsn); + } + + /* A short path for single-statement transactions. */ + if (packet->is_commit && rlist_empty(&tx_rows)) { relay_send(relay, packet); + return; + } + + tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc, + sizeof(*tx_row)); + if (tx_row == NULL) { + tnt_raise(OutOfMemory, sizeof(*tx_row), "region", + "struct relay_tx_row"); + } + tx_row->row = *packet; + rlist_add_tail_entry(&tx_rows, tx_row, link); + + if (packet->is_commit) { +write: relay_send_tx(relay, &tx_rows); + tx_rows = RLIST_HEAD_INITIALIZER(tx_rows); + /* + * Free all the relay_tx_rows allocated on the + * fiber region. + */ + fiber_gc(); } } diff --git a/test/replication/gh-4928-tx-boundaries.result b/test/replication/gh-4928-tx-boundaries.result new file mode 100644 index 000000000..89b569e65 --- /dev/null +++ b/test/replication/gh-4928-tx-boundaries.result @@ -0,0 +1,132 @@ +-- test-run result file version 2 +-- gh-4928. Test that transactions mixing local and global +-- space operations are replicated correctly. +env = require('test_run') + | --- + | ... +test_run = env.new() + | --- + | ... +bit = require('bit') + | --- + | ... + +-- Init. +box.schema.user.grant('guest', 'replication') + | --- + | ... +box.schema.space.create('glob') + | --- + | - engine: memtx + | before_replace: 'function: 0x010392fcb0' + | on_replace: 'function: 0x010392fc78' + | ck_constraint: [] + | field_count: 0 + | temporary: false + | index: [] + | is_local: false + | enabled: false + | name: glob + | id: 512 + | - created + | ... 
+box.schema.space.create('loc', {is_local=true}) + | --- + | - engine: memtx + | before_replace: 'function: 0x010383f3b0' + | on_replace: 'function: 0x01033dcad8' + | ck_constraint: [] + | field_count: 0 + | temporary: false + | index: [] + | is_local: true + | enabled: false + | name: loc + | id: 513 + | - created + | ... +box.space.glob:create_index('pk') + | --- + | - unique: true + | parts: + | - type: unsigned + | is_nullable: false + | fieldno: 1 + | id: 0 + | space_id: 512 + | type: TREE + | name: pk + | ... +box.space.loc:create_index('pk') + | --- + | - unique: true + | parts: + | - type: unsigned + | is_nullable: false + | fieldno: 1 + | id: 0 + | space_id: 513 + | type: TREE + | name: pk + | ... + +function gen_mixed_tx(i)\ + box.begin()\ + if bit.band(i, 1) ~= 0 then\ + box.space.glob:insert{10 * i + 1}\ + else\ + box.space.loc:insert{10 * i + 1}\ + end\ + if bit.band(i, 2) ~= 0 then\ + box.space.glob:insert{10 * i + 2}\ + else\ + box.space.loc:insert{10 * i + 2}\ + end\ + if bit.band(i, 4) ~= 0 then\ + box.space.glob:insert{10 * i + 3}\ + else\ + box.space.loc:insert{10 * i + 3}\ + end\ + box.commit()\ +end + | --- + | ... + +test_run:cmd("create server replica with rpl_master=default,\ + script='replication/replica.lua'") + | --- + | - true + | ... +test_run:cmd('start server replica') + | --- + | - true + | ... +test_run:wait_downstream(2, {status='follow'}) + | --- + | - true + | ... + +for i = 0, 7 do gen_mixed_tx(i) end + | --- + | ... + +box.info.replication[2].status + | --- + | - null + | ... + +vclock = box.info.vclock + | --- + | ... +vclock[0] = nil + | --- + | ... +test_run:wait_vclock("replica", vclock) + | + +test_run:cmd('switch replica') + | [Lost current connection] + +box.info.status + | [Lost current connection] +box.info.replication[1].upstream.status diff --git a/test/replication/gh-4928-tx-boundaries.test.lua b/test/replication/gh-4928-tx-boundaries.test.lua new file mode 100644 index 000000000..07599c27f --- /dev/null +++ b/test/replication/gh-4928-tx-boundaries.test.lua @@ -0,0 +1,61 @@ +-- gh-4928. Test that transactions mixing local and global +-- space operations are replicated correctly. +env = require('test_run') +test_run = env.new() +bit = require('bit') + +-- Init. +box.schema.user.grant('guest', 'replication') +_ = box.schema.space.create('glob') +_ = box.schema.space.create('loc', {is_local=true}) +_ = box.space.glob:create_index('pk') +_ = box.space.loc:create_index('pk') + +function gen_mixed_tx(i)\ + box.begin()\ + if bit.band(i, 1) ~= 0 then\ + box.space.glob:insert{10 * i + 1}\ + else\ + box.space.loc:insert{10 * i + 1}\ + end\ + if bit.band(i, 2) ~= 0 then\ + box.space.glob:insert{10 * i + 2}\ + else\ + box.space.loc:insert{10 * i + 2}\ + end\ + if bit.band(i, 4) ~= 0 then\ + box.space.glob:insert{10 * i + 3}\ + else\ + box.space.loc:insert{10 * i + 3}\ + end\ + box.commit()\ +end + +test_run:cmd("create server replica with rpl_master=default,\ + script='replication/replica.lua'") +test_run:cmd('start server replica') +test_run:wait_downstream(2, {status='follow'}) + +for i = 0, 7 do gen_mixed_tx(i) end + +box.info.replication[2].status + +vclock = box.info.vclock +vclock[0] = nil +test_run:wait_vclock("replica", vclock) + +test_run:cmd('switch replica') + +box.info.status +box.info.replication[1].upstream.status + +box.space.test:select{} + +test_run:cmd('switch default') + +-- Cleanup. 
+test_run:cmd('stop server replica') +test_run:cmd('delete server replica') +box.schema.user.revoke('guest', 'replication') +box.space.loc:drop() +box.space.glob:drop() diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index d2743b5ed..f357b07da 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -18,6 +18,7 @@ "gh-4606-admin-creds.test.lua": {}, "gh-4739-vclock-assert.test.lua": {}, "gh-4730-applier-rollback.test.lua": {}, + "gh-4928-tx-boundaries.test.lua": {}, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"} -- 2.24.2 (Apple Git-127) ^ permalink raw reply [flat|nested] 11+ messages in thread
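The batching logic itself is compact, and the sketch below shows it standalone. It is a simplified model, not relay.cc: a plain array replaces the rlist on the fiber region, and filtered local rows are simply dropped instead of being rewritten into NOPs. The point it illustrates is that when the transaction ends with a local row, the is_commit flag migrates to the last buffered global row before the batch is written out.

/*
 * Simplified standalone model of the batching above, not relay.cc:
 * a plain array replaces the rlist on the fiber region, and filtered
 * local rows are dropped instead of being turned into NOPs.
 */
#include <stdbool.h>
#include <stdio.h>

struct row {
	int id;
	bool is_local;
	bool is_commit;
};

enum { MAX_TX_ROWS = 16 };	/* toy limit, overflow deliberately unchecked */
static struct row buf[MAX_TX_ROWS];
static int buf_len;

static void
send_tx(void)
{
	for (int i = 0; i < buf_len; i++)
		printf("send row %d%s\n", buf[i].id,
		       buf[i].is_commit ? " (commit)" : "");
	buf_len = 0;
}

/* Called once per recovered row, like relay_collect_tx(). */
static void
collect(struct row r)
{
	if (r.is_local) {
		/*
		 * A local commit row is filtered out, so its is_commit
		 * flag moves to the last buffered global row.
		 */
		if (r.is_commit && buf_len > 0) {
			buf[buf_len - 1].is_commit = true;
			send_tx();
		}
		return;
	}
	buf[buf_len++] = r;
	if (r.is_commit)
		send_tx();
}

int
main(void)
{
	/* A transaction ending with a local row: global, global, local. */
	collect((struct row){1, false, false});
	collect((struct row){2, false, false});
	collect((struct row){3, true, true});
	return 0;
}

For the transaction {global, global, local+commit} it prints the two global rows with the commit marker on the second one, so the receiving side sees a properly terminated transaction instead of an interleaving error.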
* Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches 2020-05-18 12:24 ` [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches Serge Petrenko @ 2020-05-18 12:28 ` Serge Petrenko 2020-05-19 10:23 ` Cyrill Gorcunov 0 siblings, 1 reply; 11+ messages in thread From: Serge Petrenko @ 2020-05-18 12:28 UTC (permalink / raw) To: v.shpilevoy, gorcunov; +Cc: tarantool-patches 18.05.2020 15:24, Serge Petrenko пишет: > Make relay accumulate all the tx rows coming from recovery into a linked > list and send them as a batch. > > This enables relay to make transaction-wide manipulations before sending > rows to replica. > > Add one such manipulation right away: when a transaction ends with a > local row, the local row has is_commit flag set to true. However, local > rows are not replicated, so the replica never gets an is_commit marker > and yields an error `ER_UNSUPPORTED: replication does not support > interleaving transactions`, breaking replication. > > Now relay is assigns is_commit = true to the last global row it's > about to send. > > Closes #4928 > --- > src/box/relay.cc | 98 ++++++++++--- > test/replication/gh-4928-tx-boundaries.result | 132 ++++++++++++++++++ > .../gh-4928-tx-boundaries.test.lua | 61 ++++++++ > test/replication/suite.cfg | 1 + > 4 files changed, 275 insertions(+), 17 deletions(-) > create mode 100644 test/replication/gh-4928-tx-boundaries.result > create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua Sorry for the mess, the correct patch is below: src/box/relay.cc | 98 ++++++++++--- test/replication/gh-4928-tx-boundaries.result | 138 ++++++++++++++++++ .../gh-4928-tx-boundaries.test.lua | 61 ++++++++ test/replication/suite.cfg | 1 + 4 files changed, 281 insertions(+), 17 deletions(-) create mode 100644 test/replication/gh-4928-tx-boundaries.result create mode 100644 test/replication/gh-4928-tx-boundaries.test.lua diff --git a/src/box/relay.cc b/src/box/relay.cc index 2ad02cb8a..17b7bc667 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -177,7 +177,7 @@ relay_send(struct relay *relay, struct xrow_header *packet); static void relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row); static void -relay_send_row(struct xstream *stream, struct xrow_header *row); +relay_collect_tx(struct xstream *stream, struct xrow_header *row); struct relay * relay_new(struct replica *replica) @@ -355,7 +355,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, if (relay == NULL) diag_raise(); - relay_start(relay, fd, sync, relay_send_row); + relay_start(relay, fd, sync, relay_collect_tx); auto relay_guard = make_scoped_guard([=] { relay_stop(relay); relay_delete(relay); @@ -705,7 +705,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, diag_raise(); } - relay_start(relay, fd, sync, relay_send_row); + relay_start(relay, fd, sync, relay_collect_tx); auto relay_guard = make_scoped_guard([=] { relay_stop(relay); replica_on_relay_stop(replica); @@ -754,11 +754,39 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row) relay_send(relay, row); } -/** Send a single row to the client. 
*/ +struct relay_tx_row { + struct rlist link; + struct xrow_header row; +}; + static void -relay_send_row(struct xstream *stream, struct xrow_header *packet) +relay_send_tx(struct relay *relay, struct rlist *tx_rows) +{ + ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY); + + struct relay_tx_row *tx_row; + rlist_foreach_entry(tx_row, tx_rows, link) { + tx_row->row.sync = relay->sync; + coio_write_xrow(&relay->io, &tx_row->row); + } + relay->last_row_time = ev_monotonic_now(loop()); + + struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); + if (inj != NULL && inj->dparam > 0) + fiber_sleep(inj->dparam); +} + +/** + * Collect all the rows belonging to a single transaction and + * send them at once. + */ +static void +relay_collect_tx(struct xstream *stream, struct xrow_header *packet) { struct relay *relay = container_of(stream, struct relay, stream); + static RLIST_HEAD(tx_rows); + struct errinj *inj; + struct relay_tx_row *tx_row; assert(iproto_type_is_dml(packet->type)); if (packet->group_id == GROUP_LOCAL) { /* @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) * order to correctly promote the vclock on the * replica. */ - if (packet->replica_id == REPLICA_ID_NIL) + if (packet->replica_id == REPLICA_ID_NIL) { + /* + * Make sure is_commit flag from the + * local row makes it to the replica, + * in case the transaction is not fully + * local. + */ + if (packet->is_commit && !rlist_empty(&tx_rows)) { + rlist_last_entry(&tx_rows, struct relay_tx_row, + link)->row.is_commit = true; + goto write; + } return; + } packet->type = IPROTO_NOP; packet->group_id = GROUP_DEFAULT; packet->bodycnt = 0; @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) * it). In the latter case packet's LSN is less than or equal to * local master's LSN at the moment it received 'SUBSCRIBE' request. */ - if (relay->replica == NULL || - packet->replica_id != relay->replica->id || - packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe, - packet->replica_id)) { - struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN, - ERRINJ_INT); - if (inj != NULL && packet->lsn == inj->iparam) { - packet->lsn = inj->iparam - 1; - say_warn("injected broken lsn: %lld", - (long long) packet->lsn); - } + if (relay->replica != NULL && packet->replica_id == relay->replica->id && + packet->lsn > vclock_get(&relay->local_vclock_at_subscribe, + packet->replica_id)) { + return; + } + + inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT); + if (inj != NULL && packet->lsn == inj->iparam) { + packet->lsn = inj->iparam - 1; + say_warn("injected broken lsn: %lld", + (long long) packet->lsn); + } + + /* A short path for single-statement transactions. */ + if (packet->is_commit && rlist_empty(&tx_rows)) { relay_send(relay, packet); + return; + } + + tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc, + sizeof(*tx_row)); + if (tx_row == NULL) { + tnt_raise(OutOfMemory, sizeof(*tx_row), "region", + "struct relay_tx_row"); + } + tx_row->row = *packet; + rlist_add_tail_entry(&tx_rows, tx_row, link); + + if (packet->is_commit) { +write: relay_send_tx(relay, &tx_rows); + tx_rows = RLIST_HEAD_INITIALIZER(tx_rows); + /* + * Free all the relay_tx_rows allocated on the + * fiber region. 
+ */ + fiber_gc(); } } diff --git a/test/replication/gh-4928-tx-boundaries.result b/test/replication/gh-4928-tx-boundaries.result new file mode 100644 index 000000000..969bd8438 --- /dev/null +++ b/test/replication/gh-4928-tx-boundaries.result @@ -0,0 +1,138 @@ +-- test-run result file version 2 +-- gh-4928. Test that transactions mixing local and global +-- space operations are replicated correctly. +env = require('test_run') + | --- + | ... +test_run = env.new() + | --- + | ... +bit = require('bit') + | --- + | ... + +-- Init. +box.schema.user.grant('guest', 'replication') + | --- + | ... +_ = box.schema.space.create('glob') + | --- + | ... +_ = box.schema.space.create('loc', {is_local=true}) + | --- + | ... +_ = box.space.glob:create_index('pk') + | --- + | ... +_ = box.space.loc:create_index('pk') + | --- + | ... + +function gen_mixed_tx(i)\ + box.begin()\ + if bit.band(i, 1) ~= 0 then\ + box.space.glob:insert{10 * i + 1}\ + else\ + box.space.loc:insert{10 * i + 1}\ + end\ + if bit.band(i, 2) ~= 0 then\ + box.space.glob:insert{10 * i + 2}\ + else\ + box.space.loc:insert{10 * i + 2}\ + end\ + if bit.band(i, 4) ~= 0 then\ + box.space.glob:insert{10 * i + 3}\ + else\ + box.space.loc:insert{10 * i + 3}\ + end\ + box.commit()\ +end + | --- + | ... + +test_run:cmd("create server replica with rpl_master=default,\ + script='replication/replica.lua'") + | --- + | - true + | ... +test_run:cmd('start server replica') + | --- + | - true + | ... +test_run:wait_downstream(2, {status='follow'}) + | --- + | - true + | ... + +for i = 0, 7 do gen_mixed_tx(i) end + | --- + | ... + +box.info.replication[2].status + | --- + | - null + | ... + +vclock = box.info.vclock + | --- + | ... +vclock[0] = nil + | --- + | ... +test_run:wait_vclock("replica", vclock) + | --- + | ... + +test_run:cmd('switch replica') + | --- + | - true + | ... + +box.info.status + | --- + | - running + | ... +box.info.replication[1].upstream.status + | --- + | - follow + | ... + +box.space.glob:select{} + | --- + | - - [11] + | - [22] + | - [31] + | - [32] + | - [43] + | - [51] + | - [53] + | - [62] + | - [63] + | - [71] + | - [72] + | - [73] + | ... + +test_run:cmd('switch default') + | --- + | - true + | ... + +-- Cleanup. +test_run:cmd('stop server replica') + | --- + | - true + | ... +test_run:cmd('delete server replica') + | --- + | - true + | ... +box.schema.user.revoke('guest', 'replication') + | --- + | ... +box.space.loc:drop() + | --- + | ... +box.space.glob:drop() + | --- + | ... diff --git a/test/replication/gh-4928-tx-boundaries.test.lua b/test/replication/gh-4928-tx-boundaries.test.lua new file mode 100644 index 000000000..92526fc51 --- /dev/null +++ b/test/replication/gh-4928-tx-boundaries.test.lua @@ -0,0 +1,61 @@ +-- gh-4928. Test that transactions mixing local and global +-- space operations are replicated correctly. +env = require('test_run') +test_run = env.new() +bit = require('bit') + +-- Init. 
+box.schema.user.grant('guest', 'replication') +_ = box.schema.space.create('glob') +_ = box.schema.space.create('loc', {is_local=true}) +_ = box.space.glob:create_index('pk') +_ = box.space.loc:create_index('pk') + +function gen_mixed_tx(i)\ + box.begin()\ + if bit.band(i, 1) ~= 0 then\ + box.space.glob:insert{10 * i + 1}\ + else\ + box.space.loc:insert{10 * i + 1}\ + end\ + if bit.band(i, 2) ~= 0 then\ + box.space.glob:insert{10 * i + 2}\ + else\ + box.space.loc:insert{10 * i + 2}\ + end\ + if bit.band(i, 4) ~= 0 then\ + box.space.glob:insert{10 * i + 3}\ + else\ + box.space.loc:insert{10 * i + 3}\ + end\ + box.commit()\ +end + +test_run:cmd("create server replica with rpl_master=default,\ + script='replication/replica.lua'") +test_run:cmd('start server replica') +test_run:wait_downstream(2, {status='follow'}) + +for i = 0, 7 do gen_mixed_tx(i) end + +box.info.replication[2].status + +vclock = box.info.vclock +vclock[0] = nil +test_run:wait_vclock("replica", vclock) + +test_run:cmd('switch replica') + +box.info.status +box.info.replication[1].upstream.status + +box.space.glob:select{} + +test_run:cmd('switch default') + +-- Cleanup. +test_run:cmd('stop server replica') +test_run:cmd('delete server replica') +box.schema.user.revoke('guest', 'replication') +box.space.loc:drop() +box.space.glob:drop() diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index d2743b5ed..f357b07da 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -18,6 +18,7 @@ "gh-4606-admin-creds.test.lua": {}, "gh-4739-vclock-assert.test.lua": {}, "gh-4730-applier-rollback.test.lua": {}, + "gh-4928-tx-boundaries.test.lua": {}, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"} -- 2.24.2 (Apple Git-127) -- Serge Petrenko ^ permalink raw reply [flat|nested] 11+ messages in thread
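The expected contents of the glob space in the .result file above can be cross-checked independently: gen_mixed_tx(i) sends row j (j = 1..3) to the global space iff bit j-1 of i is set, so the replicated values are exactly those printed by this small throwaway helper (not part of the patch):

#include <stdio.h>

int
main(void)
{
	for (int i = 0; i < 8; i++)
		for (int j = 1; j <= 3; j++)
			if (i & (1 << (j - 1)))
				printf("%d\n", 10 * i + j);
	return 0;
}

It prints 11, 22, 31, 32, 43, 51, 53, 62, 63, 71, 72, 73 -- the same list that box.space.glob:select{} returns in the result file.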
* Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches 2020-05-18 12:28 ` Serge Petrenko @ 2020-05-19 10:23 ` Cyrill Gorcunov 2020-05-19 10:49 ` Serge Petrenko 0 siblings, 1 reply; 11+ messages in thread From: Cyrill Gorcunov @ 2020-05-19 10:23 UTC (permalink / raw) To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy On Mon, May 18, 2020 at 03:28:46PM +0300, Serge Petrenko wrote: > > Sorry for the mess, the correct patch is below: space/tabs are screwed (but since we're usually fetching from the branch this is not critical) ... > > -/** Send a single row to the client. */ > +struct relay_tx_row { > + struct rlist link; > + struct xrow_header row; > +}; > + > static void > -relay_send_row(struct xstream *stream, struct xrow_header *packet) > +relay_send_tx(struct relay *relay, struct rlist *tx_rows) > +{ > + ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY); > + > + struct relay_tx_row *tx_row; > + rlist_foreach_entry(tx_row, tx_rows, link) { > + tx_row->row.sync = relay->sync; > + coio_write_xrow(&relay->io, &tx_row->row); > + } > + relay->last_row_time = ev_monotonic_now(loop()); > + > + struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); > + if (inj != NULL && inj->dparam > 0) > + fiber_sleep(inj->dparam); > +} > + > +/** > + * Collect all the rows belonging to a single transaction and > + * send them at once. > + */ > +static void > +relay_collect_tx(struct xstream *stream, struct xrow_header *packet) > { > struct relay *relay = container_of(stream, struct relay, stream); > + static RLIST_HEAD(tx_rows); > + struct errinj *inj; > + struct relay_tx_row *tx_row; > assert(iproto_type_is_dml(packet->type)); > if (packet->group_id == GROUP_LOCAL) { > /* > @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct > xrow_header *packet) > * order to correctly promote the vclock on the > * replica. > */ > - if (packet->replica_id == REPLICA_ID_NIL) > + if (packet->replica_id == REPLICA_ID_NIL) { > + /* > + * Make sure is_commit flag from the > + * local row makes it to the replica, > + * in case the transaction is not fully > + * local. > + */ > + if (packet->is_commit && !rlist_empty(&tx_rows)) { > + rlist_last_entry(&tx_rows, struct relay_tx_row, > + link)->row.is_commit = true; > + goto write; > + } I guees we can do simplier if (!packet->is_commit || rlist_empty(&tx_rows)) return; rlist_last_entry(&tx_rows, struct relay_tx_row, link)->row.is_commit = true; goto write; ? > return; > + } > packet->type = IPROTO_NOP; > packet->group_id = GROUP_DEFAULT; > packet->bodycnt = 0; > @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct > xrow_header *packet) > * it). In the latter case packet's LSN is less than or equal to > * local master's LSN at the moment it received 'SUBSCRIBE' request. 
> */ > - if (relay->replica == NULL || > - packet->replica_id != relay->replica->id || > - packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe, > - packet->replica_id)) { > - struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN, > - ERRINJ_INT); > - if (inj != NULL && packet->lsn == inj->iparam) { > - packet->lsn = inj->iparam - 1; > - say_warn("injected broken lsn: %lld", > - (long long) packet->lsn); > - } > + if (relay->replica != NULL && packet->replica_id == relay->replica->id > && > + packet->lsn > vclock_get(&relay->local_vclock_at_subscribe, > + packet->replica_id)) { > + return; > + } > + > + inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT); > + if (inj != NULL && packet->lsn == inj->iparam) { > + packet->lsn = inj->iparam - 1; > + say_warn("injected broken lsn: %lld", > + (long long) packet->lsn); > + } > + > + /* A short path for single-statement transactions. */ > + if (packet->is_commit && rlist_empty(&tx_rows)) { > relay_send(relay, packet); > + return; > + } > + > + tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc, > + sizeof(*tx_row)); > + if (tx_row == NULL) { > + tnt_raise(OutOfMemory, sizeof(*tx_row), "region", > + "struct relay_tx_row"); > + } > + tx_row->row = *packet; > + rlist_add_tail_entry(&tx_rows, tx_row, link); > + > + if (packet->is_commit) { > +write: relay_send_tx(relay, &tx_rows); > + tx_rows = RLIST_HEAD_INITIALIZER(tx_rows); Why?! This is stack local variable, no? > + /* > + * Free all the relay_tx_rows allocated on the > + * fiber region. > + */ > + fiber_gc(); > } > } ^ permalink raw reply [flat|nested] 11+ messages in thread
* Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches 2020-05-19 10:23 ` Cyrill Gorcunov @ 2020-05-19 10:49 ` Serge Petrenko 2020-05-19 11:18 ` Cyrill Gorcunov 0 siblings, 1 reply; 11+ messages in thread From: Serge Petrenko @ 2020-05-19 10:49 UTC (permalink / raw) To: Cyrill Gorcunov; +Cc: tarantool-patches, v.shpilevoy 19.05.2020 13:23, Cyrill Gorcunov пишет: > On Mon, May 18, 2020 at 03:28:46PM +0300, Serge Petrenko wrote: Hi! Thanks for the review! >> Sorry for the mess, the correct patch is below: > space/tabs are screwed (but since we're usually fetching from > the branch this is not critical) > > ... Yep, that's cos I pasted it from the console, sorry. >> -/** Send a single row to the client. */ >> +struct relay_tx_row { >> + struct rlist link; >> + struct xrow_header row; >> +}; >> + >> static void >> -relay_send_row(struct xstream *stream, struct xrow_header *packet) >> +relay_send_tx(struct relay *relay, struct rlist *tx_rows) >> +{ >> + ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY); >> + >> + struct relay_tx_row *tx_row; >> + rlist_foreach_entry(tx_row, tx_rows, link) { >> + tx_row->row.sync = relay->sync; >> + coio_write_xrow(&relay->io, &tx_row->row); >> + } >> + relay->last_row_time = ev_monotonic_now(loop()); >> + >> + struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); >> + if (inj != NULL && inj->dparam > 0) >> + fiber_sleep(inj->dparam); >> +} >> + >> +/** >> + * Collect all the rows belonging to a single transaction and >> + * send them at once. >> + */ >> +static void >> +relay_collect_tx(struct xstream *stream, struct xrow_header *packet) >> { >> struct relay *relay = container_of(stream, struct relay, stream); >> + static RLIST_HEAD(tx_rows); >> + struct errinj *inj; >> + struct relay_tx_row *tx_row; >> assert(iproto_type_is_dml(packet->type)); >> if (packet->group_id == GROUP_LOCAL) { >> /* >> @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct >> xrow_header *packet) >> * order to correctly promote the vclock on the >> * replica. >> */ >> - if (packet->replica_id == REPLICA_ID_NIL) >> + if (packet->replica_id == REPLICA_ID_NIL) { >> + /* >> + * Make sure is_commit flag from the >> + * local row makes it to the replica, >> + * in case the transaction is not fully >> + * local. >> + */ >> + if (packet->is_commit && !rlist_empty(&tx_rows)) { >> + rlist_last_entry(&tx_rows, struct relay_tx_row, >> + link)->row.is_commit = true; >> + goto write; >> + } > I guees we can do simplier > > if (!packet->is_commit || rlist_empty(&tx_rows)) > return; > > rlist_last_entry(&tx_rows, struct relay_tx_row, > link)->row.is_commit = true; > goto write; > ? Yes, we can, thanks. I applied your diff. >> return; >> + } >> packet->type = IPROTO_NOP; >> packet->group_id = GROUP_DEFAULT; >> packet->bodycnt = 0; >> @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct >> xrow_header *packet) >> * it). In the latter case packet's LSN is less than or equal to >> * local master's LSN at the moment it received 'SUBSCRIBE' request. 
>> */ >> - if (relay->replica == NULL || >> - packet->replica_id != relay->replica->id || >> - packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe, >> - packet->replica_id)) { >> - struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN, >> - ERRINJ_INT); >> - if (inj != NULL && packet->lsn == inj->iparam) { >> - packet->lsn = inj->iparam - 1; >> - say_warn("injected broken lsn: %lld", >> - (long long) packet->lsn); >> - } >> + if (relay->replica != NULL && packet->replica_id == relay->replica->id >> && >> + packet->lsn > vclock_get(&relay->local_vclock_at_subscribe, >> + packet->replica_id)) { >> + return; >> + } >> + >> + inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT); >> + if (inj != NULL && packet->lsn == inj->iparam) { >> + packet->lsn = inj->iparam - 1; >> + say_warn("injected broken lsn: %lld", >> + (long long) packet->lsn); >> + } >> + >> + /* A short path for single-statement transactions. */ >> + if (packet->is_commit && rlist_empty(&tx_rows)) { >> relay_send(relay, packet); >> + return; >> + } >> + >> + tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc, >> + sizeof(*tx_row)); >> + if (tx_row == NULL) { >> + tnt_raise(OutOfMemory, sizeof(*tx_row), "region", >> + "struct relay_tx_row"); >> + } >> + tx_row->row = *packet; >> + rlist_add_tail_entry(&tx_rows, tx_row, link); >> + >> + if (packet->is_commit) { >> +write: relay_send_tx(relay, &tx_rows); >> + tx_rows = RLIST_HEAD_INITIALIZER(tx_rows); > Why?! This is stack local variable, no? Why? It's static. > >> + /* >> + * Free all the relay_tx_rows allocated on the >> + * fiber region. >> + */ >> + fiber_gc(); >> } >> } -- Serge Petrenko ^ permalink raw reply [flat|nested] 11+ messages in thread
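The static-versus-stack point is worth spelling out, since the list head outlives every transaction while its nodes do not. The sketch below is a generic model, not the rlist/region code: a toy node arena stands in for the fiber region and a hand-rolled singly linked list for the rlist. Resetting the arena (the analogue of fiber_gc()) frees all nodes at once but leaves the static head dangling, which is why the head has to be re-initialized explicitly after each flush.

/*
 * Generic sketch of why the static head needs an explicit reset; a
 * toy node arena stands in for the fiber region and a hand-rolled
 * singly linked list for the rlist.  Not Tarantool code.
 */
#include <stdio.h>

struct node {
	struct node *next;
	int value;
};

/* Toy arena standing in for the fiber region. */
static struct node arena[64];
static int arena_used;

/* Static, so a partially collected transaction survives between calls. */
static struct node *head;
static struct node **tail = &head;

static void
collect(int value, int is_commit)
{
	struct node *n = &arena[arena_used++];
	n->value = value;
	n->next = NULL;
	*tail = n;
	tail = &n->next;
	if (!is_commit)
		return;
	for (struct node *it = head; it != NULL; it = it->next)
		printf("send %d\n", it->value);
	/*
	 * The analogue of fiber_gc(): the arena is reset and every
	 * node is gone, but the reset never touches head/tail.
	 * Without the two lines below the next call would chase
	 * pointers into "freed" memory -- hence the explicit
	 * re-initialization of the static list head in the patch.
	 */
	arena_used = 0;
	head = NULL;
	tail = &head;
}

int
main(void)
{
	collect(1, 0);
	collect(2, 1);	/* flushes rows 1 and 2, resets the state */
	collect(3, 1);	/* the next transaction starts from an empty list */
	return 0;
}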
* Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches 2020-05-19 10:49 ` Serge Petrenko @ 2020-05-19 11:18 ` Cyrill Gorcunov 2020-05-19 12:31 ` Serge Petrenko 0 siblings, 1 reply; 11+ messages in thread From: Cyrill Gorcunov @ 2020-05-19 11:18 UTC (permalink / raw) To: Serge Petrenko; +Cc: tarantool-patches, v.shpilevoy On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote: ... > > > + if (packet->is_commit) { > > > +write: relay_send_tx(relay, &tx_rows); > > > + tx_rows = RLIST_HEAD_INITIALIZER(tx_rows); > > Why?! This is stack local variable, no? > Why? It's static. Yup, just found out ;) It is so hidden in the other vars declaration so was not obvious (a least for me :-). I'm appending the more clear approach. But Serge, this is my personal opinion, I'm fine with your current code as well, so Reviewed-by: Cyrill Gorcunov <gorcunov@gmail.com> it is up to you to pick or drop the diff below --- src/box/relay.cc | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/box/relay.cc b/src/box/relay.cc index 17b7bc667..27424e0ca 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -760,12 +760,12 @@ struct relay_tx_row { }; static void -relay_send_tx(struct relay *relay, struct rlist *tx_rows) +relay_send_tx(struct relay *relay, struct rlist *head) { ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY); struct relay_tx_row *tx_row; - rlist_foreach_entry(tx_row, tx_rows, link) { + rlist_foreach_entry(tx_row, head, link) { tx_row->row.sync = relay->sync; coio_write_xrow(&relay->io, &tx_row->row); } @@ -784,10 +784,13 @@ static void relay_collect_tx(struct xstream *stream, struct xrow_header *packet) { struct relay *relay = container_of(stream, struct relay, stream); - static RLIST_HEAD(tx_rows); - struct errinj *inj; struct relay_tx_row *tx_row; + struct errinj *inj; + + static RLIST_HEAD(tx_rows_list); + assert(iproto_type_is_dml(packet->type)); + if (packet->group_id == GROUP_LOCAL) { /* * We do not relay replica-local rows to other @@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet) * in case the transaction is not fully * local. */ - if (packet->is_commit && !rlist_empty(&tx_rows)) { - rlist_last_entry(&tx_rows, struct relay_tx_row, - link)->row.is_commit = true; - goto write; - } - return; + if (!packet->is_commit || rlist_empty(&tx_rows_list)) + return; + + rlist_last_entry(&tx_rows_list, struct relay_tx_row, + link)->row.is_commit = true; + goto write; } packet->type = IPROTO_NOP; packet->group_id = GROUP_DEFAULT; packet->bodycnt = 0; } + /* Check if the rows from the instance are filtered. */ - if ((1 << packet->replica_id & relay->id_filter) != 0) + if (((1u << packet->replica_id) & relay->id_filter) != 0) return; + /* * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE * request. If this is FINAL JOIN (i.e. relay->replica is NULL), @@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet) } /* A short path for single-statement transactions. 
*/ - if (packet->is_commit && rlist_empty(&tx_rows)) { + if (packet->is_commit && rlist_empty(&tx_rows_list)) { relay_send(relay, packet); return; } @@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet) "struct relay_tx_row"); } tx_row->row = *packet; - rlist_add_tail_entry(&tx_rows, tx_row, link); + rlist_add_tail_entry(&tx_rows_list, tx_row, link); if (packet->is_commit) { -write: relay_send_tx(relay, &tx_rows); - tx_rows = RLIST_HEAD_INITIALIZER(tx_rows); +write: + relay_send_tx(relay, &tx_rows_list); + tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list); + /* * Free all the relay_tx_rows allocated on the * fiber region. -- 2.26.2 ^ permalink raw reply [flat|nested] 11+ messages in thread
* Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches 2020-05-19 11:18 ` Cyrill Gorcunov @ 2020-05-19 12:31 ` Serge Petrenko 2020-05-19 16:24 ` Serge Petrenko 0 siblings, 1 reply; 11+ messages in thread From: Serge Petrenko @ 2020-05-19 12:31 UTC (permalink / raw) To: Cyrill Gorcunov; +Cc: tarantool-patches, v.shpilevoy 19.05.2020 14:18, Cyrill Gorcunov пишет: > On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote: > ... >>>> + if (packet->is_commit) { >>>> +write: relay_send_tx(relay, &tx_rows); >>>> + tx_rows = RLIST_HEAD_INITIALIZER(tx_rows); >>> Why?! This is stack local variable, no? >> Why? It's static. > Yup, just found out ;) It is so hidden in the other vars > declaration so was not obvious (a least for me :-). I'm > appending the more clear approach. > > But Serge, this is my personal opinion, I'm fine with your > current code as well, so > > Reviewed-by: Cyrill Gorcunov <gorcunov@gmail.com> > > it is up to you to pick or drop the diff below Thanks! Looks good, applied. > --- > src/box/relay.cc | 37 ++++++++++++++++++++++--------------- > 1 file changed, 22 insertions(+), 15 deletions(-) > > diff --git a/src/box/relay.cc b/src/box/relay.cc > index 17b7bc667..27424e0ca 100644 > --- a/src/box/relay.cc > +++ b/src/box/relay.cc > @@ -760,12 +760,12 @@ struct relay_tx_row { > }; > > static void > -relay_send_tx(struct relay *relay, struct rlist *tx_rows) > +relay_send_tx(struct relay *relay, struct rlist *head) > { > ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY); > > struct relay_tx_row *tx_row; > - rlist_foreach_entry(tx_row, tx_rows, link) { > + rlist_foreach_entry(tx_row, head, link) { > tx_row->row.sync = relay->sync; > coio_write_xrow(&relay->io, &tx_row->row); > } > @@ -784,10 +784,13 @@ static void > relay_collect_tx(struct xstream *stream, struct xrow_header *packet) > { > struct relay *relay = container_of(stream, struct relay, stream); > - static RLIST_HEAD(tx_rows); > - struct errinj *inj; > struct relay_tx_row *tx_row; > + struct errinj *inj; > + > + static RLIST_HEAD(tx_rows_list); > + > assert(iproto_type_is_dml(packet->type)); > + > if (packet->group_id == GROUP_LOCAL) { > /* > * We do not relay replica-local rows to other > @@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet) > * in case the transaction is not fully > * local. > */ > - if (packet->is_commit && !rlist_empty(&tx_rows)) { > - rlist_last_entry(&tx_rows, struct relay_tx_row, > - link)->row.is_commit = true; > - goto write; > - } > - return; > + if (!packet->is_commit || rlist_empty(&tx_rows_list)) > + return; > + > + rlist_last_entry(&tx_rows_list, struct relay_tx_row, > + link)->row.is_commit = true; > + goto write; > } > packet->type = IPROTO_NOP; > packet->group_id = GROUP_DEFAULT; > packet->bodycnt = 0; > } > + > /* Check if the rows from the instance are filtered. */ > - if ((1 << packet->replica_id & relay->id_filter) != 0) > + if (((1u << packet->replica_id) & relay->id_filter) != 0) > return; > + > /* > * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE > * request. If this is FINAL JOIN (i.e. relay->replica is NULL), > @@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet) > } > > /* A short path for single-statement transactions. 
*/ > - if (packet->is_commit && rlist_empty(&tx_rows)) { > + if (packet->is_commit && rlist_empty(&tx_rows_list)) { > relay_send(relay, packet); > return; > } > @@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet) > "struct relay_tx_row"); > } > tx_row->row = *packet; > - rlist_add_tail_entry(&tx_rows, tx_row, link); > + rlist_add_tail_entry(&tx_rows_list, tx_row, link); > > if (packet->is_commit) { > -write: relay_send_tx(relay, &tx_rows); > - tx_rows = RLIST_HEAD_INITIALIZER(tx_rows); > +write: > + relay_send_tx(relay, &tx_rows_list); > + tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list); > + > /* > * Free all the relay_tx_rows allocated on the > * fiber region. -- Serge Petrenko ^ permalink raw reply [flat|nested] 11+ messages in thread
* Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches 2020-05-19 12:31 ` Serge Petrenko @ 2020-05-19 16:24 ` Serge Petrenko 0 siblings, 0 replies; 11+ messages in thread From: Serge Petrenko @ 2020-05-19 16:24 UTC (permalink / raw) To: Cyrill Gorcunov; +Cc: tarantool-patches, v.shpilevoy 19.05.2020 15:31, Serge Petrenko пишет: > > 19.05.2020 14:18, Cyrill Gorcunov пишет: >> On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote: >> ... >>>>> + if (packet->is_commit) { >>>>> +write: relay_send_tx(relay, &tx_rows); >>>>> + tx_rows = RLIST_HEAD_INITIALIZER(tx_rows); >>>> Why?! This is stack local variable, no? >>> Why? It's static. >> Yup, just found out ;) It is so hidden in the other vars >> declaration so was not obvious (a least for me :-). I'm >> appending the more clear approach. >> >> But Serge, this is my personal opinion, I'm fine with your >> current code as well, so >> >> Reviewed-by: Cyrill Gorcunov <gorcunov@gmail.com> >> >> it is up to you to pick or drop the diff below > > Thanks! Looks good, applied. > >> --- >> src/box/relay.cc | 37 ++++++++++++++++++++++--------------- >> 1 file changed, 22 insertions(+), 15 deletions(-) >> >> diff --git a/src/box/relay.cc b/src/box/relay.cc >> index 17b7bc667..27424e0ca 100644 >> --- a/src/box/relay.cc >> +++ b/src/box/relay.cc >> @@ -760,12 +760,12 @@ struct relay_tx_row { >> }; >> static void >> -relay_send_tx(struct relay *relay, struct rlist *tx_rows) >> +relay_send_tx(struct relay *relay, struct rlist *head) >> { >> ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY); >> struct relay_tx_row *tx_row; >> - rlist_foreach_entry(tx_row, tx_rows, link) { >> + rlist_foreach_entry(tx_row, head, link) { >> tx_row->row.sync = relay->sync; >> coio_write_xrow(&relay->io, &tx_row->row); >> } >> @@ -784,10 +784,13 @@ static void >> relay_collect_tx(struct xstream *stream, struct xrow_header *packet) >> { >> struct relay *relay = container_of(stream, struct relay, stream); >> - static RLIST_HEAD(tx_rows); >> - struct errinj *inj; >> struct relay_tx_row *tx_row; >> + struct errinj *inj; >> + >> + static RLIST_HEAD(tx_rows_list); >> + >> assert(iproto_type_is_dml(packet->type)); >> + >> if (packet->group_id == GROUP_LOCAL) { >> /* >> * We do not relay replica-local rows to other >> @@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct >> xrow_header *packet) >> * in case the transaction is not fully >> * local. >> */ >> - if (packet->is_commit && !rlist_empty(&tx_rows)) { >> - rlist_last_entry(&tx_rows, struct relay_tx_row, >> - link)->row.is_commit = true; >> - goto write; >> - } >> - return; >> + if (!packet->is_commit || rlist_empty(&tx_rows_list)) >> + return; >> + >> + rlist_last_entry(&tx_rows_list, struct relay_tx_row, >> + link)->row.is_commit = true; >> + goto write; >> } >> packet->type = IPROTO_NOP; >> packet->group_id = GROUP_DEFAULT; >> packet->bodycnt = 0; >> } >> + >> /* Check if the rows from the instance are filtered. */ >> - if ((1 << packet->replica_id & relay->id_filter) != 0) >> + if (((1u << packet->replica_id) & relay->id_filter) != 0) >> return; >> + >> /* >> * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE >> * request. If this is FINAL JOIN (i.e. relay->replica is NULL), >> @@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct >> xrow_header *packet) >> } >> /* A short path for single-statement transactions. 
*/ >> - if (packet->is_commit && rlist_empty(&tx_rows)) { >> + if (packet->is_commit && rlist_empty(&tx_rows_list)) { >> relay_send(relay, packet); >> return; >> } >> @@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct >> xrow_header *packet) >> "struct relay_tx_row"); >> } >> tx_row->row = *packet; >> - rlist_add_tail_entry(&tx_rows, tx_row, link); >> + rlist_add_tail_entry(&tx_rows_list, tx_row, link); >> if (packet->is_commit) { >> -write: relay_send_tx(relay, &tx_rows); >> - tx_rows = RLIST_HEAD_INITIALIZER(tx_rows); >> +write: >> + relay_send_tx(relay, &tx_rows_list); >> + tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list); >> + >> /* >> * Free all the relay_tx_rows allocated on the >> * fiber region. > This patch needs some more thought. I noticed test failures on GitLab: https://gitlab.com/tarantool/tarantool/-/jobs/559102582 Looks like the problem is with row bodies, which are stored in an ibuf owned by xlog_cursor. The cursor was designed to work in a row-by-row manner, just like the recovery system, which calls stream_write() on each recovered row. So the ibuf may be invalidated after recovery writes a single row (i.e. after a single call to relay_collect_tx()). Even though I copy the xrow_header itself, the header contains pointers to its body which is stored in the xlog_cursor's ibuf, and it gets rewritten as soon as ibuf_reserve_slow() is called. -- Serge Petrenko ^ permalink raw reply [flat|nested] 11+ messages in thread
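The failure mode is the classic shallow-copy one: copying the xrow_header by value copies the pointers to the row body, not the body itself, and the body lives in a buffer the cursor reuses for the next row. A tiny illustration (struct header, read_row and the scratch buffer are made-up names, not Tarantool APIs):

/*
 * The shallow-copy pitfall in miniature; struct header, read_row and
 * the scratch buffer are made-up illustrative names, not Tarantool
 * APIs.
 */
#include <stdio.h>

struct header {
	int lsn;
	const char *body;	/* points into the reader's buffer */
};

static char scratch[32];	/* stands in for the xlog_cursor ibuf */

static struct header
read_row(int lsn, const char *body)
{
	/* Every read reuses the same scratch buffer. */
	snprintf(scratch, sizeof(scratch), "%s", body);
	return (struct header){lsn, scratch};
}

int
main(void)
{
	struct header saved = read_row(1, "first row body");
	/* A by-value copy does not copy what saved.body points to. */
	struct header copy = saved;
	read_row(2, "second row body");
	printf("lsn=%d body=%s\n", copy.lsn, copy.body);
	return 0;
}

It prints lsn=1 body=second row body: the saved copy silently picks up the next row's body, which is what happens to the batched rows once the cursor's ibuf is rewritten. A working version of the batching would therefore presumably also need to copy the row bodies into memory that survives until the whole transaction is sent.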