From: Serge Petrenko via Tarantool-patches
Reply-To: Serge Petrenko
To: Cyrill Gorcunov, tml
Cc: Vladislav Shpilevoy
Date: Fri, 18 Jun 2021 12:50:01 +0300
Subject: Re: [Tarantool-patches] [PATCH v9 2/2] relay: provide information about downstream lag
Message-ID: <816930c9-c890-68e5-c25e-53543beb5df7@tarantool.org>
In-Reply-To: <20210617154835.315576-3-gorcunov@gmail.com>
References: <20210617154835.315576-1-gorcunov@gmail.com> <20210617154835.315576-3-gorcunov@gmail.com>

On 17.06.2021 18:48, Cyrill Gorcunov wrote:
> We already have `box.replication.upstream.lag` entry for monitoring
> sake. Same time in synchronous replication timeouts are key properties
> for quorum gathering procedure. Thus we would like to know how long
> it took of a transaction to traverse `initiator WAL -> network ->
> remote applier -> initiator ACK reception` path.
>
> Typical output is
>
> | tarantool> box.info.replication[2].downstream
> | ---
> | - status: follow
> |   idle: 0.61753897101153
> |   vclock: {1: 147}
> |   lag: 0
> | ...
> | tarantool> box.space.sync:insert{69}
> | ---
> | - [69]
> | ...
> |
> | tarantool> box.info.replication[2].downstream
> | ---
> | - status: follow
> |   idle: 0.75324084801832
> |   vclock: {1: 151}
> |   lag: 0.0011014938354492
> | ...
>
> Closes #5447

Hi! Thanks for the fixes!

Please find a couple of comments below.

>
> Signed-off-by: Cyrill Gorcunov
>
> @TarantoolBot document
> Title: Add `box.info.replication[n].downstream.lag` entry
>
> `replication[n].downstream.lag` represents a lag between the main
> node writes a certain transaction to it's own WAL and a moment it
> receives an ack for this transaction from a replica.
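
To make the measured path concrete, here is a minimal sketch of how such a lag could be derived from an ACK. It assumes the ACK carries back the timestamp the master assigned to the transaction when writing it to its own WAL. `struct lag_tracker` and `lag_on_ack` are hypothetical names for illustration only, not code from the patch.

```c
#include <assert.h>

/* Hypothetical per-replica state; not the real struct relay. */
struct lag_tracker {
	/* Timestamp of the newest transaction acked so far. */
	double acked_tm;
	/* Last computed lag, in seconds. */
	double lag;
};

/*
 * Handle an ACK that reports `txn_tm`, the timestamp the master put
 * on the transaction at WAL write. `now` is the master's current time.
 * Both values come from the master's clock, so the replica's clock
 * does not affect the result.
 * Returns 1 if the lag was updated, 0 if the ACK carried nothing new.
 */
static int
lag_on_ack(struct lag_tracker *t, double txn_tm, double now)
{
	if (txn_tm <= t->acked_tm)
		return 0;
	t->acked_tm = txn_tm;
	t->lag = now - txn_tm;
	return 1;
}
```

A repeated ACK for the same transaction leaves the stored lag untouched, which matches the intent of only reacting to a newer timestamp.
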
> ---
>  .../unreleased/gh-5447-downstream-lag.md      |   6 +
>  src/box/lua/info.c                            |   3 +
>  src/box/relay.cc                              |  94 ++++++++++++-
>  src/box/relay.h                               |   6 +
>  .../replication/gh-5447-downstream-lag.result | 128 ++++++++++++++++++
>  .../gh-5447-downstream-lag.test.lua           |  57 ++++++++
>  6 files changed, 290 insertions(+), 4 deletions(-)
>  create mode 100644 changelogs/unreleased/gh-5447-downstream-lag.md
>  create mode 100644 test/replication/gh-5447-downstream-lag.result
>  create mode 100644 test/replication/gh-5447-downstream-lag.test.lua
>
> diff --git a/changelogs/unreleased/gh-5447-downstream-lag.md b/changelogs/unreleased/gh-5447-downstream-lag.md
> new file mode 100644
> index 000000000..f937ce35e
> --- /dev/null
> +++ b/changelogs/unreleased/gh-5447-downstream-lag.md
> @@ -0,0 +1,6 @@
> +#feature/replication
> +
> + * Introduced `box.info.replication[n].downstream.lag` field to monitor
> +   state of replication. This member represents a lag between the main
> +   node writes a certain transaction to it's own WAL and a moment it
> +   receives an ack for this transaction from a replica (gh-5447).
> diff --git a/src/box/lua/info.c b/src/box/lua/info.c
> index 0eb48b823..f201b25e3 100644
> --- a/src/box/lua/info.c
> +++ b/src/box/lua/info.c
> @@ -143,6 +143,9 @@ lbox_pushrelay(lua_State *L, struct relay *relay)
>  		lua_pushnumber(L, ev_monotonic_now(loop()) -
>  			       relay_last_row_time(relay));
>  		lua_settable(L, -3);
> +		lua_pushstring(L, "lag");
> +		lua_pushnumber(L, relay_txn_lag(relay));
> +		lua_settable(L, -3);
>  		break;
>  	case RELAY_STOPPED:
>  	{
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index b1571b361..ed6dd193d 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -70,6 +70,18 @@ struct relay_status_msg {
>  	struct vclock vclock;
>  };
>
> +/**
> + * Cbus message to update relay reader fiber status in tx thread.
> + */
> +struct relay_reader_msg {
> +	/** Parent */
> +	struct cmsg msg;
> +	/** Relay instance */
> +	struct relay *relay;
> +	/** Transaction lag value. */
> +	double txn_lag;
> +};
> +
>  /**
>   * Cbus message to update replica gc state in tx thread.
>   */
> @@ -151,6 +163,8 @@ struct relay {
>  	struct cpipe relay_pipe;
>  	/** Status message */
>  	struct relay_status_msg status_msg;
> +	/** Reader fiber message */
> +	struct relay_reader_msg reader_msg;
>  	/**
>  	 * List of garbage collection messages awaiting
>  	 * confirmation from the replica.
> @@ -158,6 +172,11 @@ struct relay {
>  	struct stailq pending_gc;
>  	/** Time when last row was sent to peer. */
>  	double last_row_time;
> +	/**
> +	 * Last timestamp observed from remote node to
> +	 * compute @a tx.txn_lag value.
> +	 */
> +	double txn_acked_tm;
>  	/** Relay sync state. */
>  	enum relay_state state;
>
> @@ -166,6 +185,14 @@ struct relay {
>  		alignas(CACHELINE_SIZE)
>  		/** Known relay vclock. */
>  		struct vclock vclock;
> +		/**
> +		 * A time difference between the moment when we
> +		 * wrote a transaction to the local WAL and when
> +		 * this transaction has been replicated to remote
> +		 * node (ie written to node's WAL) so that ACK get
> +		 * received.
> +		 */
> +		double txn_lag;
>  		/**
>  		 * True if the relay needs Raft updates. It can live fine
>  		 * without sending Raft updates, if it is a relay to an
> @@ -217,6 +244,12 @@ relay_last_row_time(const struct relay *relay)
>  	return relay->last_row_time;
>  }
>
> +double
> +relay_txn_lag(const struct relay *relay)
> +{
> +	return relay->tx.txn_lag;
> +}
> +
>  static void
>  relay_send(struct relay *relay, struct xrow_header *packet);
>  static void
> @@ -284,6 +317,15 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
>  	relay->state = RELAY_FOLLOW;
>  	relay->row_count = 0;
>  	relay->last_row_time = ev_monotonic_now(loop());
> +	/*
> +	 * We assume that previously written rows in WAL
> +	 * are older than current node real time which allows
> +	 * to simplify @a tx.txn_lag calculation. In worst
> +	 * scenario when runtime has been adjusted backwards
> +	 * between restart we simply get some big value in
> +	 * @a tx.txn_lag until next transaction get replicated.
> +	 */
> +	relay->txn_acked_tm = ev_now(loop());
>  }
>
>  void
> @@ -336,6 +378,12 @@ relay_stop(struct relay *relay)
>  	 * upon cord_create().
>  	 */
>  	relay->cord.id = 0;
> +	/*
> +	 * If relay is stopped then lag statistics should
> +	 * be updated on next new ACK packets obtained.
> +	 */
> +	relay->txn_acked_tm = 0;
> +	relay->tx.txn_lag = 0;
>  }
>
>  void
> @@ -466,11 +514,10 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
>  }
>
>  /**
> - * The message which updated tx thread with a new vclock has returned back
> - * to the relay.
> + * The message which updated tx thread returns to the relay thread.
>   */
>  static void
> -relay_status_update(struct cmsg *msg)
> +tx_update_complete(struct cmsg *msg)

Let's name this `relay_update_complete` or something similar, please.
Otherwise it looks like something executed in tx thread.

>  {
>  	msg->route = NULL;
>  }
> @@ -500,7 +547,7 @@ tx_status_update(struct cmsg *msg)
>  	trigger_run(&replicaset.on_ack, &ack);
>
>  	static const struct cmsg_hop route[] = {
> -		{relay_status_update, NULL}
> +		{tx_update_complete, NULL}
>  	};
>  	cmsg_init(msg, route);
>  	cpipe_push(&status->relay->relay_pipe, msg);
> @@ -607,6 +654,20 @@ relay_process_wal_event(struct wal_watcher *watcher, unsigned events)
>  	}
>  }
>
> +static void
> +tx_reader_update(struct cmsg *msg)
> +{
> +	struct relay_reader_msg *rmsg = (struct relay_reader_msg *)msg;
> +	rmsg->relay->tx.txn_lag = rmsg->txn_lag;
> +
> +	static const struct cmsg_hop route[] = {
> +		{tx_update_complete, NULL}
> +	};
> +
> +	cmsg_init(msg, route);
> +	cpipe_push(&rmsg->relay->relay_pipe, msg);
> +}
> +
>  /*
>   * Relay reader fiber function.
>   * Read xrow encoded vclocks sent by the replica.
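
The hunks above reuse one preallocated message per relay and treat `route == NULL` as "the message is idle", with the last hop resetting the pointer once the message is back in the relay thread. Below is a minimal standalone sketch of that pattern. `struct msg`, `msg_try_send` and `msg_complete` are hypothetical stand-ins and do not reproduce the real cbus API.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for struct cmsg; only the route pointer matters. */
struct msg {
	/* NULL while idle, non-NULL while travelling between threads. */
	const void *route;
	double payload;
};

/* Any non-NULL address works as the "in flight" marker in this sketch. */
static const int in_flight_route = 0;

/*
 * Sender side: reuse the single preallocated message.
 * Returns 1 if the message was sent, 0 if the previous one
 * has not come back yet (the new value is skipped, not queued).
 */
static int
msg_try_send(struct msg *m, double payload)
{
	if (m->route != NULL)
		return 0;
	m->route = &in_flight_route;
	m->payload = payload;
	return 1;
}

/* Last hop, run back in the sender's thread: mark the message idle. */
static void
msg_complete(struct msg *m)
{
	m->route = NULL;
}
```

The trade-off is that an update arriving while the message is in flight is dropped, so the consumer only ever sees the value from the latest successful send.
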
> @@ -616,6 +677,7 @@ relay_reader_f(va_list ap)
>  {
>  	struct relay *relay = va_arg(ap, struct relay *);
>  	struct fiber *relay_f = va_arg(ap, struct fiber *);
> +	struct relay_reader_msg *rmsg = &relay->reader_msg;
>
>  	struct ibuf ibuf;
>  	struct ev_io io;
> @@ -629,6 +691,30 @@ relay_reader_f(va_list ap)
>  			/* vclock is followed while decoding, zeroing it. */
>  			vclock_create(&relay->recv_vclock);
>  			xrow_decode_vclock_xc(&xrow, &relay->recv_vclock);
> +			/*
> +			 * Replica send us last replicated transaction
> +			 * timestamp which is needed for relay lag
> +			 * monitoring. Note that this transaction has
> +			 * been written to WAL with our current realtime
> +			 * clock value, thus when it get reported back we
> +			 * can compute time spent regardless of the clock
> +			 * value on remote replica. Since the communication
> +			 * goes between thread make sure the previous update
> +			 * to TX is complete.
> +			 */
> +			if (relay->txn_acked_tm < xrow.tm &&
> +			    rmsg->msg.route == NULL) {
> +				relay->txn_acked_tm = xrow.tm;
> +
> +				static const struct cmsg_hop route[] = {
> +					{tx_reader_update, NULL}
> +				};
> +
> +				cmsg_init(&rmsg->msg, route);
> +				rmsg->txn_lag = ev_now(loop()) - xrow.tm;
> +				rmsg->relay = relay;
> +				cpipe_push(&relay->tx_pipe, &rmsg->msg);
> +			}

It bothers me a bit that we now have two places which issue cbus
messages from relay to tx: relay_reader_f and relay_subscribe_f.

It would be nice if you could unify these two messages. For example,
extend the tx_status_update message with the new txn_lag value.
I think it is fine to send the cbus message a bit later than the actual
ACK was received. This is how it's done for the replica's vclock.

So I propose to either move the vclock update (relay->status_msg) push
to relay_reader_f, or move the time update (relay->reader_msg) to
relay_subscribe_f, and unify the messages.

This way we'll have a single source for all relay->tx updates and less
code duplication.

P.S. Moving the relay->status_msg push to relay_reader_f sounds more
logical to me.

>  			fiber_cond_signal(&relay->reader_cond);
>  		}
>  	} catch (Exception *e) {
> diff --git a/src/box/relay.h b/src/box/relay.h
> index b32e2ea2a..615ffb75d 100644
> --- a/src/box/relay.h
> +++ b/src/box/relay.h
> @@ -93,6 +93,12 @@ relay_vclock(const struct relay *relay);
>  double
>  relay_last_row_time(const struct relay *relay);
>
> +/**
> + * Returns relay's transaction's lag.
> + */
> +double
> +relay_txn_lag(const struct relay *relay);
> +
>  /**
>   * Send a Raft update request to the relay channel. It is not
>   * guaranteed that it will be delivered. The connection may break.
> diff --git a/test/replication/gh-5447-downstream-lag.result b/test/replication/gh-5447-downstream-lag.result
> new file mode 100644
> index 000000000..2cc020451
> --- /dev/null
> +++ b/test/replication/gh-5447-downstream-lag.result
> @@ -0,0 +1,128 @@
> +-- test-run result file version 2
> +--
> +-- gh-5447: Test for box.info.replication[n].downstream.lag.
> +-- We need to be sure that slow ACKs delivery might be
> +-- caught by monitoring tools.
> +--
> +
> +fiber = require('fiber')
> + | ---
> + | ...
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +engine = test_run:get_cfg('engine')
> + | ---
> + | ...
> +
> +box.schema.user.grant('guest', 'replication')
> + | ---
> + | ...
> +
> +test_run:cmd('create server replica with rpl_master=default, \
> +              script="replication/replica.lua"')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('start server replica')
> + | ---
> + | - true
> + | ...
> +
> +replica_id = test_run:get_server_id('replica')
> + | ---
> + | ...
> +
> +--
> +-- Upon replica startup there is no ACKs to process.
> +assert(box.info.replication[replica_id].downstream.lag == 0)
> + | ---
> + | - true
> + | ...
> +
> +s = box.schema.space.create('test', {engine = engine})
> + | ---
> + | ...
> +_ = s:create_index('pk')
> + | ---
> + | ...
> +
> +--
> +-- The replica should wait some time before writing data
> +-- to the WAL, otherwise we might not even notice the lag
> +-- if media is too fast. Before disabling WAL we need to
> +-- wait the space get propagated.
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +test_run:wait_lsn('replica', 'default')
> + | ---
> + | ...
> +box.error.injection.set("ERRINJ_WAL_DELAY", true)
> + | ---
> + | - ok
> + | ...
> +
> +--
> +-- Insert a record and wakeup replica's WAL to process data.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +lsn = box.info.lsn
> + | ---
> + | ...
> +box.space.test:insert({1})
> + | ---
> + | - [1]
> + | ...
> +test_run:wait_cond(function() return box.info.lsn > lsn end)
> + | ---
> + | - true
> + | ...
> +-- The record is written on the master node.
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.error.injection.set("ERRINJ_WAL_DELAY", false)
> + | ---
> + | - ok
> + | ...
> +
> +--
> +-- Wait the record to be ACKed, the lag value should be nonzero.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +test_run:wait_lsn('replica', 'default')
> + | ---
> + | ...
> +assert(box.info.replication[replica_id].downstream.lag > 0)
> + | ---
> + | - true
> + | ...
> +
> +--
> +-- Cleanup everything.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +box.schema.user.revoke('guest', 'replication')
> + | ---
> + | ...
> +test_run:cmd('stop server replica')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('cleanup server replica')
> + | ---
> + | - true
> + | ...
> +test_run:cmd('delete server replica')
> + | ---
> + | - true
> + | ...
> diff --git a/test/replication/gh-5447-downstream-lag.test.lua b/test/replication/gh-5447-downstream-lag.test.lua
> new file mode 100644
> index 000000000..3096e2ac3
> --- /dev/null
> +++ b/test/replication/gh-5447-downstream-lag.test.lua
> @@ -0,0 +1,57 @@
> +--
> +-- gh-5447: Test for box.info.replication[n].downstream.lag.
> +-- We need to be sure that slow ACKs delivery might be
> +-- caught by monitoring tools.
> +--
> +
> +fiber = require('fiber')
> +test_run = require('test_run').new()
> +engine = test_run:get_cfg('engine')
> +
> +box.schema.user.grant('guest', 'replication')
> +
> +test_run:cmd('create server replica with rpl_master=default, \
> +              script="replication/replica.lua"')
> +test_run:cmd('start server replica')
> +
> +replica_id = test_run:get_server_id('replica')
> +
> +--
> +-- Upon replica startup there is no ACKs to process.
> +assert(box.info.replication[replica_id].downstream.lag == 0)
> +
> +s = box.schema.space.create('test', {engine = engine})
> +_ = s:create_index('pk')
> +
> +--
> +-- The replica should wait some time before writing data
> +-- to the WAL, otherwise we might not even notice the lag
> +-- if media is too fast. Before disabling WAL we need to
> +-- wait the space get propagated.
> +test_run:switch('replica')
> +test_run:wait_lsn('replica', 'default')
> +box.error.injection.set("ERRINJ_WAL_DELAY", true)
> +
> +--
> +-- Insert a record and wakeup replica's WAL to process data.
> +test_run:switch('default')
> +lsn = box.info.lsn
> +box.space.test:insert({1})
> +test_run:wait_cond(function() return box.info.lsn > lsn end)
> +-- The record is written on the master node.
> +test_run:switch('replica')
> +box.error.injection.set("ERRINJ_WAL_DELAY", false)
> +
> +--
> +-- Wait the record to be ACKed, the lag value should be nonzero.
> +test_run:switch('default')
> +test_run:wait_lsn('replica', 'default')
> +assert(box.info.replication[replica_id].downstream.lag > 0)
> +
> +--
> +-- Cleanup everything.
> +test_run:switch('default')
> +box.schema.user.revoke('guest', 'replication')
> +test_run:cmd('stop server replica')
> +test_run:cmd('cleanup server replica')
> +test_run:cmd('delete server replica')

-- 
Serge Petrenko
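
For reference, a rough sketch of the unification proposed in the relay_reader_f comment: one relay -> tx message that carries both the acked vclock and the lag, pushed from a single place. Every type and function name here (`vclock_stub`, `status_msg`, `tx_state`, `relay_push_status`, `tx_apply_status`) is hypothetical, and the thread hops are collapsed into plain function calls, so this only illustrates the data flow and is not the actual change.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for struct vclock. */
struct vclock_stub {
	int64_t lsn;
};

/* A single relay -> tx message carrying both the vclock and the lag. */
struct status_msg {
	/* 0 when idle, 1 while travelling to tx and back. */
	int in_flight;
	struct vclock_stub vclock;
	double txn_lag;
};

/* State read by the tx thread, e.g. when filling box.info. */
struct tx_state {
	struct vclock_stub vclock;
	double txn_lag;
};

/*
 * Relay side: called on every ACK. Fills both fields and "pushes"
 * the message unless the previous one has not returned yet.
 */
static int
relay_push_status(struct status_msg *m, int64_t acked_lsn,
		  double txn_tm, double now)
{
	if (m->in_flight)
		return 0;
	m->vclock.lsn = acked_lsn;
	m->txn_lag = now - txn_tm;
	m->in_flight = 1;
	return 1;
}

/*
 * Tx side: apply both values in one hop. In this sketch the return
 * hop is folded in here; real code would reset the flag in the relay
 * thread.
 */
static void
tx_apply_status(struct tx_state *tx, struct status_msg *m)
{
	tx->vclock = m->vclock;
	tx->txn_lag = m->txn_lag;
	m->in_flight = 0;
}
```

With one message there is a single "previous update still in flight" check, and the vclock and lag seen by tx always come from the same ACK.
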