diff --git a/src/box/applier.cc b/src/box/applier.cc index 6239fcfd3..51c0a3c07 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -635,13 +635,19 @@ applier_txn_commit_cb(struct trigger *trigger, void *event) /** * Apply all rows in the rows queue as a single transaction. * - * Return 0 for success or -1 in case of an error. + * Return 0 when a transaction was applied and sent to wal + * 1 when a transaction was skipped + * -1 in case of an error. */ static int applier_apply_tx(struct stailq *rows) { struct xrow_header *first_row = &stailq_first_entry(rows, struct applier_tx_row, next)->row; + /* Check if transaction is a heartbeat message. */ + if (first_row->lsn == 0) + return 1; + struct replica *replica = replica_by_id(first_row->replica_id); /* * In a full mesh topology, the same set of changes @@ -655,7 +661,7 @@ applier_apply_tx(struct stailq *rows) if (vclock_get(&replicaset.applier.vclock, first_row->replica_id) >= first_row->lsn) { latch_unlock(latch); - return 0; + return 1; } /** @@ -902,15 +908,16 @@ applier_subscribe(struct applier *applier) applier_read_tx(applier, &rows); applier->last_row_time = ev_monotonic_now(loop()); + int apply_res = applier_apply_tx(&rows); + + if (apply_res < 0) + diag_raise(); /* - * In case of an heartbeat message wake a writer up - * and check applier state. + * In case of an heartbeat message or already applied + * transaction wake a writer up and check applier state. */ - if (stailq_first_entry(&rows, struct applier_tx_row, - next)->row.lsn == 0) + if (apply_res == 1) fiber_cond_signal(&applier->writer_cond); - else if (applier_apply_tx(&rows) != 0) - diag_raise(); if (ibuf_used(ibuf) == 0) ibuf_reset(ibuf);