[Tarantool-patches] [PATCH 2/2] replication: make relay send txs in batches
Serge Petrenko
sergepetrenko at tarantool.org
Tue May 19 13:49:10 MSK 2020
19.05.2020 13:23, Cyrill Gorcunov пишет:
> On Mon, May 18, 2020 at 03:28:46PM +0300, Serge Petrenko wrote:
Hi! Thanks for the review!
>> Sorry for the mess, the correct patch is below:
> The spaces/tabs are mangled (but since we usually fetch from
> the branch, this is not critical)
>
> ...
Yep, that's because I pasted it from the console, sorry.
>> -/** Send a single row to the client. */
>> +struct relay_tx_row {
>> + struct rlist link;
>> + struct xrow_header row;
>> +};
>> +
>> static void
>> -relay_send_row(struct xstream *stream, struct xrow_header *packet)
>> +relay_send_tx(struct relay *relay, struct rlist *tx_rows)
>> +{
>> + ERROR_INJECT_YIELD(ERRINJ_RELAY_SEND_DELAY);
>> +
>> + struct relay_tx_row *tx_row;
>> + rlist_foreach_entry(tx_row, tx_rows, link) {
>> + tx_row->row.sync = relay->sync;
>> + coio_write_xrow(&relay->io, &tx_row->row);
>> + }
>> + relay->last_row_time = ev_monotonic_now(loop());
>> +
>> + struct errinj *inj = errinj(ERRINJ_RELAY_TIMEOUT, ERRINJ_DOUBLE);
>> + if (inj != NULL && inj->dparam > 0)
>> + fiber_sleep(inj->dparam);
>> +}
>> +
>> +/**
>> + * Collect all the rows belonging to a single transaction and
>> + * send them at once.
>> + */
>> +static void
>> +relay_collect_tx(struct xstream *stream, struct xrow_header *packet)
>> {
>> struct relay *relay = container_of(stream, struct relay, stream);
>> + static RLIST_HEAD(tx_rows);
>> + struct errinj *inj;
>> + struct relay_tx_row *tx_row;
>> assert(iproto_type_is_dml(packet->type));
>> if (packet->group_id == GROUP_LOCAL) {
>> /*
>> @@ -770,8 +798,20 @@ relay_send_row(struct xstream *stream, struct
>> xrow_header *packet)
>> * order to correctly promote the vclock on the
>> * replica.
>> */
>> - if (packet->replica_id == REPLICA_ID_NIL)
>> + if (packet->replica_id == REPLICA_ID_NIL) {
>> + /*
>> + * Make sure is_commit flag from the
>> + * local row makes it to the replica,
>> + * in case the transaction is not fully
>> + * local.
>> + */
>> + if (packet->is_commit && !rlist_empty(&tx_rows)) {
>> + rlist_last_entry(&tx_rows, struct relay_tx_row,
>> + link)->row.is_commit = true;
>> + goto write;
>> + }
> I guess we can make it simpler:
>
> if (!packet->is_commit || rlist_empty(&tx_rows))
> return;
>
> rlist_last_entry(&tx_rows, struct relay_tx_row,
> link)->row.is_commit = true;
> goto write;
> ?
Yes, we can, thanks. I applied your diff.
>> return;
>> + }
>> packet->type = IPROTO_NOP;
>> packet->group_id = GROUP_DEFAULT;
>> packet->bodycnt = 0;
>> @@ -791,17 +831,41 @@ relay_send_row(struct xstream *stream, struct
>> xrow_header *packet)
>> * it). In the latter case packet's LSN is less than or equal to
>> * local master's LSN at the moment it received 'SUBSCRIBE' request.
>> */
>> - if (relay->replica == NULL ||
>> - packet->replica_id != relay->replica->id ||
>> - packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
>> - packet->replica_id)) {
>> - struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
>> - ERRINJ_INT);
>> - if (inj != NULL && packet->lsn == inj->iparam) {
>> - packet->lsn = inj->iparam - 1;
>> - say_warn("injected broken lsn: %lld",
>> - (long long) packet->lsn);
>> - }
>> + if (relay->replica != NULL && packet->replica_id == relay->replica->id
>> &&
>> + packet->lsn > vclock_get(&relay->local_vclock_at_subscribe,
>> + packet->replica_id)) {
>> + return;
>> + }
>> +
>> + inj = errinj(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT);
>> + if (inj != NULL && packet->lsn == inj->iparam) {
>> + packet->lsn = inj->iparam - 1;
>> + say_warn("injected broken lsn: %lld",
>> + (long long) packet->lsn);
>> + }
>> +
>> + /* A short path for single-statement transactions. */
>> + if (packet->is_commit && rlist_empty(&tx_rows)) {
>> relay_send(relay, packet);
>> + return;
>> + }
>> +
>> + tx_row = (struct relay_tx_row *)region_alloc(&fiber()->gc,
>> + sizeof(*tx_row));
>> + if (tx_row == NULL) {
>> + tnt_raise(OutOfMemory, sizeof(*tx_row), "region",
>> + "struct relay_tx_row");
>> + }
>> + tx_row->row = *packet;
>> + rlist_add_tail_entry(&tx_rows, tx_row, link);
>> +
>> + if (packet->is_commit) {
>> +write: relay_send_tx(relay, &tx_rows);
>> + tx_rows = RLIST_HEAD_INITIALIZER(tx_rows);
> Why?! This is stack local variable, no?
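No, it's static (see `static RLIST_HEAD(tx_rows);` at the top of the function),
so the list head persists between calls and has to be reset once the
transaction has been sent.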
>
>> + /*
>> + * Free all the relay_tx_rows allocated on the
>> + * fiber region.
>> + */
>> + fiber_gc();
>> }
>> }
--
Serge Petrenko
More information about the Tarantool-patches
mailing list