From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp48.i.mail.ru (smtp48.i.mail.ru [94.100.177.108]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 514AF42EF5C for ; Mon, 22 Jun 2020 13:34:27 +0300 (MSK) References: <3210e1e6f867cfd1c1f65e05f28a32deae63c172.1591701695.git.sergepetrenko@tarantool.org> <5a4a21b5-b422-5516-ad26-b1637fd235b0@tarantool.org> From: Serge Petrenko Message-ID: Date: Mon, 22 Jun 2020 13:34:25 +0300 MIME-Version: 1.0 In-Reply-To: <5a4a21b5-b422-5516-ad26-b1637fd235b0@tarantool.org> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: en-GB Subject: Re: [Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Leonid Vasiliev , v.shpilevoy@tarantool.org, sergos@tarantool.org, gorcunov@gmail.com Cc: tarantool-patches@dev.tarantool.org 20.06.2020 18:06, Leonid Vasiliev пишет: > Hi! Thank you for the patch. > LGTM. > All the following comments can be skipped silently. > > On 09.06.2020 15:20, Serge Petrenko wrote: >> Make txn_limbo write a CONFIRM entry as soon as a batch of entries >> receive their acks. CONFIRM entry is written to WAL and later replicated >> to all the replicas. >> >> Now replicas put synchronous transactions into txn_limbo and wait for >> corresponding confirmation entries to arrive and end up in their WAL >> before committing the transactions. 
>> >> Part-of #4847 >> --- >>   src/box/applier.cc    | 81 ++++++++++++++++++++++++++++++++++++++++++- >>   src/box/box.cc        |  3 ++ >>   src/box/errcode.h     |  1 + >>   src/box/relay.cc      | 13 ++++--- >>   src/box/txn.c         | 75 ++++++++++++++++++++++++++++++--------- >>   src/box/txn.h         | 23 ++++++++++++ >>   src/box/txn_limbo.c   | 79 ++++++++++++++++++++++++++++++++++++----- >>   src/box/txn_limbo.h   |  6 ++++ >>   test/box/error.result |  1 + >>   9 files changed, 252 insertions(+), 30 deletions(-) >> >> diff --git a/src/box/applier.cc b/src/box/applier.cc >> index df48b4796..1dc977424 100644 >> --- a/src/box/applier.cc >> +++ b/src/box/applier.cc >> @@ -51,6 +51,7 @@ >>   #include "txn.h" >>   #include "box.h" >>   #include "scoped_guard.h" >> +#include "txn_limbo.h" >>     STRS(applier_state, applier_STATE); >>   @@ -214,6 +215,11 @@ apply_snapshot_row(struct xrow_header *row) >>       struct txn *txn = txn_begin(); >>       if (txn == NULL) >>           return -1; >> +    /* >> +     * Do not wait for confirmation when fetching a snapshot. >> +     * Master only sends confirmed rows during join. >> +     */ >> +    txn_force_async(txn); >>       if (txn_begin_stmt(txn, space) != 0) >>           goto rollback; >>       /* no access checks here - applier always works with admin >> privs */ >> @@ -249,10 +255,73 @@ process_nop(struct request *request) >>       return txn_commit_stmt(txn, request); >>   } >>   +/* >> + * An on_commit trigger set on a txn containing a CONFIRM entry. >> + * Confirms some of the txs waiting in txn_limbo. > > In txn.h "txns" notation is used (up to you). Thanks! Fixed. 
> >> + */ >> +static int >> +applier_on_confirm(struct trigger *trig, void *data) >> +{ >> +    (void) trig; >> +    int64_t lsn = *(int64_t *)data; >> +    txn_limbo_read_confirm(&txn_limbo, lsn); >> +    return 0; >> +} >> + >> +static int >> +process_confirm(struct request *request) >> +{ >> +    assert(request->header->type = IPROTO_CONFIRM); >> +    uint32_t replica_id; >> +    struct txn *txn = in_txn(); >> +    int64_t *lsn = (int64_t *) region_alloc(&txn->region, >> sizeof(int64_t)); >> +    if (lsn == NULL) { >> +        diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn"); >> +        return -1; >> +    } >> +    if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0) >> +        return -1; >> +    /* >> +     * on_commit trigger failure is not allowed, so check for >> +     * instance id early. >> +     */ >> +    if (replica_id != txn_limbo.instance_id) { >> +        diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id, >> +             txn_limbo.instance_id); >> +        return -1; >> +    } >> + >> +    /* >> +     * Set an on_commit trigger which will perform the actual >> +     * confirmation processing. >> +     */ >> +    struct trigger *trig = (struct trigger *)region_alloc(&txn->region, >> +                                  sizeof(*trig)); >> +    if (trig == NULL) { >> +        diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig"); >> +        return -1; >> +    } >> +    trigger_create(trig, applier_on_confirm, lsn, NULL); >> + >> +    if (txn_begin_stmt(txn, NULL) != 0) >> +        return -1; >> + >> +    if (txn_commit_stmt(txn, request) == 0) { >> +        txn_on_commit(txn, trig); >> +        return 0; >> +    } else { >> +        return -1; >> +    } > > If I understood correctly, this is a trick like the one in process_nop() > to promote vclock and ...? Maybe add a comment? Yes, it's somewhat similar to process_nop.
The difference is that, unlike everything else the applier receives, CONFIRM and ROLLBACK aren't DML requests and cannot be processed in the same manner. They require some txn_limbo operations tied to their processing.  Actually, I've just written the comment you asked for. Added: @@ -268,6 +268,12 @@ applier_on_confirm(struct trigger *trig, void *data)         return 0;  } +/* + * CONFIRM entries aren't dml requests and require special + * handling: they're written to WAL right away, without any space + * operations, and, once they're committed, txn_limbo performs the + * actions dictated by the received CONFIRM entry. + */  static int  process_confirm(struct request *request)  { > >> +} >> + >>   static int >>   apply_row(struct xrow_header *row) >>   { >>       struct request request; >> +    if (row->type == IPROTO_CONFIRM) { >> +        request.header = row; >> +        return process_confirm(&request); >> +    } >>       if (xrow_decode_dml(row, &request, >> dml_request_key_map(row->type)) != 0) >>           return -1; >>       if (request.type == IPROTO_NOP) >> @@ -273,6 +342,11 @@ apply_final_join_row(struct xrow_header *row) >>       struct txn *txn = txn_begin(); >>       if (txn == NULL) >>           return -1; >> +    /* >> +     * Do not wait for confirmation while processing final >> +     * join rows. See apply_snapshot_row(). >> +     */ >> +    txn_force_async(txn); >>       if (apply_row(row) != 0) { >>           txn_rollback(txn); >>           fiber_gc(); >> @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, >> uint64_t row_count) >>           applier->last_row_time = ev_monotonic_now(loop()); >>           if (iproto_type_is_dml(row.type)) { >> vclock_follow_xrow(&replicaset.vclock, &row); >> -            if (apply_final_join_row(&row) != 0) >> +            /* >> +             * Confirms are ignored during join. All the >> +             * data master sends us is valid.
>> +             */ >> +            if (row.type != IPROTO_CONFIRM && >> +                apply_final_join_row(&row) != 0) >>                   diag_raise(); >>               if (++row_count % 100000 == 0) >>                   say_info("%.1fM rows received", row_count / 1e6); >> diff --git a/src/box/box.cc b/src/box/box.cc >> index 64ac89975..792c3c394 100644 >> --- a/src/box/box.cc >> +++ b/src/box/box.cc >> @@ -342,6 +342,9 @@ static void >>   apply_wal_row(struct xstream *stream, struct xrow_header *row) >>   { >>       struct request request; >> +    // TODO: process confirmation during recovery. >> +    if (row->type == IPROTO_CONFIRM) >> +        return; >>       xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); >>       if (request.type != IPROTO_NOP) { >>           struct space *space = space_cache_find_xc(request.space_id); >> diff --git a/src/box/errcode.h b/src/box/errcode.h >> index 019c582af..3ba6866e5 100644 >> --- a/src/box/errcode.h >> +++ b/src/box/errcode.h >> @@ -267,6 +267,7 @@ struct errcode_record { >>       /*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not >> started") \ >>       /*213 */_(ER_NO_SUCH_SESSION_SETTING,    "Session setting %s >> doesn't exist") \ >>       /*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted >> sync transactions from other instance with id %u") \ >> +    /*215 */_(ER_SYNC_MASTER_MISMATCH,    "CONFIRM message arrived >> for an unknown master id %d, expected %d") \ >>     /* >>    * !IMPORTANT! Please follow instructions at start of the file >> diff --git a/src/box/relay.cc b/src/box/relay.cc >> index 333e91ea9..4df3c2f26 100644 >> --- a/src/box/relay.cc >> +++ b/src/box/relay.cc >> @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg) >>       vclock_copy(&status->relay->tx.vclock, &status->vclock); >>       /* >>        * Let pending synchronous transactions know, which of >> -     * them were successfully sent to the replica. 
>> +     * them were successfully sent to the replica. Acks are >> +     * collected only on the master. Other instances wait for >> +     * master's CONFIRM message instead. >>        */ >> -    txn_limbo_ack(&txn_limbo, status->relay->replica->id, >> -             vclock_get(&status->vclock, instance_id)); >> +    if (txn_limbo.instance_id == instance_id) { >> +        txn_limbo_ack(&txn_limbo, status->relay->replica->id, >> +                  vclock_get(&status->vclock, instance_id)); >> +    } >>       static const struct cmsg_hop route[] = { >>           {relay_status_update, NULL} >>       }; >> @@ -766,7 +770,8 @@ static void >>   relay_send_row(struct xstream *stream, struct xrow_header *packet) >>   { >>       struct relay *relay = container_of(stream, struct relay, stream); >> -    assert(iproto_type_is_dml(packet->type)); >> +    assert(iproto_type_is_dml(packet->type) || >> +           packet->type == IPROTO_CONFIRM); >>       if (packet->group_id == GROUP_LOCAL) { >>           /* >>            * We do not relay replica-local rows to other >> diff --git a/src/box/txn.c b/src/box/txn.c >> index a65100b31..3b331fecc 100644 >> --- a/src/box/txn.c >> +++ b/src/box/txn.c >> @@ -36,6 +36,7 @@ >>   #include >>   #include "xrow.h" >>   #include "errinj.h" >> +#include "iproto_constants.h" >>     double too_long_threshold; >>   @@ -81,7 +82,12 @@ txn_add_redo(struct txn *txn, struct txn_stmt >> *stmt, struct request *request) >>        */ >>       struct space *space = stmt->space; >>       row->group_id = space != NULL ? space_group_id(space) : 0; >> -    row->bodycnt = xrow_encode_dml(request, &txn->region, row->body); >> +    /* >> +     * IPROTO_CONFIRM entries are supplementary and aren't >> +     * valid dml requests. They're encoded manually. 
>> +     */ >> +    if (likely(row->type != IPROTO_CONFIRM)) >> +        row->bodycnt = xrow_encode_dml(request, &txn->region, >> row->body); >>       if (row->bodycnt < 0) >>           return -1; >>       stmt->row = row; >> @@ -321,8 +327,10 @@ txn_commit_stmt(struct txn *txn, struct request >> *request) >>        */ >>       struct txn_stmt *stmt = txn_current_stmt(txn); >>   -    /* Create WAL record for the write requests in non-temporary >> spaces. >> -     * stmt->space can be NULL for IRPOTO_NOP. >> +    /* >> +     * Create WAL record for the write requests in >> +     * non-temporary spaces. stmt->space can be NULL for >> +     * IRPOTO_NOP or IPROTO_CONFIRM. >>        */ >>       if (stmt->space == NULL || !space_is_temporary(stmt->space)) { >>           if (txn_add_redo(txn, stmt, request) != 0) >> @@ -417,12 +425,12 @@ txn_run_rollback_triggers(struct txn *txn, >> struct rlist *triggers) >>   /** >>    * Complete transaction processing. >>    */ >> -static void >> +void >>   txn_complete(struct txn *txn) >>   { >>       /* >>        * Note, engine can be NULL if transaction contains >> -     * IPROTO_NOP statements only. >> +     * IPROTO_NOP or IPROTO_CONFIRM statements. >>        */ >>       if (txn->signature < 0) { >>           /* Undo the transaction. */ >> @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn) >>       struct xrow_header **remote_row = req->rows; >>       struct xrow_header **local_row = req->rows + txn->n_applier_rows; >>       bool is_sync = false; >> -    /* >> -     * Only local transactions, originated from the master, >> -     * can enter 'waiting for acks' state. It means, only >> -     * author of the transaction can collect acks. Replicas >> -     * consider it a normal async transaction so far. 
>> -     */ >> -    bool is_local = true; >>         stailq_foreach_entry(stmt, &txn->stmts, next) { >>           if (stmt->has_triggers) { >> @@ -530,17 +531,18 @@ txn_journal_entry_new(struct txn *txn) >>           if (stmt->row == NULL) >>               continue; >>   -        if (stmt->row->replica_id == 0) { >> +        if (stmt->row->replica_id == 0) >>               *local_row++ = stmt->row; >> -        } else { >> +        else >>               *remote_row++ = stmt->row; >> -            is_local = false; >> -        } >>             req->approx_len += xrow_approx_len(stmt->row); >>       } >> -    if (is_sync && is_local) >> + >> +    is_sync = is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC); >> +    if (is_sync) { >>           txn_set_flag(txn, TXN_WAIT_ACK); >> +    } >>         assert(remote_row == req->rows + txn->n_applier_rows); >>       assert(local_row == remote_row + txn->n_new_rows); >> @@ -601,6 +603,19 @@ txn_commit_nop(struct txn *txn) >>       return false; >>   } >>   +/* >> + * A trigger called on tx rollback due to a failed WAL write, >> + * when tx is waiting for confirmation. >> + */ >> +static int >> +txn_limbo_on_rollback(struct trigger *trig, void *data) >> +{ >> +    (void) trig; >> +    struct txn_limbo_entry *entry = (struct txn_limbo_entry *) data; >> +    txn_limbo_abort(&txn_limbo, entry); >> +    return 0; >> +} >> + >>   int >>   txn_commit_async(struct txn *txn) >>   { >> @@ -632,16 +647,42 @@ txn_commit_async(struct txn *txn) >>           return -1; >>       } >>   +    bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK); >> +    struct txn_limbo_entry *limbo_entry; >> +    if (is_sync) { >> +        /* See txn_commit(). 
*/ >> +        uint32_t origin_id = req->rows[0]->replica_id; >> +        int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn; >> +        limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn); >> +        if (limbo_entry == NULL) { >> +            txn_rollback(txn); >> +            txn_free(txn); >> +            return -1; >> +        } >> +        assert(lsn > 0); >> +        txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn); >> +    } >> + >>       fiber_set_txn(fiber(), NULL); >>       if (journal_write_async(req) != 0) { >>           fiber_set_txn(fiber(), txn); >>           txn_rollback(txn); >> +        txn_limbo_abort(&txn_limbo, limbo_entry); >>             diag_set(ClientError, ER_WAL_IO); >>           diag_log(); >>           return -1; >>       } >>   +    /* >> +     * Set a trigger to abort waiting for confirm on WAL write >> +     * failure. >> +     */ >> +    if (is_sync) { >> +        struct trigger trig; >> +        trigger_create(&trig, txn_limbo_on_rollback, limbo_entry, >> NULL); >> +        txn_on_rollback(txn, &trig); >> +    } >>       return 0; >>   } >>   diff --git a/src/box/txn.h b/src/box/txn.h >> index 232cc07a8..e7705bb48 100644 >> --- a/src/box/txn.h >> +++ b/src/box/txn.h >> @@ -73,6 +73,13 @@ enum txn_flag { >>        * then finishes commit and returns success to a user. >>        */ >>       TXN_WAIT_ACK, >> +    /** >> +     * A transaction mustn't wait for confirmation, even if it >> +     * touches synchronous spaces. Needed for join stage on >> +     * replica, when all the data coming from the master is >> +     * already confirmed by design. >> +     */ >> +    TXN_FORCE_ASYNC, >>   }; >>     enum { >> @@ -257,6 +264,16 @@ txn_clear_flag(struct txn *txn, enum txn_flag flag) >>       txn->flags &= ~(1 << flag); >>   } >>   +/** >> + * Force async mode for transaction. It won't wait for acks >> + * or confirmation. 
>> + */ >> +static inline void >> +txn_force_async(struct txn *txn) >> +{ >> +    txn_set_flag(txn, TXN_FORCE_ASYNC); >> +} >> + >>   /* Pointer to the current transaction (if any) */ >>   static inline struct txn * >>   in_txn(void) >> @@ -278,6 +295,12 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn) >>   struct txn * >>   txn_begin(void); >>   +/** >> + * Complete transaction processing. >> + */ >> +void >> +txn_complete(struct txn *txn); >> + >>   /** >>    * Commit a transaction. >>    * @pre txn == in_txn() >> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c >> index efb97a591..daec98317 100644 >> --- a/src/box/txn_limbo.c >> +++ b/src/box/txn_limbo.c >> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo >> *limbo, struct txn_limbo_entry *entry) >>           fiber_yield(); >>       fiber_set_cancellable(cancellable); >>       // TODO: implement rollback. >> -    // TODO: implement confirm. >>       assert(!entry->is_rollback); >> +    assert(entry->is_commit); >>       txn_limbo_remove(limbo, entry); >>       txn_clear_flag(txn, TXN_WAIT_ACK); >>   } >>   +/** >> + * Write a confirmation entry to WAL. After it's written all the >> + * transactions waiting for confirmation may be finished. >> + */ >> +static int >> +txn_limbo_write_confirm(struct txn_limbo *limbo, struct >> txn_limbo_entry *entry) >> +{ >> +    /* Prepare a confirm entry. 
*/ >> +    struct xrow_header row = {0}; >> +    struct request request = {0}; >> +    request.header = &row; >> + >> +    row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, >> entry->lsn); >> +    if (row.bodycnt < 0) >> +        return -1; >> + >> +    struct txn *txn = txn_begin(); >> +    if (txn == NULL) >> +        return -1; >> + >> +    if (txn_begin_stmt(txn, NULL) != 0) >> +        goto rollback; >> +    if (txn_commit_stmt(txn, &request) != 0) >> +        goto rollback; >> + >> +    return txn_commit(txn); >> +rollback: >> +    txn_rollback(txn); >> +    return -1; >> +} >> + >> +void >> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) >> +{ >> +    assert(limbo->instance_id != REPLICA_ID_NIL && >> +           limbo->instance_id != instance_id); >> +    struct txn_limbo_entry *e, *tmp; >> +    rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) { >> +        if (e->lsn > lsn) >> +            break; >> +        assert(e->txn->fiber == NULL); >> +        e->is_commit = true; >> +        txn_limbo_remove(limbo, e); >> +        txn_clear_flag(e->txn, TXN_WAIT_ACK); >> +        /* >> +         * txn_complete_async must've been called already, >> +         * since CONFIRM always follows the tx in question. >> +         * So, finish this tx processing right away. 
>> +         */ >> +        txn_complete(e->txn); >> +    } >> +} >> + >>   void >>   txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t >> lsn) >>   { >> @@ -143,23 +196,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t >> replica_id, int64_t lsn) >>       int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id); >>       vclock_follow(&limbo->vclock, replica_id, lsn); >>       struct txn_limbo_entry *e; >> +    struct txn_limbo_entry *last_quorum = NULL; >>       rlist_foreach_entry(e, &limbo->queue, in_queue) { >>           if (e->lsn <= prev_lsn) >>               continue; >>           if (e->lsn > lsn) >>               break; >>           if (++e->ack_count >= replication_sync_quorum) { >> -            // TODO: better call complete() right >> -            // here. Appliers use async transactions, >> -            // and their txns don't have fibers to >> -            // wake up. That becomes actual, when >> -            // appliers will be supposed to wait for >> -            // 'confirm' message. >>               e->is_commit = true; >> -            fiber_wakeup(e->txn->fiber); >> +            last_quorum = e; >>           } >>           assert(e->ack_count <= VCLOCK_MAX); >>       } >> +    if (last_quorum != NULL) { >> +        if (txn_limbo_write_confirm(limbo, last_quorum) != 0) { >> +            // TODO: rollback. >> +            return; >> +        } >> +        /* >> +         * Wakeup all the entries in direct order as soon >> +         * as confirmation message is written to WAL. 
>> +         */ >> +        rlist_foreach_entry(e, &limbo->queue, in_queue) { >> +            fiber_wakeup(e->txn->fiber); >> +            if (e == last_quorum) >> +                break; >> +        } >> +    } >>   } >>     void >> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h >> index 1ad1c567a..de415cd97 100644 >> --- a/src/box/txn_limbo.h >> +++ b/src/box/txn_limbo.h >> @@ -160,6 +160,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t >> replica_id, int64_t lsn); >>   void >>   txn_limbo_wait_complete(struct txn_limbo *limbo, struct >> txn_limbo_entry *entry); >>   +/** >> + * Confirm all the entries up to the given master's LSN. >> + */ >> +void >> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn); >> + >>   void >>   txn_limbo_init(); >>   diff --git a/test/box/error.result b/test/box/error.result >> index 69c471085..34ded3930 100644 >> --- a/test/box/error.result >> +++ b/test/box/error.result >> @@ -433,6 +433,7 @@ t; >>    |   212: box.error.SEQUENCE_NOT_STARTED >>    |   213: box.error.NO_SUCH_SESSION_SETTING >>    |   214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS >> + |   215: box.error.SYNC_MASTER_MISMATCH >>    | ... >>     test_run:cmd("setopt delimiter ''"); >> -- Serge Petrenko