[Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
Serge Petrenko
sergepetrenko at tarantool.org
Tue May 19 15:31:30 MSK 2020
19.05.2020 14:18, Cyrill Gorcunov пишет:
> On Tue, May 19, 2020 at 01:49:10PM +0300, Serge Petrenko wrote:
> ...
>>>> + if (packet->is_commit) {
>>>> +write: relay_send_tx(relay, &tx_rows);
>>>> + tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
>>> Why?! This is stack local variable, no?
>> Why? It's static.
> Yup, just found out ;) It is so hidden in the other vars
> declaration so was not obvious (a least for me :-). I'm
> appending the more clear approach.
>
> But Serge, this is my personal opinion, I'm fine with your
> current code as well, so
>
> Reviewed-by: Cyrill Gorcunov <gorcunov at gmail.com>
>
> it is up to you to pick or drop the diff below
Thanks! Looks good, applied.
> ---
> src/box/relay.cc | 37 ++++++++++++++++++++++---------------
> 1 file changed, 22 insertions(+), 15 deletions(-)
>
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 17b7bc667..27424e0ca 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -760,12 +760,12 @@ struct relay_tx_row {
> };
>
> static void
> -relay_send_tx(struct relay *relay, struct rlist *tx_rows)
> +relay_send_tx(struct relay *relay, struct rlist *head)
> {
> ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
>
> struct relay_tx_row *tx_row;
> - rlist_foreach_entry(tx_row, tx_rows, link) {
> + rlist_foreach_entry(tx_row, head, link) {
> tx_row->row.sync = relay->sync;
> coio_write_xrow(&relay->io, &tx_row->row);
> }
> @@ -784,10 +784,13 @@ static void
> relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
> {
> struct relay *relay = container_of(stream, struct relay, stream);
> - static RLIST_HEAD(tx_rows);
> - struct errinj *inj;
> struct relay_tx_row *tx_row;
> + struct errinj *inj;
> +
> + static RLIST_HEAD(tx_rows_list);
> +
> assert(iproto_type_is_dml(packet->type));
> +
> if (packet->group_id == GROUP_LOCAL) {
> /*
> * We do not relay replica-local rows to other
> @@ -805,20 +808,22 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
> * in case the transaction is not fully
> * local.
> */
> - if (packet->is_commit && !rlist_empty(&tx_rows)) {
> - rlist_last_entry(&tx_rows, struct relay_tx_row,
> - link)->row.is_commit = true;
> - goto write;
> - }
> - return;
> + if (!packet->is_commit || rlist_empty(&tx_rows_list))
> + return;
> +
> + rlist_last_entry(&tx_rows_list, struct relay_tx_row,
> + link)->row.is_commit = true;
> + goto write;
> }
> packet->type = IPROTO_NOP;
> packet->group_id = GROUP_DEFAULT;
> packet->bodycnt = 0;
> }
> +
> /* Check if the rows from the instance are filtered. */
> - if ((1 << packet->replica_id & relay->id_filter) != 0)
> + if (((1u << packet->replica_id) & relay->id_filter) != 0)
> return;
> +
> /*
> * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
> * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
> @@ -845,7 +850,7 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
> }
>
> /* A short path for single-statement transactions. */
> - if (packet->is_commit && rlist_empty(&tx_rows)) {
> + if (packet->is_commit && rlist_empty(&tx_rows_list)) {
> relay_send(relay, packet);
> return;
> }
> @@ -857,11 +862,13 @@ relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
> "struct relay_tx_row");
> }
> tx_row->row = *packet;
> - rlist_add_tail_entry(&tx_rows, tx_row, link);
> + rlist_add_tail_entry(&tx_rows_list, tx_row, link);
>
> if (packet->is_commit) {
> -write: relay_send_tx(relay, &tx_rows);
> - tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
> +write:
> + relay_send_tx(relay, &tx_rows_list);
> + tx_rows_list = RLIST_HEAD_INITIALIZER(tx_rows_list);
> +
> /*
> * Free all the relay_tx_rows allocated on the
> * fiber region.
--
Serge Petrenko
More information about the Tarantool-patches
mailing list