From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp43.i.mail.ru (smtp43.i.mail.ru [94.100.177.103]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 1CA39469710 for ; Tue, 19 May 2020 13:49:11 +0300 (MSK) References: <5428970e5b1574f456870fd7dd69ef67b46383c5.1589804001.git.sergepetrenko@tarantool.org> <1b20e028-c6a8-a717-adfb-1737af36cf65@tarantool.org> <20200519102303.GC121099@grain> From: Serge Petrenko Message-ID: Date: Tue, 19 May 2020 13:49:10 +0300 MIME-Version: 1.0 In-Reply-To: <20200519102303.GC121099@grain> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-GB Subject: Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Cyrill Gorcunov Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org 19.05.2020 13:23, Cyrill Gorcunov пишет: > On Mon, May 18, 2020 at 03:28:46PM +0300, Serge Petrenko wrote: Hi! Thanks for the review! >> Sorry for the mess, the correct patch is below: > space/tabs are screwed (but since we're usually fetching from > the branch this is not critical) > > ... Yep, that's cos I pasted it from the console, sorry. >> -/** Send a single row to the client. */ >> +struct relay_tx_row { >> +    struct rlist link; >> +    struct xrow_header row; >> +}; >> + >>  static void >> -relay_send_row(struct xstream *stream, struct xrow_header *packet) >> +relay_send_tx(struct relay *relay, struct rlist *tx_rows) >> +{ >> +    ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY); >> + >> +    struct relay_tx_row *tx_row; >> +    rlist_foreach_entry(tx_row, tx_rows, link) { >> +        tx_row->row.sync = relay->sync; >> +        coio_write_xrow(&relay->io, &tx_row->row); >> +    } >> +    relay->last_row_time = ev_monotonic_now(loop()); >> + >> +    struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE); >> +    if (inj != NULL && inj->dparam > 0) >> +        fiber_sleep(inj->dparam); >> +} >> + >> +/** >> + * Collect all the rows belonging to a single transaction and >> + * send them at once. >> + */ >> +static void >> +relay_collect_tx(struct xstream *stream, struct xrow_header *packet) >>  { >>      struct relay *relay = container_of(stream, struct relay, stream); >> +    static RLIST_HEAD(tx_rows); >> +    struct errinj *inj; >> +    struct relay_tx_row *tx_row; >>      assert(iproto_type_is_dml(packet->type)); >>      if (packet->group_id == GROUP_LOCAL) { >>          /* >> @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct >> xrow_header *packet) >>           * order to correctly promote the vclock on the >>           * replica. >>           */ >> -        if (packet->replica_id == REPLICA_ID_NIL) >> +        if (packet->replica_id == REPLICA_ID_NIL) { >> +            /* >> +             * Make sure is_commit flag from the >> +             * local row makes it to the replica, >> +             * in case the transaction is not fully >> +             * local. >> +             */ >> +            if (packet->is_commit && !rlist_empty(&tx_rows)) { >> +                rlist_last_entry(&tx_rows, struct relay_tx_row, >> +                         link)->row.is_commit = true; >> +                goto write; >> +            } > I guees we can do simplier > > if (!packet->is_commit || rlist_empty(&tx_rows)) > return; > > rlist_last_entry(&tx_rows, struct relay_tx_row, > link)->row.is_commit = true; > goto write; > ? Yes, we can, thanks. I applied your diff. >>              return; >> +        } >>          packet->type = IPROTO_NOP; >>          packet->group_id = GROUP_DEFAULT; >>          packet->bodycnt = 0; >> @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct >> xrow_header *packet) >>       * it). In the latter case packet's LSN is less than or equal to >>       * local master's LSN at the moment it received 'SUBSCRIBE' request. >>       */ >> -    if (relay->replica == NULL || >> -        packet->replica_id != relay->replica->id || >> -        packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe, >> -                      packet->replica_id)) { >> -        struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN, >> -                        ERRINJ_INT); >> -        if (inj != NULL && packet->lsn == inj->iparam) { >> -            packet->lsn = inj->iparam - 1; >> -            say_warn("injected broken lsn: %lld", >> -                 (long long) packet->lsn); >> -        } >> +    if (relay->replica != NULL && packet->replica_id == relay->replica->id >> && >> +        packet->lsn > vclock_get(&relay->local_vclock_at_subscribe, >> +                     packet->replica_id)) { >> +        return; >> +    } >> + >> +    inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT); >> +    if (inj != NULL && packet->lsn == inj->iparam) { >> +        packet->lsn = inj->iparam - 1; >> +        say_warn("injected broken lsn: %lld", >> +             (long long) packet->lsn); >> +    } >> + >> +    /* A short path for single-statement transactions. */ >> +    if (packet->is_commit && rlist_empty(&tx_rows)) { >>          relay_send(relay, packet); >> +        return; >> +    } >> + >> +    tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc, >> +                             sizeof(*tx_row)); >> +    if (tx_row == NULL) { >> +        tnt_raise(OutOfMemory, sizeof(*tx_row), "region", >> +              "struct relay_tx_row"); >> +    } >> +    tx_row->row = *packet; >> +    rlist_add_tail_entry(&tx_rows, tx_row, link); >> + >> +    if (packet->is_commit) { >> +write:        relay_send_tx(relay, &tx_rows); >> +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows); > Why?! This is stack local variable, no? Why? It's static. > >> +        /* >> +         * Free all the relay_tx_rows allocated on the >> +         * fiber region. >> +         */ >> +        fiber_gc(); >>      } >>  } -- Serge Petrenko