From: Cyrill Gorcunov
To: tml
Cc: Mons Anderson, Vladislav Shpilevoy
Date: Fri, 30 Apr 2021 18:39:39 +0300
Message-Id: <20210430153940.121271-3-gorcunov@gmail.com>
Subject: [Tarantool-patches] [RFC v3 2/3] applier: send first row's WAL time in the applier_writer_f

The applier_writer_f fiber sends the current vclock of the node to the
remote relay reader. This packet currently carries the xrow_header::tm
field as zero, but we can reuse it to provide the timestamp of the first
row of a transaction we wrote to our WAL. Since old instances of
Tarantool simply ignore this field, such an extension won't cause any
problems.

This timestamp will be needed to account lags of downstream replicas,
which is useful for informational purposes and cluster health monitoring.

We update applier statistics in WAL callbacks. Since both
apply_synchro_row and apply_plain_tx are used not only for real data
application but also at the final join stage (where we are not yet
writing the data), apply_synchro_row is extended with a use_awstat flag
indicating whether the applier statistics need to be updated.

At the same time, apply_plain_tx uses asynchronous WAL write completion,
and by the moment the write is finished the applier might already have
been removed from the replicaset. Thus we look the applier up by its
instance ID (via replica_by_id) to check whether it is still alive.

The calculation of the downstream lag itself will be addressed in the
next patch, because sending the timestamp and observing it are
independent actions.

Part-of #5447

Signed-off-by: Cyrill Gorcunov
---
 src/box/applier.cc | 84 ++++++++++++++++++++++++++++++++++++++--------
 src/box/applier.h  |  5 +++
 2 files changed, 75 insertions(+), 14 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 33181fdbf..626dc0324 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -192,7 +192,8 @@ applier_writer_f(va_list ap)
 		try {
 			applier->has_acks_to_send = false;
 			struct xrow_header xrow;
-			xrow_encode_vclock(&xrow, &replicaset.vclock);
+			xrow_encode_vclock_timed(&xrow, &replicaset.vclock,
+						 applier->first_row_wal_time);
 			coio_write_xrow(&io, &xrow);
 			ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, {
 				fiber_sleep(0.01);
@@ -490,7 +491,7 @@ static uint64_t
 applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);
 
 static int
-apply_final_join_tx(struct stailq *rows);
+apply_final_join_tx(struct applier *applier, struct stailq *rows);
 
 /**
  * A helper struct to link xrow objects in a list.
@@ -535,7 +536,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 							      next)->row);
 			break;
 		}
-		if (apply_final_join_tx(&rows) != 0)
+		if (apply_final_join_tx(applier, &rows) != 0)
 			diag_raise();
 	}
 
@@ -751,11 +752,41 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 	return 0;
 }
 
+/** Applier WAL related statistics */
+struct awstat {
+	uint32_t instance_id;
+	double first_row_tm;
+};
+
+static void
+awstat_update(struct awstat *awstat)
+{
+	/* Ignore if not needed */
+	if (awstat->instance_id == 0)
+		return;
+
+	/*
+	 * Writes to WAL happen in two contexts:
+	 * synchronous and asynchronous. In the
+	 * second case the applier might already
+	 * be stopped and removed.
+	 */
+	struct replica *r = replica_by_id(awstat->instance_id);
+	if (r == NULL || r->applier == NULL)
+		return;
+
+	r->applier->first_row_wal_time = awstat->first_row_tm;
+}
+
 static int
 applier_txn_wal_write_cb(struct trigger *trigger, void *event)
 {
 	(void) trigger;
 	(void) event;
+
+	struct awstat *awstat = (struct awstat *)trigger->data;
+	awstat_update(awstat);
+
 	/* Broadcast the WAL write across all appliers. */
 	trigger_run(&replicaset.applier.on_wal_write, NULL);
 	return 0;
@@ -766,6 +797,8 @@ struct synchro_entry {
 	struct synchro_request *req;
 	/** Fiber created the entry. To wakeup when WAL write is done. */
 	struct fiber *owner;
+	/** WAL bound statistics. */
+	struct awstat awstat;
 	/**
 	 * The base journal entry. It has unsized array and then must be the
 	 * last entry in the structure. But can workaround it via a union
@@ -789,6 +822,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
 	if (entry->res < 0) {
 		applier_rollback_by_wal_io();
 	} else {
+		awstat_update(&synchro_entry->awstat);
 		txn_limbo_process(&txn_limbo, synchro_entry->req);
 		trigger_run(&replicaset.applier.on_wal_write, NULL);
 	}
@@ -797,7 +831,8 @@ apply_synchro_row_cb(struct journal_entry *entry)
 
 /** Process a synchro request. */
 static int
-apply_synchro_row(struct xrow_header *row)
+apply_synchro_row(struct applier *applier, struct xrow_header *row,
+		  bool use_awstat)
 {
 	assert(iproto_type_is_synchro_request(row->type));
 
@@ -817,6 +852,12 @@ apply_synchro_row(struct xrow_header *row)
 			     apply_synchro_row_cb, &entry);
 	entry.req = &req;
 	entry.owner = fiber();
+	if (use_awstat) {
+		entry.awstat.instance_id = applier->instance_id;
+		entry.awstat.first_row_tm = row->tm;
+	} else {
+		entry.awstat.instance_id = 0;
+	}
 	/*
 	 * The WAL write is blocking. Otherwise it might happen that a CONFIRM
 	 * or ROLLBACK is sent to WAL, and it would empty the limbo, but before
@@ -862,8 +903,9 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
 	return box_raft_process(&req, applier->instance_id);
 }
 
-static inline int
-apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
+static int
+apply_plain_tx(struct applier *applier, struct stailq *rows,
+	       bool skip_conflict, bool use_triggers)
 {
 	/*
 	 * Explicitly begin the transaction so that we can
@@ -931,10 +973,21 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
 			goto fail;
 		}
 
+		struct awstat *awstat;
+		awstat = region_alloc_object(&txn->region, typeof(*awstat), &size);
+		if (awstat == NULL) {
+			diag_set(OutOfMemory, size, "region_alloc_object", "awstat");
+			goto fail;
+		}
+
 		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
 		txn_on_rollback(txn, on_rollback);
 
-		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
+		item = stailq_first_entry(rows, struct applier_tx_row, next);
+		awstat->instance_id = applier->instance_id;
+		awstat->first_row_tm = item->row.tm;
+
+		trigger_create(on_wal_write, applier_txn_wal_write_cb, awstat, NULL);
 		txn_on_wal_write(txn, on_wal_write);
 	}
 
@@ -946,7 +999,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
 
 /** A simpler version of applier_apply_tx() for final join stage. */
 static int
-apply_final_join_tx(struct stailq *rows)
+apply_final_join_tx(struct applier *applier, struct stailq *rows)
 {
 	struct xrow_header *first_row =
 		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
@@ -957,9 +1010,9 @@ apply_final_join_tx(struct stailq *rows)
 	vclock_follow_xrow(&replicaset.vclock, last_row);
 	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
 		assert(first_row == last_row);
-		rc = apply_synchro_row(first_row);
+		rc = apply_synchro_row(applier, first_row, false);
 	} else {
-		rc = apply_plain_tx(rows, false, false);
+		rc = apply_plain_tx(applier, rows, false, false);
 	}
 	fiber_gc();
 	return rc;
@@ -1088,11 +1141,14 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 		 * each other.
 		 */
 		assert(first_row == last_row);
-		if ((rc = apply_synchro_row(first_row)) != 0)
+		rc = apply_synchro_row(applier, first_row, true);
+		if (rc != 0)
+			goto finish;
+	} else {
+		rc = apply_plain_tx(applier, rows,
+				    replication_skip_conflict, true);
+		if (rc != 0)
 			goto finish;
-	} else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
-					true)) != 0) {
-		goto finish;
 	}
 	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
 		      last_row->lsn);
diff --git a/src/box/applier.h b/src/box/applier.h
index 15ca1fcfd..bd98827e7 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -93,6 +93,11 @@ struct applier {
 	ev_tstamp last_row_time;
 	/** Number of seconds this replica is behind the remote master */
 	ev_tstamp lag;
+	/**
+	 * WAL time of the first applied row in a
+	 * transaction. Used for relay statistics.
+	 */
+	double first_row_wal_time;
 	/** The last box_error_code() logged to avoid log flooding */
 	uint32_t last_logged_errcode;
 	/** Remote instance ID. */
-- 
2.30.2
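The awstat_update() helper in this patch stores an instance ID rather than an applier pointer, because an asynchronous WAL completion may fire after the applier has been stopped and removed. Below is a self-contained model of that pattern in C. The registry array, `replica_lookup()` (standing in for `replica_by_id()`) and `MAX_REPLICAS` are hypothetical, and `struct replica` / `struct applier` are stripped-down stand-ins, not Tarantool's definitions. Note that the liveness guard needs `||`: with `&&`, a NULL replica would be dereferenced.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stripped-down stand-ins; NOT Tarantool's real structures. */
struct applier {
	double first_row_wal_time;
};

struct replica {
	struct applier *applier; /* NULL once the applier is stopped */
};

#define MAX_REPLICAS 32 /* hypothetical registry size */
static struct replica *replicas[MAX_REPLICAS]; /* zero-initialized */

/* Hypothetical lookup standing in for replica_by_id(). */
static struct replica *
replica_lookup(uint32_t id)
{
	return id < MAX_REPLICAS ? replicas[id] : NULL;
}

struct awstat {
	uint32_t instance_id; /* 0 means "do not update" */
	double first_row_tm;
};

static void
awstat_update(const struct awstat *awstat)
{
	assert(awstat != NULL);
	if (awstat->instance_id == 0)
		return;
	/* Re-resolve at completion time: the applier may be gone. */
	struct replica *r = replica_lookup(awstat->instance_id);
	/* '||' short-circuits before r is dereferenced when NULL. */
	if (r == NULL || r->applier == NULL)
		return;
	r->applier->first_row_wal_time = awstat->first_row_tm;
}
```

Resolving by ID at completion time trades a lookup for safety against dangling pointers. The model assumes that replica removal does not run concurrently with the update itself.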