Date: Tue, 19 May 2020 13:23:03 +0300
From: Cyrill Gorcunov
To: Serge Petrenko
Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org
Subject: Re: [Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
Message-ID: <20200519102303.GC121099@grain>
In-Reply-To: <1b20e028-c6a8-a717-adfb-1737af36cf65@tarantool.org>
References: <5428970e5b1574f456870fd7dd69ef67b46383c5.1589804001.git.sergepetrenko@tarantool.org>
 <1b20e028-c6a8-a717-adfb-1737af36cf65@tarantool.org>

On Mon, May 18, 2020 at 03:28:46PM +0300, Serge Petrenko wrote:
>
> Sorry for the mess, the correct patch is below:

Spaces/tabs are screwed up (but since we usually fetch from
the branch, this is not critical)
...
>
> -/** Send a single row to the client. */
> +struct relay_tx_row {
> +    struct rlist link;
> +    struct xrow_header row;
> +};
> +
>  static void
> -relay_send_row(struct xstream *stream, struct xrow_header *packet)
> +relay_send_tx(struct relay *relay, struct rlist *tx_rows)
> +{
> +    ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
> +
> +    struct relay_tx_row *tx_row;
> +    rlist_foreach_entry(tx_row, tx_rows, link) {
> +        tx_row->row.sync = relay->sync;
> +        coio_write_xrow(&relay->io, &tx_row->row);
> +    }
> +    relay->last_row_time = ev_monotonic_now(loop());
> +
> +    struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
> +    if (inj != NULL && inj->dparam > 0)
> +        fiber_sleep(inj->dparam);
> +}
> +
> +/**
> + * Collect all the rows belonging to a single transaction and
> + * send them at once.
> + */
> +static void
> +relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>  {
>      struct relay *relay = container_of(stream, struct relay, stream);
> +    static RLIST_HEAD(tx_rows);
> +    struct errinj *inj;
> +    struct relay_tx_row *tx_row;
>      assert(iproto_type_is_dml(packet->type));
>      if (packet->group_id == GROUP_LOCAL) {
>          /*
> @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>           * order to correctly promote the vclock on the
>           * replica.
>           */
> -        if (packet->replica_id == REPLICA_ID_NIL)
> +        if (packet->replica_id == REPLICA_ID_NIL) {
> +            /*
> +             * Make sure is_commit flag from the
> +             * local row makes it to the replica,
> +             * in case the transaction is not fully
> +             * local.
> +             */
> +            if (packet->is_commit && !rlist_empty(&tx_rows)) {
> +                rlist_last_entry(&tx_rows, struct relay_tx_row,
> +                         link)->row.is_commit = true;
> +                goto write;
> +            }

I guess we can do it simpler:

	if (!packet->is_commit || rlist_empty(&tx_rows))
		return;

	rlist_last_entry(&tx_rows, struct relay_tx_row, link)->row.is_commit = true;
	goto write;
?

>              return;
> +        }
>          packet->type = IPROTO_NOP;
>          packet->group_id = GROUP_DEFAULT;
>          packet->bodycnt = 0;
> @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>       * it). In the latter case packet's LSN is less than or equal to
>       * local master's LSN at the moment it received 'SUBSCRIBE' request.
>       */
> -    if (relay->replica == NULL ||
> -        packet->replica_id != relay->replica->id ||
> -        packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
> -                      packet->replica_id)) {
> -        struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
> -                        ERRINJ_INT);
> -        if (inj != NULL && packet->lsn == inj->iparam) {
> -            packet->lsn = inj->iparam - 1;
> -            say_warn("injected broken lsn: %lld",
> -                 (long long) packet->lsn);
> -        }
> +    if (relay->replica != NULL && packet->replica_id == relay->replica->id &&
> +        packet->lsn > vclock_get(&relay->local_vclock_at_subscribe,
> +                     packet->replica_id)) {
> +        return;
> +    }
> +
> +    inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT);
> +    if (inj != NULL && packet->lsn == inj->iparam) {
> +        packet->lsn = inj->iparam - 1;
> +        say_warn("injected broken lsn: %lld",
> +             (long long) packet->lsn);
> +    }
> +
> +    /* A short path for single-statement transactions. */
> +    if (packet->is_commit && rlist_empty(&tx_rows)) {
>          relay_send(relay, packet);
> +        return;
> +    }
> +
> +    tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc,
> +                         sizeof(*tx_row));
> +    if (tx_row == NULL) {
> +        tnt_raise(OutOfMemory, sizeof(*tx_row), "region",
> +              "struct relay_tx_row");
> +    }
> +    tx_row->row = *packet;
> +    rlist_add_tail_entry(&tx_rows, tx_row, link);
> +
> +    if (packet->is_commit) {
> +write:        relay_send_tx(relay, &tx_rows);
> +        tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);

Why?! This is a stack-local variable, no?

> +        /*
> +         * Free all the relay_tx_rows allocated on the
> +         * fiber region.
> +         */
> +        fiber_gc();
>      }
>  }
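
For reference, the quoted hunk declares the list as `static RLIST_HEAD(tx_rows)`, so the head keeps its value between calls, and the rows it links are released by `fiber_gc()` after sending. Below is a minimal standalone sketch of that collect-then-flush pattern, under loud assumptions: `struct row`, `struct tx_row`, `collect_tx()`, `send_tx()` and the two counters are hypothetical stand-ins written for illustration, plain `malloc()`/`free()` replaces the fiber region, and a hand-rolled singly linked list replaces `rlist`. It is not the Tarantool code, only an illustration of why a persistent head has to be reset once its nodes are freed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-in for a replication row (not struct xrow_header). */
struct row {
	int lsn;
	bool is_commit;
};

/* Hypothetical list node (a simplified stand-in for an rlist entry). */
struct tx_row {
	struct tx_row *next;
	struct row row;
};

/* Illustration-only counters so the behaviour can be observed. */
static int batches_sent;
static int last_batch_size;

/* "Send" every row of one transaction in a single pass. */
static void
send_tx(const struct tx_row *head)
{
	last_batch_size = 0;
	for (const struct tx_row *r = head; r != NULL; r = r->next) {
		printf("send lsn=%d is_commit=%d\n", r->row.lsn,
		       (int)r->row.is_commit);
		last_batch_size++;
	}
	batches_sent++;
}

/* Buffer rows until the one carrying is_commit, then flush the batch. */
static void
collect_tx(const struct row *packet)
{
	/*
	 * Static storage: the pending batch survives between calls,
	 * unlike an automatic (stack) variable.
	 */
	static struct tx_row *head = NULL;
	static struct tx_row **tail = &head;

	assert(packet != NULL);
	struct tx_row *node = malloc(sizeof(*node));
	if (node == NULL)
		abort();
	node->next = NULL;
	node->row = *packet;
	*tail = node;
	tail = &node->next;

	if (!packet->is_commit)
		return;

	send_tx(head);
	/*
	 * Free the nodes and reset the persistent head and tail.
	 * Without the reset the next call would append to freed memory.
	 */
	while (head != NULL) {
		struct tx_row *next = head->next;
		free(head);
		head = next;
	}
	tail = &head;
}
```

Feeding three rows where only the last has `is_commit` set produces one batch of three; a following single committed row produces a second batch of one, which shows that the state was carried across calls and then cleared.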