From: Serge Petrenko via Tarantool-patches
Reply-To: Serge Petrenko
To: Cyrill Gorcunov, tml
Cc: Vladislav Shpilevoy
Date: Fri, 18 Jun 2021 12:51:02 +0300
Subject: Re: [Tarantool-patches] [PATCH v9 1/2] applier: send transaction's first row WAL time in the applier_writer_f
Message-ID: <3ebc9966-6c9f-5206-4cd4-1fff97530709@tarantool.org>
In-Reply-To: <20210617154835.315576-2-gorcunov@gmail.com>
References: <20210617154835.315576-1-gorcunov@gmail.com> <20210617154835.315576-2-gorcunov@gmail.com>
List-Id: Tarantool development patches

On 17.06.2021 18:48, Cyrill Gorcunov wrote:
> Applier fiber sends current vclock of the node to remote relay reader,
> pointing current state of fetched WAL data so the relay will know which
> new data should be sent. The packet applier sends carries xrow_header::tm
> field as a zero but we can reuse it to provide information about first
> timestamp in a transaction we wrote to our WAL. Since old instances of
> Tarantool simply ignore this field such extension won't cause any
> problems.
>
> The timestamp will be needed to account lag of downstream replicas
> suitable for information purpose and cluster health monitoring.
>
> We update applier statistics in WAL callbacks but since both
> apply_synchro_row and apply_plain_tx are used not only in real data
> application but in final join stage as well (in this stage we're not
> writing the data yet) the apply_synchro_row is extended with replica_id
> argument which is non zero when applier is subscribed.
>
> The calculation of the downstream lag itself will be addressed
> in next patch because sending the timestamp and its observation
> are independent actions.
>
> Part-of #5447
>
> Signed-off-by: Cyrill Gorcunov

Thanks for the patch!

LGTM!

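For readers following the thread, the mechanism described in the commit message can be sketched roughly as below. This is a minimal standalone illustration, not the patch's code: `replica_stub`, `ack_row`, `replica_table`, `replica_lookup`, `replica_register`, `replica_unregister`, `on_wal_write` and `encode_ack` are all hypothetical stand-ins for `struct replica`, `struct xrow_header`, `replica_by_id()` and the WAL-write callback. It only shows the idea that the WAL callback stores the transaction's row timestamp per replica, and the ack writer copies it into the `tm` field, leaving it at zero when the replica is gone.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for struct replica and struct xrow_header. */
enum { REPLICA_MAX = 32 };

struct replica_stub {
	bool registered;
	/* Timestamp of the last transaction written to the local WAL. */
	double applier_txn_last_tm;
};

struct ack_row {
	/* Old peers ignore this field; it used to be sent as zero. */
	double tm;
};

static struct replica_stub replica_table[REPLICA_MAX];

/* Assumed analogue of replica_by_id(): NULL if the replica is unknown. */
static struct replica_stub *
replica_lookup(uint32_t id)
{
	if (id >= REPLICA_MAX || !replica_table[id].registered)
		return NULL;
	return &replica_table[id];
}

static void
replica_register(uint32_t id)
{
	assert(id < REPLICA_MAX);
	replica_table[id].registered = true;
	replica_table[id].applier_txn_last_tm = 0;
}

/* Models the replica being deleted from the _cluster space. */
static void
replica_unregister(uint32_t id)
{
	assert(id < REPLICA_MAX);
	replica_table[id].registered = false;
}

/* Called once the WAL write is done: remember the row timestamp. */
static void
on_wal_write(uint32_t replica_id, double row_tm)
{
	struct replica_stub *r = replica_lookup(replica_id);
	if (r != NULL)
		r->applier_txn_last_tm = row_tm;
}

/* Build the ack: tm stays zero if the replica no longer exists. */
static void
encode_ack(uint32_t replica_id, struct ack_row *ack)
{
	ack->tm = 0;
	struct replica_stub *r = replica_lookup(replica_id);
	if (r != NULL)
		ack->tm = r->applier_txn_last_tm;
}
```

In this sketch the lookup is repeated on every ack and every WAL callback rather than caching a pointer, which mirrors the patch's choice of passing `replica_id` around so that a replica removed in the meantime is simply skipped.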
> ---
>  src/box/applier.cc     | 97 +++++++++++++++++++++++++++++++++++-------
>  src/box/replication.cc |  1 +
>  src/box/replication.h  |  5 +++
>  3 files changed, 88 insertions(+), 15 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 10cea26a7..0782be513 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -163,6 +163,9 @@ applier_writer_f(va_list ap)
>  	struct ev_io io;
>  	coio_create(&io, applier->io.fd);
>
> +	/* ID is permanent while applier is alive */
> +	uint32_t replica_id = applier->instance_id;
> +
>  	while (!fiber_is_cancelled()) {
>  		/*
>  		 * Tarantool >= 1.7.7 sends periodic heartbeat
> @@ -193,6 +196,16 @@ applier_writer_f(va_list ap)
>  			applier->has_acks_to_send = false;
>  			struct xrow_header xrow;
>  			xrow_encode_vclock(&xrow, &replicaset.vclock);
> +			/*
> +			 * For relay lag statistics we report last
> +			 * written transaction timestamp in tm field.
> +			 *
> +			 * If user delete the node from _cluster space,
> +			 * we obtain a nil pointer here.
> +			 */
> +			struct replica *r = replica_by_id(replica_id);
> +			if (likely(r != NULL))
> +				xrow.tm = r->applier_txn_last_tm;
>  			coio_write_xrow(&io, &xrow);
>  			ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, {
>  				fiber_sleep(0.01);
> @@ -490,7 +503,7 @@ static uint64_t
>  applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);
>
>  static int
> -apply_final_join_tx(struct stailq *rows);
> +apply_final_join_tx(uint32_t replica_id, struct stailq *rows);
>
>  /**
>   * A helper struct to link xrow objects in a list.
> @@ -535,7 +548,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>  						  next)->row);
>  			break;
>  		}
> -		if (apply_final_join_tx(&rows) != 0)
> +		if (apply_final_join_tx(applier->instance_id, &rows) != 0)
>  			diag_raise();
>  	}
>
> @@ -751,11 +764,35 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
>  	return 0;
>  }
>
> +struct replica_cb_data {
> +	/** Replica ID the data belongs to. */
> +	uint32_t replica_id;
> +	/**
> +	 * Timestamp of a transaction to be accounted
> +	 * for relay lag. Usually it is a last row in
> +	 * a transaction.
> +	 */
> +	double txn_last_tm;
> +};
> +
> +/** Update replica associated data once write is complete. */
> +static void
> +replica_txn_wal_write_cb(struct replica_cb_data *rcb)
> +{
> +	struct replica *r = replica_by_id(rcb->replica_id);
> +	if (likely(r != NULL))
> +		r->applier_txn_last_tm = rcb->txn_last_tm;
> +}
> +
>  static int
>  applier_txn_wal_write_cb(struct trigger *trigger, void *event)
>  {
> -	(void) trigger;
>  	(void) event;
> +
> +	struct replica_cb_data *rcb =
> +		(struct replica_cb_data *)trigger->data;
> +	replica_txn_wal_write_cb(rcb);
> +
>  	/* Broadcast the WAL write across all appliers. */
>  	trigger_run(&replicaset.applier.on_wal_write, NULL);
>  	return 0;
> @@ -766,6 +803,8 @@ struct synchro_entry {
>  	struct synchro_request *req;
>  	/** Fiber created the entry. To wakeup when WAL write is done. */
>  	struct fiber *owner;
> +	/** Replica associated data. */
> +	struct replica_cb_data *rcb;
>  	/**
>  	 * The base journal entry. It has unsized array and then must be the
>  	 * last entry in the structure. But can workaround it via a union
> @@ -789,6 +828,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>  	if (entry->res < 0) {
>  		applier_rollback_by_wal_io(entry->res);
>  	} else {
> +		replica_txn_wal_write_cb(synchro_entry->rcb);
>  		txn_limbo_process(&txn_limbo, synchro_entry->req);
>  		trigger_run(&replicaset.applier.on_wal_write, NULL);
>  	}
> @@ -797,7 +837,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
>
>  /** Process a synchro request. */
>  static int
> -apply_synchro_row(struct xrow_header *row)
> +apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
>  {
>  	assert(iproto_type_is_synchro_request(row->type));
>
> @@ -805,6 +845,7 @@ apply_synchro_row(struct xrow_header *row)
>  	if (xrow_decode_synchro(row, &req) != 0)
>  		goto err;
>
> +	struct replica_cb_data rcb_data;
>  	struct synchro_entry entry;
>  	/*
>  	 * Rows array is cast from *[] to **, because otherwise g++ complains
> @@ -817,6 +858,11 @@ apply_synchro_row(struct xrow_header *row)
>  			     apply_synchro_row_cb, &entry);
>  	entry.req = &req;
>  	entry.owner = fiber();
> +
> +	rcb_data.replica_id = replica_id;
> +	rcb_data.txn_last_tm = row->tm;
> +	entry.rcb = &rcb_data;
> +
>  	/*
>  	 * The WAL write is blocking. Otherwise it might happen that a CONFIRM
>  	 * or ROLLBACK is sent to WAL, and it would empty the limbo, but before
> @@ -864,8 +910,9 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>  	return box_raft_process(&req, applier->instance_id);
>  }
>
> -static inline int
> -apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
> +static int
> +apply_plain_tx(uint32_t replica_id, struct stailq *rows,
> +	       bool skip_conflict, bool use_triggers)
>  {
>  	/*
>  	 * Explicitly begin the transaction so that we can
> @@ -933,10 +980,28 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
>  			goto fail;
>  		}
>
> +		struct replica_cb_data *rcb;
> +		rcb = region_alloc_object(&txn->region, typeof(*rcb), &size);
> +		if (rcb == NULL) {
> +			diag_set(OutOfMemory, size, "region_alloc_object", "rcb");
> +			goto fail;
> +		}
> +
>  		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
>  		txn_on_rollback(txn, on_rollback);
>
> -		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
> +		/*
> +		 * We use *last* entry timestamp because ack comes up to
> +		 * last entry in transaction. Same time this shows more
> +		 * precise result because we're interested in how long
> +		 * transaction traversed network + remote WAL bundle before
> +		 * ack get received.
> +		 */
> +		item = stailq_last_entry(rows, struct applier_tx_row, next);
> +		rcb->replica_id = replica_id;
> +		rcb->txn_last_tm = item->row.tm;
> +
> +		trigger_create(on_wal_write, applier_txn_wal_write_cb, rcb, NULL);
>  		txn_on_wal_write(txn, on_wal_write);
>  	}
>
> @@ -948,7 +1013,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
>
>  /** A simpler version of applier_apply_tx() for final join stage. */
>  static int
> -apply_final_join_tx(struct stailq *rows)
> +apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
>  {
>  	struct xrow_header *first_row =
>  		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
> @@ -959,9 +1024,9 @@ apply_final_join_tx(struct stailq *rows)
>  	vclock_follow_xrow(&replicaset.vclock, last_row);
>  	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>  		assert(first_row == last_row);
> -		rc = apply_synchro_row(first_row);
> +		rc = apply_synchro_row(replica_id, first_row);
>  	} else {
> -		rc = apply_plain_tx(rows, false, false);
> +		rc = apply_plain_tx(replica_id, rows, false, false);
>  	}
>  	fiber_gc();
>  	return rc;
> @@ -1090,12 +1155,14 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
>  		 * each other.
>  		 */
>  		assert(first_row == last_row);
> -		if ((rc = apply_synchro_row(first_row)) != 0)
> -			goto finish;
> -	} else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
> -					true)) != 0) {
> -		goto finish;
> +		rc = apply_synchro_row(applier->instance_id, first_row);
> +	} else {
> +		rc = apply_plain_tx(applier->instance_id, rows,
> +				    replication_skip_conflict, true);
>  	}
> +	if (rc != 0)
> +		goto finish;
> +
>  	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
>  		      last_row->lsn);
>  finish:
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 903390686..a0b3e0186 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -184,6 +184,7 @@ replica_new(void)
>  	trigger_create(&replica->on_applier_state,
>  		       replica_on_applier_state_f, NULL, NULL);
>  	replica->applier_sync_state = APPLIER_DISCONNECTED;
> +	replica->applier_txn_last_tm = 0;
>  	latch_create(&replica->order_latch);
>  	return replica;
>  }
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 5cc380373..57e0f10ae 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -331,6 +331,11 @@ struct replica {
>  	 * separate from applier.
>  	 */
>  	enum applier_state applier_sync_state;
> +	/**
> +	 * Applier's last written to WAL transaction timestamp.
> +	 * Needed for relay lagging statistics.
> +	 */
> +	double applier_txn_last_tm;
>  	/* The latch is used to order replication requests. */
>  	struct latch order_latch;
>  };

--
Serge Petrenko