From: Cyrill Gorcunov via Tarantool-patches
Reply-To: Cyrill Gorcunov
To: tml
Cc: Vladislav Shpilevoy
Date: Fri, 7 May 2021 00:41:31 +0300
Message-Id: <20210506214132.533913-2-gorcunov@gmail.com>
In-Reply-To: <20210506214132.533913-1-gorcunov@gmail.com>
References: <20210506214132.533913-1-gorcunov@gmail.com>
Subject: [Tarantool-patches] [PATCH v4 1/2] applier: send transaction's first row WAL time in the applier_writer_f

The applier fiber sends the node's current vclock to the remote relay
reader, indicating the current state of the fetched WAL data, so that
the relay knows which new data should be sent.
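
For illustration only (this snippet is not part of the patch and the
names in it are invented), here is a tiny standalone model of the
timestamp bookkeeping described below: remember the WAL time of the
first row of a not-yet-ACKed batch, hand it out with the next ACK,
then reset it.

    #include <stdio.h>

    /*
     * Toy model: keep the timestamp of the first row of a not-yet-ACKed
     * WAL batch, hand it out when an ACK is encoded and reset it, so a
     * batch of several transactions reports its oldest timestamp once.
     */
    struct wal_stat_model {
        double first_row_tm; /* 0 means "nothing pending" */
    };

    /* WAL-write side: keep only the oldest pending timestamp. */
    static void
    model_update(struct wal_stat_model *st, double row_tm)
    {
        if (st->first_row_tm == 0)
            st->first_row_tm = row_tm;
    }

    /* ACK-encoding side: consume the stored timestamp. */
    static double
    model_ack(struct wal_stat_model *st)
    {
        double tm = st->first_row_tm;
        st->first_row_tm = 0;
        return tm;
    }

    int main(void)
    {
        struct wal_stat_model st = {0};
        /* Two transactions hit WAL before a single ACK goes out. */
        model_update(&st, 100.5);
        model_update(&st, 101.0);
        printf("tm in ACK: %.1f\n", model_ack(&st));      /* 100.5 */
        printf("tm in next ACK: %.1f\n", model_ack(&st)); /* 0.0 */
        return 0;
    }

In the patch itself this state lives in struct replica and is handled
by wal_stat_update() and wal_stat_ack(), see the diff below.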
The packet the applier sends carries the xrow_header::tm field as zero,
but we can reuse it to carry the timestamp of the first row of the
transaction we wrote to our WAL. Since old Tarantool instances simply
ignore this field, the extension does not cause any compatibility
problems. The timestamp is needed to account for the lag of downstream
replicas, which is useful for informational purposes and cluster health
monitoring.

We update the applier statistics in the WAL callbacks. However, since
both apply_synchro_row and apply_plain_tx are used not only for real
data application but also during the final join stage (where we are
not yet writing data), apply_synchro_row is extended with a replica_id
argument, which is non-zero when the applier is subscribed.

The calculation of the downstream lag itself is addressed in the next
patch, because sending the timestamp and observing it are independent
actions.

The main structure that manages timestamp handling is called
`applier_wal_stat`, because it is essentially remote WAL statistics and
we may need to extend it with more fields in the future. To minimize
branching we carry it inside the replica structure; the read and write
procedures are wal_stat_ack and wal_stat_update. Special handling is
needed because of WAL batch processing: several transactions may
complete before the ACK packet is sent back.

Part-of #5447

Signed-off-by: Cyrill Gorcunov
---
 src/box/applier.cc     | 92 ++++++++++++++++++++++++++++++++++++------
 src/box/applier.h      | 14 +++++++
 src/box/replication.cc |  3 ++
 src/box/replication.h  |  4 ++
 4 files changed, 100 insertions(+), 13 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 33181fdbf..3038ce05f 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -148,6 +148,37 @@ applier_check_sync(struct applier *applier)
 	}
 }
 
+/**
+ * Update WAL statistics once a transaction is written.
+ */
+static void
+wal_stat_update(struct applier_wal_stat *wal_st)
+{
+	struct replica *r = replica_by_id(wal_st->replica_id);
+	if (likely(r != NULL)) {
+		assert(wal_st->replica_id == r->wal_st.replica_id);
+		if (r->wal_st.first_row_tm == 0)
+			r->wal_st.first_row_tm = wal_st->first_row_tm;
+	}
+}
+
+/**
+ * Encode the timestamp for ACK sending. We drop the value on purpose
+ * because it is a one time action, and if transactions are written
+ * in a batch we need to account the oldest timestamp to show the real
+ * relay lag.
+ */
+static void
+wal_stat_ack(uint32_t replica_id, struct xrow_header *xrow)
+{
+	struct replica *r = replica_by_id(replica_id);
+	if (likely(r != NULL)) {
+		assert(replica_id == r->wal_st.replica_id);
+		xrow->tm = r->wal_st.first_row_tm;
+		r->wal_st.first_row_tm = 0;
+	}
+}
+
 /*
  * Fiber function to write vclock to replication master.
  * To track connection status, replica answers master
@@ -193,6 +224,7 @@ applier_writer_f(va_list ap)
 			applier->has_acks_to_send = false;
 			struct xrow_header xrow;
 			xrow_encode_vclock(&xrow, &replicaset.vclock);
+			wal_stat_ack(applier->instance_id, &xrow);
 			coio_write_xrow(&io, &xrow);
 			ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, {
 				fiber_sleep(0.01);
@@ -490,7 +522,7 @@ static uint64_t
 applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);
 
 static int
-apply_final_join_tx(struct stailq *rows);
+apply_final_join_tx(uint32_t replica_id, struct stailq *rows);
 
 /**
  * A helper struct to link xrow objects in a list.
@@ -535,7 +567,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 						  next)->row);
 			break;
 		}
-		if (apply_final_join_tx(&rows) != 0)
+		if (apply_final_join_tx(applier->instance_id, &rows) != 0)
 			diag_raise();
 	}
 
@@ -756,6 +788,11 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
 {
 	(void) trigger;
 	(void) event;
+
+	struct applier_wal_stat *wal_st =
+		(struct applier_wal_stat *)trigger->data;
+	wal_stat_update(wal_st);
+
 	/* Broadcast the WAL write across all appliers. */
 	trigger_run(&replicaset.applier.on_wal_write, NULL);
 	return 0;
 }
@@ -766,6 +803,8 @@ struct synchro_entry {
 	struct synchro_request *req;
 	/** Fiber created the entry. To wakeup when WAL write is done. */
 	struct fiber *owner;
+	/** WAL bound statistics. */
+	struct applier_wal_stat *wal_st;
 	/**
 	 * The base journal entry. It has unsized array and then must be the
 	 * last entry in the structure. But can workaround it via a union
@@ -789,6 +828,8 @@ apply_synchro_row_cb(struct journal_entry *entry)
 	if (entry->res < 0) {
 		applier_rollback_by_wal_io();
 	} else {
+		if (synchro_entry->wal_st != NULL)
+			wal_stat_update(synchro_entry->wal_st);
 		txn_limbo_process(&txn_limbo, synchro_entry->req);
 		trigger_run(&replicaset.applier.on_wal_write, NULL);
 	}
@@ -797,7 +838,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
 
 /** Process a synchro request. */
 static int
-apply_synchro_row(struct xrow_header *row)
+apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
 {
 	assert(iproto_type_is_synchro_request(row->type));
 
@@ -805,6 +846,7 @@ apply_synchro_row(struct xrow_header *row)
 	if (xrow_decode_synchro(row, &req) != 0)
 		goto err;
 
+	struct applier_wal_stat wal_st;
 	struct synchro_entry entry;
 	/*
 	 * Rows array is cast from *[] to **, because otherwise g++ complains
@@ -817,6 +859,15 @@ apply_synchro_row(struct xrow_header *row)
 			       apply_synchro_row_cb, &entry);
 	entry.req = &req;
 	entry.owner = fiber();
+
+	if (replica_id != 0) {
+		wal_st.replica_id = replica_id;
+		wal_st.first_row_tm = row->tm;
+		entry.wal_st = &wal_st;
+	} else {
+		entry.wal_st = NULL;
+	}
+
 	/*
 	 * The WAL write is blocking. Otherwise it might happen that a CONFIRM
 	 * or ROLLBACK is sent to WAL, and it would empty the limbo, but before
@@ -862,8 +913,9 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
 	return box_raft_process(&req, applier->instance_id);
 }
 
-static inline int
-apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
+static int
+apply_plain_tx(uint32_t replica_id, struct stailq *rows,
+	       bool skip_conflict, bool use_triggers)
 {
 	/*
 	 * Explicitly begin the transaction so that we can
@@ -931,10 +983,21 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
 			goto fail;
 		}
 
+		struct applier_wal_stat *wal_st;
+		wal_st = region_alloc_object(&txn->region, typeof(*wal_st), &size);
+		if (wal_st == NULL) {
+			diag_set(OutOfMemory, size, "region_alloc_object", "wal_st");
+			goto fail;
+		}
+
 		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
 		txn_on_rollback(txn, on_rollback);
 
-		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
+		item = stailq_first_entry(rows, struct applier_tx_row, next);
+		wal_st->replica_id = replica_id;
+		wal_st->first_row_tm = item->row.tm;
+
+		trigger_create(on_wal_write, applier_txn_wal_write_cb, wal_st, NULL);
 		txn_on_wal_write(txn, on_wal_write);
 	}
@@ -946,7 +1009,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
 
 /** A simpler version of applier_apply_tx() for final join stage. */
 static int
-apply_final_join_tx(struct stailq *rows)
+apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
 {
 	struct xrow_header *first_row = &stailq_first_entry(rows,
 					struct applier_tx_row, next)->row;
@@ -957,9 +1020,9 @@ apply_final_join_tx(struct stailq *rows)
 	vclock_follow_xrow(&replicaset.vclock, last_row);
 	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
 		assert(first_row == last_row);
-		rc = apply_synchro_row(first_row);
+		rc = apply_synchro_row(replica_id, first_row);
 	} else {
-		rc = apply_plain_tx(rows, false, false);
+		rc = apply_plain_tx(replica_id, rows, false, false);
 	}
 	fiber_gc();
 	return rc;
 }
@@ -1088,11 +1151,14 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 		 * each other.
 		 */
 		assert(first_row == last_row);
-		if ((rc = apply_synchro_row(first_row)) != 0)
+		rc = apply_synchro_row(applier->instance_id, first_row);
+		if (rc != 0)
+			goto finish;
+	} else {
+		rc = apply_plain_tx(applier->instance_id, rows,
+				    replication_skip_conflict, true);
+		if (rc != 0)
 			goto finish;
-	} else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
-					true)) != 0) {
-		goto finish;
 	}
 	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
 		      last_row->lsn);
diff --git a/src/box/applier.h b/src/box/applier.h
index 15ca1fcfd..c0352894e 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -133,6 +133,20 @@ struct applier {
 	struct vclock remote_vclock_at_subscribe;
 };
 
+/**
+ * WAL related statistics.
+ */
+struct applier_wal_stat {
+	/** Replica ID which initiated the transaction. */
+	uint32_t replica_id;
+	/**
+	 * Timestamp of the first row in a transaction
+	 * batch which is not yet ACK'ed. For relay
+	 * statistics.
+	 */
+	double first_row_tm;
+};
+
 /**
  * Start a client to a remote master using a background fiber.
  *
diff --git a/src/box/replication.cc b/src/box/replication.cc
index aefb812b3..157944f05 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -184,6 +184,7 @@ replica_new(void)
 	trigger_create(&replica->on_applier_state,
 		       replica_on_applier_state_f, NULL, NULL);
 	replica->applier_sync_state = APPLIER_DISCONNECTED;
+	memset(&replica->wal_st, 0, sizeof(replica->wal_st));
 	latch_create(&replica->order_latch);
 	return replica;
 }
@@ -234,6 +235,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
 	assert(replica_id < VCLOCK_MAX);
 	assert(replica->id == REPLICA_ID_NIL); /* replica id is read-only */
 	replica->id = replica_id;
+	replica->wal_st.replica_id = replica_id;
 	if (tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)) {
 		/* Assign local replica id */
@@ -281,6 +283,7 @@ replica_clear_id(struct replica *replica)
 		instance_id = REPLICA_ID_NIL;
 	}
 	replica->id = REPLICA_ID_NIL;
+	replica->wal_st.replica_id = REPLICA_ID_NIL;
 	say_info("removed replica %s", tt_uuid_str(&replica->uuid));
 
 	/*
diff --git a/src/box/replication.h b/src/box/replication.h
index 2ad1cbf66..a40582cd3 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -331,6 +331,10 @@ struct replica {
 	 * separate from applier.
 	 */
 	enum applier_state applier_sync_state;
+	/**
+	 * Applier's WAL related statistics.
+	 */
+	struct applier_wal_stat wal_st;
 	/* The latch is used to order replication requests. */
 	struct latch order_latch;
 };
-- 
2.30.2
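
P.S. Illustration only, not part of the patch: once the ACK carries the
first-row WAL timestamp, the receiving end could turn it into a lag
estimate roughly as sketched below. The actual downstream-lag
computation lands in the next patch of the series and may differ; the
helper names here are made up.

    #include <stdio.h>
    #include <time.h>

    /* Wall-clock "now" in seconds (xrow_header::tm is a double timestamp). */
    static double
    now_seconds(void)
    {
        struct timespec ts;
        clock_gettime(CLOCK_REALTIME, &ts);
        return ts.tv_sec + ts.tv_nsec * 1e-9;
    }

    /* A zero timestamp means the replica had nothing new to report. */
    static double
    downstream_lag(double ack_tm)
    {
        return ack_tm == 0 ? 0 : now_seconds() - ack_tm;
    }

    int main(void)
    {
        /* Pretend the first row of the batch was written 250 ms ago. */
        double tm = now_seconds() - 0.25;
        printf("downstream lag: %.3f sec\n", downstream_lag(tm));
        return 0;
    }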