[Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches

Cyrill Gorcunov gorcunov at gmail.com
Tue May 19 13:23:03 MSK 2020


On Mon, May 18, 2020 at 03:28:46PM +0300, Serge Petrenko wrote:
> 
> Sorry for the mess, the correct patch is below:

space/tabs are screwed (but since we're usually fetching from
the branch this is not critical)

...
> 
> -/** Send a single row to the client. */
> +struct relay_tx_row {
> +    struct rlist link;
> +    struct xrow_header row;
> +};
> +
>  static void
> -relay_send_row(struct xstream *stream, struct xrow_header *packet)
> +relay_send_tx(struct relay *relay, struct rlist *tx_rows)
> +{
> +    ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
> +
> +    struct relay_tx_row *tx_row;
> +    rlist_foreach_entry(tx_row, tx_rows, link) {
> +        tx_row->row.sync = relay->sync;
> +        coio_write_xrow(&relay->io, &tx_row->row);
> +    }
> +    relay->last_row_time = ev_monotonic_now(loop());
> +
> +    struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
> +    if (inj != NULL && inj->dparam > 0)
> +        fiber_sleep(inj->dparam);
> +}
> +
> +/**
> + * Collect all the rows belonging to a single transaction and
> + * send them at once.
> + */
> +static void
> +relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>  {
>      struct relay *relay = container_of(stream, struct relay, stream);
> +    static RLIST_HEAD(tx_rows);
> +    struct errinj *inj;
> +    struct relay_tx_row *tx_row;
>      assert(iproto_type_is_dml(packet->type));
>      if (packet->group_id == GROUP_LOCAL) {
>          /*
> @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct
> xrow_header *packet)
>           * order to correctly promote the vclock on the
>           * replica.
>           */
> -        if (packet->replica_id == REPLICA_ID_NIL)
> +        if (packet->replica_id == REPLICA_ID_NIL) {
> +            /*
> +             * Make sure is_commit flag from the
> +             * local row makes it to the replica,
> +             * in case the transaction is not fully
> +             * local.
> +             */
> +            if (packet->is_commit && !rlist_empty(&tx_rows)) {
> +                rlist_last_entry(&tx_rows, struct relay_tx_row,
> +                         link)->row.is_commit = true;
> +                goto write;
> +            }

I guees we can do simplier

		if (!packet->is_commit || rlist_empty(&tx_rows))
			return;

		rlist_last_entry(&tx_rows, struct relay_tx_row,
			link)->row.is_commit = true;
		goto write;
?

>              return;
> +        }
>          packet->type = IPROTO_NOP;
>          packet->group_id = GROUP_DEFAULT;
>          packet->bodycnt = 0;
> @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct
> xrow_header *packet)
>       * it). In the latter case packet's LSN is less than or equal to
>       * local master's LSN at the moment it received 'SUBSCRIBE' request.
>       */
> -    if (relay->replica == NULL ||
> -        packet->replica_id != relay->replica->id ||
> -        packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
> -                      packet->replica_id)) {
> -        struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
> -                        ERRINJ_INT);
> -        if (inj != NULL && packet->lsn == inj->iparam) {
> -            packet->lsn = inj->iparam - 1;
> -            say_warn("injected broken lsn: %lld",
> -                 (long long) packet->lsn);
> -        }
> +    if (relay->replica != NULL && packet->replica_id == relay->replica->id
> &&
> +        packet->lsn > vclock_get(&relay->local_vclock_at_subscribe,
> +                     packet->replica_id)) {
> +        return;
> +    }
> +
> +    inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT);
> +    if (inj != NULL && packet->lsn == inj->iparam) {
> +        packet->lsn = inj->iparam - 1;
> +        say_warn("injected broken lsn: %lld",
> +             (long long) packet->lsn);
> +    }
> +
> +    /* A short path for single-statement transactions. */
> +    if (packet->is_commit && rlist_empty(&tx_rows)) {
>          relay_send(relay, packet);
> +        return;
> +    }
> +
> +    tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc,
> +                             sizeof(*tx_row));
> +    if (tx_row == NULL) {
> +        tnt_raise(OutOfMemory, sizeof(*tx_row), "region",
> +              "struct relay_tx_row");
> +    }
> +    tx_row->row = *packet;
> +    rlist_add_tail_entry(&tx_rows, tx_row, link);
> +
> +    if (packet->is_commit) {
> +write:        relay_send_tx(relay, &tx_rows);
> +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);

Why?! This is stack local variable, no?

> +        /*
> +         * Free all the relay_tx_rows allocated on the
> +         * fiber region.
> +         */
> +        fiber_gc();
>      }
>  }


More information about the Tarantool-patches mailing list