Tarantool development patches archive
 help / color / mirror / Atom feed
From: Cyrill Gorcunov via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: tml <tarantool-patches@dev.tarantool.org>
Cc: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Subject: [Tarantool-patches] [PATCH v8 1/2] applier: send transaction's first row WAL time in the applier_writer_f
Date: Mon,  7 Jun 2021 18:55:18 +0300	[thread overview]
Message-ID: <20210607155519.109626-2-gorcunov@gmail.com> (raw)
In-Reply-To: <20210607155519.109626-1-gorcunov@gmail.com>

The applier fiber sends the node's current vclock to the remote relay
reader, indicating the current state of the fetched WAL data so that the
relay knows which new data should be sent. The packet the applier sends
carries the xrow_header::tm field set to zero, but we can reuse it to
pass the timestamp of the first row of the transaction we wrote to our
WAL. Since old instances of Tarantool simply ignore this field, such an
extension won't cause any problems.

The timestamp will be needed to account for the lag of downstream
replicas, which is useful for informational purposes and cluster health
monitoring.

We update applier statistics in WAL callbacks, but since both
apply_synchro_row and apply_plain_tx are used not only for real data
application but in the final join stage as well (in this stage we're
not writing the data yet), they are extended with a replica_id argument,
which is non-zero when the applier is subscribed.

The calculation of the downstream lag itself will be addressed in the
next patch, because sending the timestamp and observing it are
independent actions.

Part-of #5447

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/applier.cc     | 90 +++++++++++++++++++++++++++++++++++-------
 src/box/replication.cc |  1 +
 src/box/replication.h  |  5 +++
 3 files changed, 81 insertions(+), 15 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 33181fdbf..38695a54f 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -163,6 +163,9 @@ applier_writer_f(va_list ap)
 	struct ev_io io;
 	coio_create(&io, applier->io.fd);
 
+	/* ID is permanent while applier is alive */
+	uint32_t replica_id = applier->instance_id;
+
 	while (!fiber_is_cancelled()) {
 		/*
 		 * Tarantool >= 1.7.7 sends periodic heartbeat
@@ -193,6 +196,16 @@ applier_writer_f(va_list ap)
 			applier->has_acks_to_send = false;
 			struct xrow_header xrow;
 			xrow_encode_vclock(&xrow, &replicaset.vclock);
+			/*
+			 * For relay lag statistics we report last
+			 * written transaction timestamp in tm field.
+			 *
+			 * Replica might be dead already so we have to
+			 * test on each iteration.
+			 */
+			struct replica *r = replica_by_id(replica_id);
+			if (likely(r != NULL))
+				xrow.tm = r->applier_txn_start_tm;
 			coio_write_xrow(&io, &xrow);
 			ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, {
 				fiber_sleep(0.01);
@@ -490,7 +503,7 @@ static uint64_t
 applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);
 
 static int
-apply_final_join_tx(struct stailq *rows);
+apply_final_join_tx(uint32_t replica_id, struct stailq *rows);
 
 /**
  * A helper struct to link xrow objects in a list.
@@ -535,7 +548,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 						  next)->row);
 			break;
 		}
-		if (apply_final_join_tx(&rows) != 0)
+		if (apply_final_join_tx(applier->instance_id, &rows) != 0)
 			diag_raise();
 	}
 
@@ -751,11 +764,35 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 	return 0;
 }
 
+struct replica_cb_data {
+	/** Replica ID the data belongs to. */
+	uint32_t replica_id;
+	/**
+	 * Timestamp of a transaction to be accounted
+	 * for relay lag. Usually it is a first row in
+	 * a transaction.
+	 */
+	double txn_start_tm;
+};
+
+/** Update replica associated data once write is complete. */
+static void
+replica_txn_wal_write_cb(struct replica_cb_data *rcb)
+{
+	struct replica *r = replica_by_id(rcb->replica_id);
+	if (likely(r != NULL))
+		r->applier_txn_start_tm = rcb->txn_start_tm;
+}
+
 static int
 applier_txn_wal_write_cb(struct trigger *trigger, void *event)
 {
-	(void) trigger;
 	(void) event;
+
+	struct replica_cb_data *rcb =
+		(struct replica_cb_data *)trigger->data;
+	replica_txn_wal_write_cb(rcb);
+
 	/* Broadcast the WAL write across all appliers. */
 	trigger_run(&replicaset.applier.on_wal_write, NULL);
 	return 0;
@@ -766,6 +803,8 @@ struct synchro_entry {
 	struct synchro_request *req;
 	/** Fiber created the entry. To wakeup when WAL write is done. */
 	struct fiber *owner;
+	/** Replica associated data. */
+	struct replica_cb_data *rcb;
 	/**
 	 * The base journal entry. It has unsized array and then must be the
 	 * last entry in the structure. But can workaround it via a union
@@ -789,6 +828,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
 	if (entry->res < 0) {
 		applier_rollback_by_wal_io();
 	} else {
+		replica_txn_wal_write_cb(synchro_entry->rcb);
 		txn_limbo_process(&txn_limbo, synchro_entry->req);
 		trigger_run(&replicaset.applier.on_wal_write, NULL);
 	}
@@ -797,7 +837,7 @@ apply_synchro_row_cb(struct journal_entry *entry)
 
 /** Process a synchro request. */
 static int
-apply_synchro_row(struct xrow_header *row)
+apply_synchro_row(uint32_t replica_id, struct xrow_header *row)
 {
 	assert(iproto_type_is_synchro_request(row->type));
 
@@ -805,6 +845,7 @@ apply_synchro_row(struct xrow_header *row)
 	if (xrow_decode_synchro(row, &req) != 0)
 		goto err;
 
+	struct replica_cb_data rcb_data;
 	struct synchro_entry entry;
 	/*
 	 * Rows array is cast from *[] to **, because otherwise g++ complains
@@ -817,6 +858,11 @@ apply_synchro_row(struct xrow_header *row)
 			     apply_synchro_row_cb, &entry);
 	entry.req = &req;
 	entry.owner = fiber();
+
+	rcb_data.replica_id = replica_id;
+	rcb_data.txn_start_tm = row->tm;
+	entry.rcb = &rcb_data;
+
 	/*
 	 * The WAL write is blocking. Otherwise it might happen that a CONFIRM
 	 * or ROLLBACK is sent to WAL, and it would empty the limbo, but before
@@ -862,8 +908,9 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
 	return box_raft_process(&req, applier->instance_id);
 }
 
-static inline int
-apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
+static int
+apply_plain_tx(uint32_t replica_id, struct stailq *rows,
+	       bool skip_conflict, bool use_triggers)
 {
 	/*
 	 * Explicitly begin the transaction so that we can
@@ -931,10 +978,21 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
 			goto fail;
 		}
 
+		struct replica_cb_data *rcb;
+		rcb = region_alloc_object(&txn->region, typeof(*rcb), &size);
+		if (rcb == NULL) {
+			diag_set(OutOfMemory, size, "region_alloc_object", "rcb");
+			goto fail;
+		}
+
 		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
 		txn_on_rollback(txn, on_rollback);
 
-		trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
+		item = stailq_first_entry(rows, struct applier_tx_row, next);
+		rcb->replica_id = replica_id;
+		rcb->txn_start_tm = item->row.tm;
+
+		trigger_create(on_wal_write, applier_txn_wal_write_cb, rcb, NULL);
 		txn_on_wal_write(txn, on_wal_write);
 	}
 
@@ -946,7 +1004,7 @@ apply_plain_tx(struct stailq *rows, bool skip_conflict, bool use_triggers)
 
 /** A simpler version of applier_apply_tx() for final join stage. */
 static int
-apply_final_join_tx(struct stailq *rows)
+apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
 {
 	struct xrow_header *first_row =
 		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
@@ -957,9 +1015,9 @@ apply_final_join_tx(struct stailq *rows)
 	vclock_follow_xrow(&replicaset.vclock, last_row);
 	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
 		assert(first_row == last_row);
-		rc = apply_synchro_row(first_row);
+		rc = apply_synchro_row(replica_id, first_row);
 	} else {
-		rc = apply_plain_tx(rows, false, false);
+		rc = apply_plain_tx(replica_id, rows, false, false);
 	}
 	fiber_gc();
 	return rc;
@@ -1088,12 +1146,14 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 		 * each other.
 		 */
 		assert(first_row == last_row);
-		if ((rc = apply_synchro_row(first_row)) != 0)
-			goto finish;
-	} else if ((rc = apply_plain_tx(rows, replication_skip_conflict,
-					true)) != 0) {
-		goto finish;
+		rc = apply_synchro_row(applier->instance_id, first_row);
+	} else {
+		rc = apply_plain_tx(applier->instance_id, rows,
+				    replication_skip_conflict, true);
 	}
+	if (rc != 0)
+		goto finish;
+
 	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
 		      last_row->lsn);
 finish:
diff --git a/src/box/replication.cc b/src/box/replication.cc
index aefb812b3..c97c1fc04 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -184,6 +184,7 @@ replica_new(void)
 	trigger_create(&replica->on_applier_state,
 		       replica_on_applier_state_f, NULL, NULL);
 	replica->applier_sync_state = APPLIER_DISCONNECTED;
+	replica->applier_txn_start_tm = 0;
 	latch_create(&replica->order_latch);
 	return replica;
 }
diff --git a/src/box/replication.h b/src/box/replication.h
index 2ad1cbf66..d9817d4ff 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -331,6 +331,11 @@ struct replica {
 	 * separate from applier.
 	 */
 	enum applier_state applier_sync_state;
+	/**
+	 * Applier's last written to WAL transaction timestamp.
+	 * Needed for relay lagging statistics.
+	 */
+	double applier_txn_start_tm;
 	/* The latch is used to order replication requests. */
 	struct latch order_latch;
 };
-- 
2.31.1


  reply	other threads:[~2021-06-07 15:55 UTC|newest]

Thread overview: 17+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-06-07 15:55 [Tarantool-patches] [PATCH v8 0/2] relay: provide downstream lag information Cyrill Gorcunov via Tarantool-patches
2021-06-07 15:55 ` Cyrill Gorcunov via Tarantool-patches [this message]
2021-06-07 19:20   ` [Tarantool-patches] [PATCH v8 1/2] applier: send transaction's first row WAL time in the applier_writer_f Vladislav Shpilevoy via Tarantool-patches
2021-06-15  9:36   ` Serge Petrenko via Tarantool-patches
2021-06-16 13:32     ` Cyrill Gorcunov via Tarantool-patches
2021-06-17  9:16       ` Serge Petrenko via Tarantool-patches
2021-06-07 15:55 ` [Tarantool-patches] [PATCH v8 2/2] relay: provide information about downstream lag Cyrill Gorcunov via Tarantool-patches
2021-06-07 19:21   ` Vladislav Shpilevoy via Tarantool-patches
2021-06-08  8:40     ` Cyrill Gorcunov via Tarantool-patches
2021-06-08  9:58       ` Cyrill Gorcunov via Tarantool-patches
2021-06-08 18:15       ` Vladislav Shpilevoy via Tarantool-patches
2021-06-15 10:03   ` Serge Petrenko via Tarantool-patches
2021-06-17  6:55     ` Cyrill Gorcunov via Tarantool-patches
2021-06-17  9:01       ` Serge Petrenko via Tarantool-patches
2021-06-17  9:58         ` Cyrill Gorcunov via Tarantool-patches
2021-06-07 19:20 ` [Tarantool-patches] [PATCH v8 0/2] relay: provide downstream lag information Vladislav Shpilevoy via Tarantool-patches
2021-06-07 20:00   ` Cyrill Gorcunov via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210607155519.109626-2-gorcunov@gmail.com \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=gorcunov@gmail.com \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH v8 1/2] applier: send transaction'\''s first row WAL time in the applier_writer_f' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox