From: Cyrill Gorcunov via Tarantool-patches
To: tml
Cc: Vladislav Shpilevoy
Date: Tue, 22 Jun 2021 18:22:45 +0300
Message-Id: <20210622152246.497955-2-gorcunov@gmail.com>
In-Reply-To: <20210622152246.497955-1-gorcunov@gmail.com>
Subject: [Tarantool-patches] [PATCH v10 1/2] applier: send transaction's first row WAL time in the applier_writer_f

The applier fiber sends the node's current vclock to the remote relay
reader. The vclock shows how much WAL data has already been fetched, so
the relay knows which new data it should send.

The packet the applier sends carries the xrow_header::tm field set to
zero. We can reuse this field to report the timestamp of the last
transaction we wrote to our WAL. Older Tarantool instances simply ignore
this field, so the extension doesn't cause any compatibility problems.

The timestamp will be used to compute the lag of downstream replicas,
which is useful for informational purposes and cluster health
monitoring.
We update applier statistics in WAL callbacks. Both apply_synchro_row
and apply_plain_tx are used for applying real data, and also in the
final join stage, where we are not writing the data yet. For this
reason both functions are extended with a replica_id argument, which is
nonzero when the applier is subscribed.

The calculation of the downstream lag itself will be addressed in the
next patch, because sending the timestamp and observing it are
independent actions.

Part-of #5447

Signed-off-by: Cyrill Gorcunov
---
 src/box/applier.cc     | 101 ++++++++++++++++++++++++++++++++++-------
 src/box/replication.cc |   1 +
 src/box/replication.h  |   5 ++
 3 files changed, 90 insertions(+), 17 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index ee5c05c0d..07fe7f5c7 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -163,6 +163,9 @@ applier_writer_f(va_list ap)
 	struct ev_io io;
 	coio_create(&io, applier->io.fd);
 
+	/* ID is permanent while applier is alive */
+	uint32_t replica_id = applier->instance_id;
+
 	while (!fiber_is_cancelled()) {
 		/*
 		 * Tarantool >= 1.7.7 sends periodic heartbeat
@@ -193,6 +196,16 @@ applier_writer_f(va_list ap)
 		applier->has_acks_to_send = false;
 		struct xrow_header xrow;
 		xrow_encode_vclock(&xrow, &replicaset.vclock);
+		/*
+		 * For relay lag statistics we report the last
+		 * written transaction timestamp in the tm field.
+		 *
+		 * If the user deletes the node from the _cluster
+		 * space, replica_by_id() returns NULL here.
+		 */
+		struct replica *r = replica_by_id(replica_id);
+		if (likely(r != NULL))
+			xrow.tm = r->applier_txn_last_tm;
 		coio_write_xrow(&io, &xrow);
 		ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, {
 			fiber_sleep(0.01);
@@ -490,7 +503,7 @@ static uint64_t
 applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);
 
 static int
-apply_final_join_tx(struct stailq *rows);
+apply_final_join_tx(uint32_t replica_id, struct stailq *rows);
 
 /**
  * A helper struct to link xrow objects in a list.
@@ -535,7 +548,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 						next)->row);
 			break;
 		}
-		if (apply_final_join_tx(&rows) != 0)
+		if (apply_final_join_tx(applier->instance_id, &rows) != 0)
 			diag_raise();
 	}
 
@@ -751,11 +764,35 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 	return 0;
 }
 
+struct replica_cb_data {
+	/** Replica ID the data belongs to. */
+	uint32_t replica_id;
+	/**
+	 * Timestamp of a transaction to be accounted
+	 * for relay lag. Usually it is the timestamp of
+	 * the last row in a transaction.
+	 */
+	double txn_last_tm;
+};
+
+/** Update replica-associated data once the WAL write is complete. */
+static void
+replica_txn_wal_write_cb(struct replica_cb_data *rcb)
+{
+	struct replica *r = replica_by_id(rcb->replica_id);
+	if (likely(r != NULL))
+		r->applier_txn_last_tm = rcb->txn_last_tm;
+}
+
 static int
 applier_txn_wal_write_cb(struct trigger *trigger, void *event)
 {
-	(void) trigger;
 	(void) event;
+
+	struct replica_cb_data *rcb =
+		(struct replica_cb_data *)trigger->data;
+	replica_txn_wal_write_cb(rcb);
+
 	/* Broadcast the WAL write across all appliers. */
 	trigger_run(&replicaset.applier.on_wal_write, NULL);
 	return 0;
@@ -766,6 +803,8 @@ struct synchro_entry {
 	struct synchro_request *req;
 	/** Fiber created the entry. To wakeup when WAL write is done. */
 	struct fiber *owner;
+	/** Replica associated data. */
+	struct replica_cb_data *rcb;
 	/**
 	 * The base journal entry. It has unsized array and then must be the
 	 * last entry in the structure. But can workaround it via a union
@@ -789,6 +828,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
 	if (entry->res < 0) {
 		applier_rollback_by_wal_io(entry->res);
 	} else {
+		replica_txn_wal_write_cb(synchro_entry->rcb);
 		txn_limbo_process(&txn_limbo, synchro_entry->req);
 		trigger_run(&replicaset.applier.on_wal_write, NULL);
 	}
@@ -797,7 +837,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
 
 /** Process a synchro request. */
 static int
-apply_synchro_row(struct xrow_header *row)
+apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
 {
 	assert(iproto_type_is_synchro_request(row->type));
 
@@ -805,6 +845,7 @@ apply_synchro_row(struct xrow_header *row)
 	if (xrow_decode_synchro(row, &req) != 0)
 		goto err;
 
+	struct replica_cb_data rcb_data;
 	struct synchro_entry entry;
 	/*
 	 * Rows array is cast from *[] to **, because otherwise g++ complains
@@ -817,6 +858,11 @@ apply_synchro_row(struct xrow_header *row)
 			     apply_synchro_row_cb, &entry);
 	entry.req = &req;
 	entry.owner = fiber();
+
+	rcb_data.replica_id = replica_id;
+	rcb_data.txn_last_tm = row->tm;
+	entry.rcb = &rcb_data;
+
 	/*
 	 * The WAL write is blocking. Otherwise it might happen that a CONFIRM
 	 * or ROLLBACK is sent to WAL, and it would empty the limbo, but before
@@ -864,8 +910,9 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
 	return box_raft_process(&req, applier->instance_id);
 }
 
-static inline int
-apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
+static int
+apply_plain_tx(uint32_t replica_id, struct stailq *rows,
+	       bool skip_conflict, bool use_triggers)
 {
 	/*
 	 * Explicitly begin the transaction so that we can
@@ -933,10 +980,28 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
 			goto fail;
 		}
 
+		struct replica_cb_data *rcb;
+		rcb = region_alloc_object(&txn->region, typeof(*rcb), &size);
+		if (rcb == NULL) {
+			diag_set(OutOfMemory, size, "region_alloc_object", "rcb");
+			goto fail;
+		}
+
 		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
 		txn_on_rollback(txn, on_rollback);
 
-		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
+		/*
+		 * We use the *last* entry timestamp because the ack
+		 * covers the transaction up to its last entry. This
+		 * also gives a more precise result: we are interested
+		 * in how long the transaction spent traversing the
+		 * network and the remote WAL before the ack arrived.
+		 */
+		item = stailq_last_entry(rows, struct applier_tx_row, next);
+		rcb->replica_id = replica_id;
+		rcb->txn_last_tm = item->row.tm;
+
+		trigger_create(on_wal_write, applier_txn_wal_write_cb, rcb, NULL);
 		txn_on_wal_write(txn, on_wal_write);
 	}
 
@@ -948,7 +1013,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
 
 /** A simpler version of applier_apply_tx() for final join stage. */
 static int
-apply_final_join_tx(struct stailq *rows)
+apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
 {
 	struct xrow_header *first_row =
 		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
@@ -959,9 +1024,9 @@ apply_final_join_tx(struct stailq *rows)
 	vclock_follow_xrow(&replicaset.vclock, last_row);
 	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
 		assert(first_row == last_row);
-		rc = apply_synchro_row(first_row);
+		rc = apply_synchro_row(replica_id, first_row);
 	} else {
-		rc = apply_plain_tx(rows, false, false);
+		rc = apply_plain_tx(replica_id, rows, false, false);
 	}
 	fiber_gc();
 	return rc;
@@ -1029,7 +1094,7 @@ nopify:;
  * Return 0 for success or -1 in case of an error.
  */
 static int
-applier_apply_tx(struct stailq *rows)
+applier_apply_tx(struct applier *applier, struct stailq *rows)
 {
 	/*
 	 * Initially we've been filtering out data if it came from
@@ -1093,12 +1158,14 @@ applier_apply_tx(struct stailq *rows)
 		 * each other.
 		 */
 		assert(first_row == last_row);
-		if ((rc = apply_synchro_row(first_row)) != 0)
-			goto finish;
-	} else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
-					true)) != 0) {
-		goto finish;
+		rc = apply_synchro_row(applier->instance_id, first_row);
+	} else {
+		rc = apply_plain_tx(applier->instance_id, rows,
+				    replication_skip_conflict, true);
 	}
+	if (rc != 0)
+		goto finish;
+
 	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
 		      last_row->lsn);
 finish:
@@ -1317,7 +1384,7 @@ applier_subscribe(struct applier *applier)
 				diag_raise();
 			}
 			applier_signal_ack(applier);
-		} else if (applier_apply_tx(&rows) != 0) {
+		} else if (applier_apply_tx(applier, &rows) != 0) {
 			diag_raise();
 		}
 
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 903390686..a0b3e0186 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -184,6 +184,7 @@ replica_new(void)
 	trigger_create(&replica->on_applier_state,
 		       replica_on_applier_state_f, NULL, NULL);
 	replica->applier_sync_state = APPLIER_DISCONNECTED;
+	replica->applier_txn_last_tm = 0;
 	latch_create(&replica->order_latch);
 	return replica;
 }
diff --git a/src/box/replication.h b/src/box/replication.h
index 5cc380373..57e0f10ae 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -331,6 +331,11 @@ struct replica {
 	 * separate from applier.
 	 */
 	enum applier_state applier_sync_state;
+	/**
+	 * Timestamp of the last transaction the applier wrote to WAL.
+	 * Needed for relay lag statistics.
+	 */
+	double applier_txn_last_tm;
 	/* The latch is used to order replication requests. */
 	struct latch order_latch;
 };
-- 
2.31.1
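The per-transaction bookkeeping in the diff can be modeled in isolation. A `struct replica_cb_data` is attached to the WAL write trigger and resolved through `replica_by_id()` only once the write completes. The sketch below is a simplified, hypothetical model and not Tarantool code. `replicas_sketch`, `replica_by_id_sketch()`, `struct trigger_sketch` and `rcb_prepare_sketch()` are invented for illustration; only `struct replica_cb_data` mirrors the patch. The sketch shows why the lookup is deferred and NULL-checked: the replica may be removed from `_cluster` between submitting the transaction and the end of the WAL write.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define REPLICA_MAX_SKETCH 32 /* hypothetical registry size */

/* Hypothetical replica object: only the field added by the patch. */
struct replica_sketch {
	double applier_txn_last_tm;
};

/* Hypothetical registry; a NULL slot models a replica deleted from _cluster. */
static struct replica_sketch *replicas_sketch[REPLICA_MAX_SKETCH];

static struct replica_sketch *
replica_by_id_sketch(uint32_t id)
{
	return id < REPLICA_MAX_SKETCH ? replicas_sketch[id] : NULL;
}

/* Mirrors struct replica_cb_data from the patch. */
struct replica_cb_data {
	uint32_t replica_id;
	double txn_last_tm;
};

/* Hypothetical stand-in for struct trigger: a callback plus its data. */
struct trigger_sketch {
	int (*run)(struct trigger_sketch *trigger, void *event);
	void *data;
};

/* Runs once the WAL write is done: store the timestamp if the replica
 * still exists, otherwise silently drop it. */
static int
wal_write_cb_sketch(struct trigger_sketch *trigger, void *event)
{
	(void)event;
	struct replica_cb_data *rcb = trigger->data;
	struct replica_sketch *r = replica_by_id_sketch(rcb->replica_id);
	if (r != NULL)
		r->applier_txn_last_tm = rcb->txn_last_tm;
	return 0;
}

/* Fill the callback data before submitting the transaction, taking the
 * timestamp of its last row, as apply_plain_tx() does in the patch. */
static void
rcb_prepare_sketch(struct replica_cb_data *rcb, uint32_t replica_id,
		   const double *row_tm, size_t row_count)
{
	assert(row_count > 0);
	rcb->replica_id = replica_id;
	rcb->txn_last_tm = row_tm[row_count - 1];
}
```

The callback keeps only the replica ID and looks the replica up at completion time, instead of caching a `struct replica *`. This presumably lets it tolerate the replica disappearing in the meantime.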