[Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries

Serge Petrenko sergepetrenko at tarantool.org
Mon Jun 22 13:34:25 MSK 2020


On 20.06.2020 18:06, Leonid Vasiliev wrote:
> Hi! Thank you for the patch.
> LGTM.
> All the following comments can be skipped silently.
>
> On 09.06.2020 15:20, Serge Petrenko wrote:
>> Make txn_limbo write a CONFIRM entry as soon as a batch of entries
>> receive their acks. CONFIRM entry is written to WAL and later replicated
>> to all the replicas.
>>
>> Now replicas put synchronous transactions into txn_limbo and wait for
>> corresponding confirmation entries to arrive and end up in their WAL
>> before committing the transactions.
>>
>> Part-of #4847
>> ---
>>   src/box/applier.cc    | 81 ++++++++++++++++++++++++++++++++++++++++++-
>>   src/box/box.cc        |  3 ++
>>   src/box/errcode.h     |  1 +
>>   src/box/relay.cc      | 13 ++++---
>>   src/box/txn.c         | 75 ++++++++++++++++++++++++++++++---------
>>   src/box/txn.h         | 23 ++++++++++++
>>   src/box/txn_limbo.c   | 79 ++++++++++++++++++++++++++++++++++++-----
>>   src/box/txn_limbo.h   |  6 ++++
>>   test/box/error.result |  1 +
>>   9 files changed, 252 insertions(+), 30 deletions(-)
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index df48b4796..1dc977424 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -51,6 +51,7 @@
>>   #include "txn.h"
>>   #include "box.h"
>>   #include "scoped_guard.h"
>> +#include "txn_limbo.h"
>>     STRS(applier_state, applier_STATE);
>>   @@ -214,6 +215,11 @@ apply_snapshot_row(struct xrow_header *row)
>>       struct txn *txn = txn_begin();
>>       if (txn == NULL)
>>           return -1;
>> +    /*
>> +     * Do not wait for confirmation when fetching a snapshot.
>> +     * Master only sends confirmed rows during join.
>> +     */
>> +    txn_force_async(txn);
>>       if (txn_begin_stmt(txn, space) != 0)
>>           goto rollback;
>>       /* no access checks here - applier always works with admin 
>> privs */
>> @@ -249,10 +255,73 @@ process_nop(struct request *request)
>>       return txn_commit_stmt(txn, request);
>>   }
>>   +/*
>> + * An on_commit trigger set on a txn containing a CONFIRM entry.
>> + * Confirms some of the txs waiting in txn_limbo.
>
> In txn.h "txns" notation is used (up to you).
Thanks! Fixed.
>
>> + */
>> +static int
>> +applier_on_confirm(struct trigger *trig, void *data)
>> +{
>> +    (void) trig;
>> +    int64_t lsn = *(int64_t *)data;
>> +    txn_limbo_read_confirm(&txn_limbo, lsn);
>> +    return 0;
>> +}
>> +
>> +static int
>> +process_confirm(struct request *request)
>> +{
>> +    assert(request->header->type = IPROTO_CONFIRM);
>> +    uint32_t replica_id;
>> +    struct txn *txn = in_txn();
>> +    int64_t *lsn = (int64_t *) region_alloc(&txn->region, 
>> sizeof(int64_t));
>> +    if (lsn == NULL) {
>> +        diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn");
>> +        return -1;
>> +    }
>> +    if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
>> +        return -1;
>> +    /*
>> +     * on_commit trigger failure is not allowed, so check for
>> +     * instance id early.
>> +     */
>> +    if (replica_id != txn_limbo.instance_id) {
>> +        diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
>> +             txn_limbo.instance_id);
>> +        return -1;
>> +    }
>> +
>> +    /*
>> +     * Set an on_commit trigger which will perform the actual
>> +     * confirmation processing.
>> +     */
>> +    struct trigger *trig = (struct trigger *)region_alloc(&txn->region,
>> +                                  sizeof(*trig));
>> +    if (trig == NULL) {
>> +        diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig");
>> +        return -1;
>> +    }
>> +    trigger_create(trig, applier_on_confirm, lsn, NULL);
>> +
>> +    if (txn_begin_stmt(txn, NULL) != 0)
>> +        return -1;
>> +
>> +    if (txn_commit_stmt(txn, request) == 0) {
>> +        txn_on_commit(txn, trig);
>> +        return 0;
>> +    } else {
>> +        return -1;
>> +    }
>
> Do I understand correctly that this is a trick like the one in process_nop()
> to promote the vclock and ...? Maybe add a comment?

Yes, it's somewhat similar to process_nop. The difference is that, unlike
everything else the applier receives, CONFIRM and ROLLBACK aren't dml
requests and cannot be processed in the same manner. They require some
txn_limbo operations tied to their processing. I've just come up with
the comment you asked for. Added:

@@ -268,6 +268,12 @@ applier_on_confirm(struct trigger *trig, void *data)
         return 0;
  }

+/*
+ * CONFIRM entries aren't dml requests and require special
+ * handling: they're written to WAL right away, without any space
+ * operations, and, once they're committed, txn_limbo performs the
+ * actions dictated by the received CONFIRM entry.
+ */
  static int
  process_confirm(struct request *request)
  {

>
>> +}
>> +
>>   static int
>>   apply_row(struct xrow_header *row)
>>   {
>>       struct request request;
>> +    if (row->type == IPROTO_CONFIRM) {
>> +        request.header = row;
>> +        return process_confirm(&request);
>> +    }
>>       if (xrow_decode_dml(row, &request, 
>> dml_request_key_map(row->type)) != 0)
>>           return -1;
>>       if (request.type == IPROTO_NOP)
>> @@ -273,6 +342,11 @@ apply_final_join_row(struct xrow_header *row)
>>       struct txn *txn = txn_begin();
>>       if (txn == NULL)
>>           return -1;
>> +    /*
>> +     * Do not wait for confirmation while processing final
>> +     * join rows. See apply_snapshot_row().
>> +     */
>> +    txn_force_async(txn);
>>       if (apply_row(row) != 0) {
>>           txn_rollback(txn);
>>           fiber_gc();
>> @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, 
>> uint64_t row_count)
>>           applier->last_row_time = ev_monotonic_now(loop());
>>           if (iproto_type_is_dml(row.type)) {
>> vclock_follow_xrow(&replicaset.vclock, &row);
>> -            if (apply_final_join_row(&row) != 0)
>> +            /*
>> +             * Confirms are ignored during join. All the
>> +             * data master sends us is valid.
>> +             */
>> +            if (row.type != IPROTO_CONFIRM &&
>> +                apply_final_join_row(&row) != 0)
>>                   diag_raise();
>>               if (++row_count % 100000 == 0)
>>                   say_info("%.1fM rows received", row_count / 1e6);
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 64ac89975..792c3c394 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -342,6 +342,9 @@ static void
>>   apply_wal_row(struct xstream *stream, struct xrow_header *row)
>>   {
>>       struct request request;
>> +    // TODO: process confirmation during recovery.
>> +    if (row->type == IPROTO_CONFIRM)
>> +        return;
>>       xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
>>       if (request.type != IPROTO_NOP) {
>>           struct space *space = space_cache_find_xc(request.space_id);
>> diff --git a/src/box/errcode.h b/src/box/errcode.h
>> index 019c582af..3ba6866e5 100644
>> --- a/src/box/errcode.h
>> +++ b/src/box/errcode.h
>> @@ -267,6 +267,7 @@ struct errcode_record {
>>       /*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not 
>> started") \
>>       /*213 */_(ER_NO_SUCH_SESSION_SETTING,    "Session setting %s 
>> doesn't exist") \
>>       /*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted 
>> sync transactions from other instance with id %u") \
>> +    /*215 */_(ER_SYNC_MASTER_MISMATCH,    "CONFIRM message arrived 
>> for an unknown master id %d, expected %d") \
>>     /*
>>    * !IMPORTANT! Please follow instructions at start of the file
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index 333e91ea9..4df3c2f26 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg)
>>       vclock_copy(&status->relay->tx.vclock, &status->vclock);
>>       /*
>>        * Let pending synchronous transactions know, which of
>> -     * them were successfully sent to the replica.
>> +     * them were successfully sent to the replica. Acks are
>> +     * collected only on the master. Other instances wait for
>> +     * master's CONFIRM message instead.
>>        */
>> -    txn_limbo_ack(&txn_limbo, status->relay->replica->id,
>> -             vclock_get(&status->vclock, instance_id));
>> +    if (txn_limbo.instance_id == instance_id) {
>> +        txn_limbo_ack(&txn_limbo, status->relay->replica->id,
>> +                  vclock_get(&status->vclock, instance_id));
>> +    }
>>       static const struct cmsg_hop route[] = {
>>           {relay_status_update, NULL}
>>       };
>> @@ -766,7 +770,8 @@ static void
>>   relay_send_row(struct xstream *stream, struct xrow_header *packet)
>>   {
>>       struct relay *relay = container_of(stream, struct relay, stream);
>> -    assert(iproto_type_is_dml(packet->type));
>> +    assert(iproto_type_is_dml(packet->type) ||
>> +           packet->type == IPROTO_CONFIRM);
>>       if (packet->group_id == GROUP_LOCAL) {
>>           /*
>>            * We do not relay replica-local rows to other
>> diff --git a/src/box/txn.c b/src/box/txn.c
>> index a65100b31..3b331fecc 100644
>> --- a/src/box/txn.c
>> +++ b/src/box/txn.c
>> @@ -36,6 +36,7 @@
>>   #include <fiber.h>
>>   #include "xrow.h"
>>   #include "errinj.h"
>> +#include "iproto_constants.h"
>>     double too_long_threshold;
>>   @@ -81,7 +82,12 @@ txn_add_redo(struct txn *txn, struct txn_stmt 
>> *stmt, struct request *request)
>>        */
>>       struct space *space = stmt->space;
>>       row->group_id = space != NULL ? space_group_id(space) : 0;
>> -    row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
>> +    /*
>> +     * IPROTO_CONFIRM entries are supplementary and aren't
>> +     * valid dml requests. They're encoded manually.
>> +     */
>> +    if (likely(row->type != IPROTO_CONFIRM))
>> +        row->bodycnt = xrow_encode_dml(request, &txn->region, 
>> row->body);
>>       if (row->bodycnt < 0)
>>           return -1;
>>       stmt->row = row;
>> @@ -321,8 +327,10 @@ txn_commit_stmt(struct txn *txn, struct request 
>> *request)
>>        */
>>       struct txn_stmt *stmt = txn_current_stmt(txn);
>>   -    /* Create WAL record for the write requests in non-temporary 
>> spaces.
>> -     * stmt->space can be NULL for IRPOTO_NOP.
>> +    /*
>> +     * Create WAL record for the write requests in
>> +     * non-temporary spaces. stmt->space can be NULL for
>> +     * IRPOTO_NOP or IPROTO_CONFIRM.
>>        */
>>       if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
>>           if (txn_add_redo(txn, stmt, request) != 0)
>> @@ -417,12 +425,12 @@ txn_run_rollback_triggers(struct txn *txn, 
>> struct rlist *triggers)
>>   /**
>>    * Complete transaction processing.
>>    */
>> -static void
>> +void
>>   txn_complete(struct txn *txn)
>>   {
>>       /*
>>        * Note, engine can be NULL if transaction contains
>> -     * IPROTO_NOP statements only.
>> +     * IPROTO_NOP or IPROTO_CONFIRM statements.
>>        */
>>       if (txn->signature < 0) {
>>           /* Undo the transaction. */
>> @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn)
>>       struct xrow_header **remote_row = req->rows;
>>       struct xrow_header **local_row = req->rows + txn->n_applier_rows;
>>       bool is_sync = false;
>> -    /*
>> -     * Only local transactions, originated from the master,
>> -     * can enter 'waiting for acks' state. It means, only
>> -     * author of the transaction can collect acks. Replicas
>> -     * consider it a normal async transaction so far.
>> -     */
>> -    bool is_local = true;
>>         stailq_foreach_entry(stmt, &txn->stmts, next) {
>>           if (stmt->has_triggers) {
>> @@ -530,17 +531,18 @@ txn_journal_entry_new(struct txn *txn)
>>           if (stmt->row == NULL)
>>               continue;
>>   -        if (stmt->row->replica_id == 0) {
>> +        if (stmt->row->replica_id == 0)
>>               *local_row++ = stmt->row;
>> -        } else {
>> +        else
>>               *remote_row++ = stmt->row;
>> -            is_local = false;
>> -        }
>>             req->approx_len += xrow_approx_len(stmt->row);
>>       }
>> -    if (is_sync && is_local)
>> +
>> +    is_sync = is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC);
>> +    if (is_sync) {
>>           txn_set_flag(txn, TXN_WAIT_ACK);
>> +    }
>>         assert(remote_row == req->rows + txn->n_applier_rows);
>>       assert(local_row == remote_row + txn->n_new_rows);
>> @@ -601,6 +603,19 @@ txn_commit_nop(struct txn *txn)
>>       return false;
>>   }
>>   +/*
>> + * A trigger called on tx rollback due to a failed WAL write,
>> + * when tx is waiting for confirmation.
>> + */
>> +static int
>> +txn_limbo_on_rollback(struct trigger *trig, void *data)
>> +{
>> +    (void) trig;
>> +    struct txn_limbo_entry *entry = (struct txn_limbo_entry *) data;
>> +    txn_limbo_abort(&txn_limbo, entry);
>> +    return 0;
>> +}
>> +
>>   int
>>   txn_commit_async(struct txn *txn)
>>   {
>> @@ -632,16 +647,42 @@ txn_commit_async(struct txn *txn)
>>           return -1;
>>       }
>>   +    bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
>> +    struct txn_limbo_entry *limbo_entry;
>> +    if (is_sync) {
>> +        /* See txn_commit(). */
>> +        uint32_t origin_id = req->rows[0]->replica_id;
>> +        int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
>> +        limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
>> +        if (limbo_entry == NULL) {
>> +            txn_rollback(txn);
>> +            txn_free(txn);
>> +            return -1;
>> +        }
>> +        assert(lsn > 0);
>> +        txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
>> +    }
>> +
>>       fiber_set_txn(fiber(), NULL);
>>       if (journal_write_async(req) != 0) {
>>           fiber_set_txn(fiber(), txn);
>>           txn_rollback(txn);
>> +        txn_limbo_abort(&txn_limbo, limbo_entry);
>>             diag_set(ClientError, ER_WAL_IO);
>>           diag_log();
>>           return -1;
>>       }
>>   +    /*
>> +     * Set a trigger to abort waiting for confirm on WAL write
>> +     * failure.
>> +     */
>> +    if (is_sync) {
>> +        struct trigger trig;
>> +        trigger_create(&trig, txn_limbo_on_rollback, limbo_entry, 
>> NULL);
>> +        txn_on_rollback(txn, &trig);
>> +    }
>>       return 0;
>>   }
>>   diff --git a/src/box/txn.h b/src/box/txn.h
>> index 232cc07a8..e7705bb48 100644
>> --- a/src/box/txn.h
>> +++ b/src/box/txn.h
>> @@ -73,6 +73,13 @@ enum txn_flag {
>>        * then finishes commit and returns success to a user.
>>        */
>>       TXN_WAIT_ACK,
>> +    /**
>> +     * A transaction mustn't wait for confirmation, even if it
>> +     * touches synchronous spaces. Needed for join stage on
>> +     * replica, when all the data coming from the master is
>> +     * already confirmed by design.
>> +     */
>> +    TXN_FORCE_ASYNC,
>>   };
>>     enum {
>> @@ -257,6 +264,16 @@ txn_clear_flag(struct txn *txn, enum txn_flag flag)
>>       txn->flags &= ~(1 << flag);
>>   }
>>   +/**
>> + * Force async mode for transaction. It won't wait for acks
>> + * or confirmation.
>> + */
>> +static inline void
>> +txn_force_async(struct txn *txn)
>> +{
>> +    txn_set_flag(txn, TXN_FORCE_ASYNC);
>> +}
>> +
>>   /* Pointer to the current transaction (if any) */
>>   static inline struct txn *
>>   in_txn(void)
>> @@ -278,6 +295,12 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
>>   struct txn *
>>   txn_begin(void);
>>   +/**
>> + * Complete transaction processing.
>> + */
>> +void
>> +txn_complete(struct txn *txn);
>> +
>>   /**
>>    * Commit a transaction.
>>    * @pre txn == in_txn()
>> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
>> index efb97a591..daec98317 100644
>> --- a/src/box/txn_limbo.c
>> +++ b/src/box/txn_limbo.c
>> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo 
>> *limbo, struct txn_limbo_entry *entry)
>>           fiber_yield();
>>       fiber_set_cancellable(cancellable);
>>       // TODO: implement rollback.
>> -    // TODO: implement confirm.
>>       assert(!entry->is_rollback);
>> +    assert(entry->is_commit);
>>       txn_limbo_remove(limbo, entry);
>>       txn_clear_flag(txn, TXN_WAIT_ACK);
>>   }
>>   +/**
>> + * Write a confirmation entry to WAL. After it's written all the
>> + * transactions waiting for confirmation may be finished.
>> + */
>> +static int
>> +txn_limbo_write_confirm(struct txn_limbo *limbo, struct 
>> txn_limbo_entry *entry)
>> +{
>> +    /* Prepare a confirm entry. */
>> +    struct xrow_header row = {0};
>> +    struct request request = {0};
>> +    request.header = &row;
>> +
>> +    row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, 
>> entry->lsn);
>> +    if (row.bodycnt < 0)
>> +        return -1;
>> +
>> +    struct txn *txn = txn_begin();
>> +    if (txn == NULL)
>> +        return -1;
>> +
>> +    if (txn_begin_stmt(txn, NULL) != 0)
>> +        goto rollback;
>> +    if (txn_commit_stmt(txn, &request) != 0)
>> +        goto rollback;
>> +
>> +    return txn_commit(txn);
>> +rollback:
>> +    txn_rollback(txn);
>> +    return -1;
>> +}
>> +
>> +void
>> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
>> +{
>> +    assert(limbo->instance_id != REPLICA_ID_NIL &&
>> +           limbo->instance_id != instance_id);
>> +    struct txn_limbo_entry *e, *tmp;
>> +    rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
>> +        if (e->lsn > lsn)
>> +            break;
>> +        assert(e->txn->fiber == NULL);
>> +        e->is_commit = true;
>> +        txn_limbo_remove(limbo, e);
>> +        txn_clear_flag(e->txn, TXN_WAIT_ACK);
>> +        /*
>> +         * txn_complete_async must've been called already,
>> +         * since CONFIRM always follows the tx in question.
>> +         * So, finish this tx processing right away.
>> +         */
>> +        txn_complete(e->txn);
>> +    }
>> +}
>> +
>>   void
>>   txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t 
>> lsn)
>>   {
>> @@ -143,23 +196,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t 
>> replica_id, int64_t lsn)
>>       int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
>>       vclock_follow(&limbo->vclock, replica_id, lsn);
>>       struct txn_limbo_entry *e;
>> +    struct txn_limbo_entry *last_quorum = NULL;
>>       rlist_foreach_entry(e, &limbo->queue, in_queue) {
>>           if (e->lsn <= prev_lsn)
>>               continue;
>>           if (e->lsn > lsn)
>>               break;
>>           if (++e->ack_count >= replication_sync_quorum) {
>> -            // TODO: better call complete() right
>> -            // here. Appliers use async transactions,
>> -            // and their txns don't have fibers to
>> -            // wake up. That becomes actual, when
>> -            // appliers will be supposed to wait for
>> -            // 'confirm' message.
>>               e->is_commit = true;
>> -            fiber_wakeup(e->txn->fiber);
>> +            last_quorum = e;
>>           }
>>           assert(e->ack_count <= VCLOCK_MAX);
>>       }
>> +    if (last_quorum != NULL) {
>> +        if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
>> +            // TODO: rollback.
>> +            return;
>> +        }
>> +        /*
>> +         * Wakeup all the entries in direct order as soon
>> +         * as confirmation message is written to WAL.
>> +         */
>> +        rlist_foreach_entry(e, &limbo->queue, in_queue) {
>> +            fiber_wakeup(e->txn->fiber);
>> +            if (e == last_quorum)
>> +                break;
>> +        }
>> +    }
>>   }
>>     void
>> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
>> index 1ad1c567a..de415cd97 100644
>> --- a/src/box/txn_limbo.h
>> +++ b/src/box/txn_limbo.h
>> @@ -160,6 +160,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t 
>> replica_id, int64_t lsn);
>>   void
>>   txn_limbo_wait_complete(struct txn_limbo *limbo, struct 
>> txn_limbo_entry *entry);
>>   +/**
>> + * Confirm all the entries up to the given master's LSN.
>> + */
>> +void
>> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
>> +
>>   void
>>   txn_limbo_init();
>>   diff --git a/test/box/error.result b/test/box/error.result
>> index 69c471085..34ded3930 100644
>> --- a/test/box/error.result
>> +++ b/test/box/error.result
>> @@ -433,6 +433,7 @@ t;
>>    |   212: box.error.SEQUENCE_NOT_STARTED
>>    |   213: box.error.NO_SUCH_SESSION_SETTING
>>    |   214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
>> + |   215: box.error.SYNC_MASTER_MISMATCH
>>    | ...
>>     test_run:cmd("setopt delimiter ''");
>>
-- 
Serge Petrenko



More information about the Tarantool-patches mailing list