[Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
Serge Petrenko
sergepetrenko at tarantool.org
Tue May 19 19:24:46 MSK 2020
19.05.2020 15:31, Serge Petrenko пишет:
>
> 19.05.2020 14:18, Cyrill Gorcunov пишет:
>> On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote:
>> ...
>>>>> + if (packet->is_commit) {
>>>>> +write: relay_send_tx(relay, &tx_rows);
>>>>> + tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
>>>> Why?! This is stack local variable, no?
>>> Why? It's static.
>> Yup, just found out ;) It is so hidden in the other vars
>> declaration so was not obvious (at least for me :-). I'm
>> appending the more clear approach.
>>
>> But Serge, this is my personal opinion, I'm fine with your
>> current code as well, so
>>
>> Reviewed-by: Cyrill Gorcunov <gorcunov at gmail.com>
>>
>> it is up to you to pick or drop the diff below
>
> Thanks! Looks good, applied.
>
>> ---
>> src/box/relay.cc | 37 ++++++++++++++++++++++---------------
>> 1 file changed, 22 insertions(+), 15 deletions(-)
>>
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 17b7bc667..27424e0ca 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -760,12 +760,12 @@ struct relay_tx_row {
>> };
>> static void
>> -relay_send_tx(struct relay *relay, struct rlist *tx_rows)
>> +relay_send_tx(struct relay *relay, struct rlist *head)
>> {
>> ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
>> struct relay_tx_row *tx_row;
>> - rlist_foreach_entry(tx_row, tx_rows, link) {
>> + rlist_foreach_entry(tx_row, head, link) {
>> tx_row->row.sync = relay->sync;
>> coio_write_xrow(&relay->io, &tx_row->row);
>> }
>> @@ -784,10 +784,13 @@ static void
>> relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>> {
>> struct relay *relay = container_of(stream, struct relay, stream);
>> - static RLIST_HEAD(tx_rows);
>> - struct errinj *inj;
>> struct relay_tx_row *tx_row;
>> + struct errinj *inj;
>> +
>> + static RLIST_HEAD(tx_rows_list);
>> +
>> assert(iproto_type_is_dml(packet->type));
>> +
>> if (packet->group_id == GROUP_LOCAL) {
>> /*
>> * We do not relay replica-local rows to other
>> @@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct
>> xrow_header *packet)
>> * in case the transaction is not fully
>> * local.
>> */
>> - if (packet->is_commit && !rlist_empty(&tx_rows)) {
>> - rlist_last_entry(&tx_rows, struct relay_tx_row,
>> - link)->row.is_commit = true;
>> - goto write;
>> - }
>> - return;
>> + if (!packet->is_commit || rlist_empty(&tx_rows_list))
>> + return;
>> +
>> + rlist_last_entry(&tx_rows_list, struct relay_tx_row,
>> + link)->row.is_commit = true;
>> + goto write;
>> }
>> packet->type = IPROTO_NOP;
>> packet->group_id = GROUP_DEFAULT;
>> packet->bodycnt = 0;
>> }
>> +
>> /* Check if the rows from the instance are filtered. */
>> - if ((1 << packet->replica_id & relay->id_filter) != 0)
>> + if (((1u << packet->replica_id) & relay->id_filter) != 0)
>> return;
>> +
>> /*
>> * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
>> * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
>> @@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct
>> xrow_header *packet)
>> }
>> /* A short path for single-statement transactions. */
>> - if (packet->is_commit && rlist_empty(&tx_rows)) {
>> + if (packet->is_commit && rlist_empty(&tx_rows_list)) {
>> relay_send(relay, packet);
>> return;
>> }
>> @@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct
>> xrow_header *packet)
>> "struct relay_tx_row");
>> }
>> tx_row->row = *packet;
>> - rlist_add_tail_entry(&tx_rows, tx_row, link);
>> + rlist_add_tail_entry(&tx_rows_list, tx_row, link);
>> if (packet->is_commit) {
>> -write: relay_send_tx(relay, &tx_rows);
>> - tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
>> +write:
>> + relay_send_tx(relay, &tx_rows_list);
>> + tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list);
>> +
>> /*
>> * Free all the relay_tx_rows allocated on the
>> * fiber region.
>
This patch needs some more thought. I noticed test failures on GitLab:
https://gitlab.com/tarantool/tarantool/-/jobs/559102582
It looks like the problem is with the row bodies, which are stored in an ibuf
owned by the xlog_cursor. The cursor was designed to work row by row,
just like the recovery system, which calls stream_write() on each
recovered row. So the ibuf may be invalidated after recovery writes a single
row (i.e. after a single call to relay_collect_tx()).
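Even though I copy the xrow_header itself, the header contains pointers to
its body. That body is stored in the xlog_cursor's ibuf and gets overwritten
as soon as ibuf_reserve_slow() is called. So rows can only be batched across
relay_collect_tx() calls if their bodies are copied out of the ibuf as well.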
--
Serge Petrenko
More information about the Tarantool-patches mailing list