[Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sat Aug 15 18:06:05 MSK 2020


Thanks for the patch!

See 10 comments below.

On 14.08.2020 23:14, Cyrill Gorcunov wrote:
> Transaction processing code is very heavy simply because
> trasactions are carrying various data and involves a number
> of other mechanisms to procceed.

1. trasactions -> transactions.
   procceed -> proceed.

> In turn, when we receive confirm or rollback packed from
> another node in a cluster we just need to inspect limbo
> queue and write this packed into a WAL journal. So calling
> a bunch of txn engine helpers is simply waste of cycles.
> 
> Thus lets rather handle them in a special light way:
> 
>  - allocate synchro_entry structure which would carry
>    the journal entry itself and encoded message
>  - process limbo queue to mark confirmed/rollback'ed
>    messages
>  - finally write this synchro_entry into a journal
> 
> Which is a way more simplier.

2. 'more simplier' -> simpler. Otherwise looks like 'более лучше'.

> Part-of #5129
> 
> Suggedsted-by: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
> Signed-off-by: Cyrill Gorcunov <gorcunov at gmail.com>
> ---
>  src/box/applier.cc | 179 +++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 172 insertions(+), 7 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index a71516282..a1ce7a23f 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -841,6 +843,151 @@ applier_unlock(struct latch *latch)
>  	latch_unlock(latch);
>  }
>  
> +struct synchro_entry {
> +	/** An applier initiated the syncho request. */
> +	struct applier *applier;

3. Actually 'applier' is not needed. I looked around and realized it is never
used. I even dropped it and nothing changed.

> +
> +	/** Encoded form of a synchro record. */
> +	struct synchro_body_bin	body_bin;
> +
> +	/** xrow to write, used by the journal engine. */
> +	struct xrow_header row;
> +
> +	/**
> +	 * The journal entry itself. Note since
> +	 * it has unsized array it must be the
> +	 * last entry in the structure.
> +	 */
> +	struct journal_entry journal_entry;
> +};
> +
> +static void
> +synchro_entry_delete(struct synchro_entry *entry)
> +{
> +	free(entry);
> +}
> +
> +/**
> + * Async write journal completion.
> + */
> +static void
> +apply_synchro_row_cb(struct journal_entry *entry)
> +{
> +	assert(entry->complete_data != NULL);
> +	struct synchro_entry *synchro_entry =
> +		(struct synchro_entry *)entry->complete_data;
> +	struct applier *applier = synchro_entry->applier;
> +
> +	/*
> +	 * We can reuse triggers, they are allocated when
> +	 * applier get subscribed and since packets handling
> +	 * is processed after the subscribtion phase the triggers
> +	 * will be alive.

4. subscribtion -> subscription. Also I don't think I understood
the comment.

> +	 */
> +	if (entry->res < 0) {
> +		trigger_run(&replicaset.applier.on_rollback, applier);
> +		/*
> +		 * Restore the last written vlock value.
> +		 */
> +		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
> +		diag_set(ClientError, ER_WAL_IO);
> +		diag_log();

5. Error should be set before running on_rollback, and it should be installed
into replicaset.applier.diag.

> +	} else {
> +		trigger_run(&replicaset.applier.on_wal_write, applier);
> +	}
> +
> +	synchro_entry_delete(synchro_entry);
> +}
> +
> +/**
> + * Allocate a new synchro_entry to be passed to
> + * the journal engine in async write way.
> + */
> +static struct synchro_entry *
> +synchro_entry_new(struct applier *applier,
> +		  struct xrow_header *applier_row,
> +		  struct synchro_request *req)
> +{
> +	struct synchro_entry *entry;
> +	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);

6. Why don't you just add 'struct xrow_header*[1]' to the end of
struct synchro_entry? There is no a case, when the entry is needed
without the xrow_header pointer in the end.

> +
> +	/*
> +	 * For simplicity we use malloc here but
> +	 * probably should provide some cache similar
> +	 * to txn cache.
> +	 */
> +	entry = (struct synchro_entry *)malloc(size);
> +	if (entry == NULL) {
> +		diag_set(OutOfMemory, size, "malloc", "synchro_entry");
> +		return NULL;
> +	}
> +
> +	struct journal_entry *journal_entry = &entry->journal_entry;
> +	struct synchro_body_bin *body_bin = &entry->body_bin;
> +	struct xrow_header *row = &entry->row;
> +
> +	entry->applier = applier;
> +	journal_entry->rows[0] = row;
> +
> +	xrow_encode_synchro(row, body_bin, req);
> +
> +	row->lsn = applier_row->lsn;
> +	row->replica_id = applier_row->replica_id;
> +
> +	journal_entry_create(journal_entry, 1, xrow_approx_len(row),
> +			     apply_synchro_row_cb, entry);
> +	return entry;
> +}
> +
> +/*
> + * Process a synchro request from incoming applier packet
> + * without using txn engine, for a speed sake.

7. It is not about speed. Txn module is fast, it is one of the hottest and
most optimized places in the whole code base. And this is exactly why synchro
requests *should not* be there - they slow down and complicate txn, not vice
versa.

> + */
> +static int
> +apply_synchro_row(struct applier *applier, struct xrow_header *row)
> +{
> +	assert(iproto_type_is_synchro_request(row->type));
> +
> +	struct latch *latch = applier_lock(row->replica_id);
> +	if (vclock_get(&replicaset.applier.vclock,
> +		       row->replica_id) >= row->lsn) {
> +		applier_unlock(latch);
> +		return 0;
> +	}
> +
> +	struct synchro_request req;
> +	if (xrow_decode_synchro(row, &req) != 0)
> +		goto out;
> +
> +	if (txn_limbo_process(&txn_limbo, &req))
> +		goto out;
> +
> +	struct synchro_entry *entry;
> +	entry = synchro_entry_new(applier, row, &req);
> +	if (entry == NULL)
> +		goto out;
> +
> +	if (journal_write_async(&entry->journal_entry) != 0) {
> +		diag_set(ClientError, ER_WAL_IO);
> +		goto out;
> +	}
> +
> +	/*
> +	 * In case if something get wrong the journal completion
> +	 * handler will set the applier's vclock back to last
> +	 * successfully WAL written value.
> +	 */
> +	vclock_follow(&replicaset.applier.vclock,
> +		      row->replica_id, row->lsn);
> +	applier_unlock(latch);

8. Code duplication is too big. And I wouldn't mind if it was just applier locks,
but vclock propagation and rollback is not that simple.

I think we should do all that inside {applier_apply_tx()}. Because technically you
apply tx - the synchro row is stored inside {struct stailq *rows} which is the tx.

I moved it into {applier_apply_tx()}, and the code became smaller, simpler, with
less duplication, and even less diff. It also allows to drop the commits 5/8 and
6/8. Take a look and lets discuss.

> +	return 0;
> +
> +out:
> +	diag_log();
> +	applier_unlock(latch);
> +	return -1;
> +}
> +
>  /**
>   * Apply all rows in the rows queue as a single transaction.
>   *
> @@ -1148,15 +1301,27 @@ applier_subscribe(struct applier *applier)
>  		applier_read_tx(applier, &rows);
>  
>  		applier->last_row_time = ev_monotonic_now(loop());
> +		struct xrow_header *row = applier_first_row(&rows);
>  
> -		/*
> -		 * In case of an heartbeat message wake a writer up
> -		 * and check applier state.
> -		 */
> -		if (applier_first_row(&rows)->lsn == 0)
> +		if (row->lsn == 0) {
> +			/*
> +			 * In case of an heartbeat message
> +			 * wake a writer up and check
> +			 * the applier state.
> +			 */
>  			applier_signal_ack(applier);
> -		else if (applier_apply_tx(&rows) != 0)
> +		} else if (iproto_type_is_synchro_request(row->type)) {
> +			/*
> +			 * Make sure synchro messages are never reached
> +			 * in a batch (this is by design for simplicity
> +			 * sake).

9. It is not about simplicity. It is about being not necessary. Transactions
exist for DML and DDL (which is also DML on system spaces) only. For other
WAL writes transactions in their common sense don't exist. So each row is a
'transaction'. In future we may want to change that and, for example,
incorporate several synchro requests into a 'tx' (don't know why would we
need that, but it is technically possible).

> +			 */
> +			assert(stailq_first(&rows) == stailq_last(&rows));
> +			if (apply_synchro_row(applier, row) != 0)
> +				diag_raise();
> +		} else if (applier_apply_tx(&rows) != 0) {
>  			diag_raise();
> +		}
>  
>  		if (ibuf_used(ibuf) == 0)
>  			ibuf_reset(ibuf);
> 

10. Consider my changes on top of this commit on your branch. Below I paste my
diff squashed into your commit (on the branch they are not squashed).

====================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a71516282..dfa62b72a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,8 +51,10 @@
 #include "schema.h"
 #include "txn.h"
 #include "box.h"
+#include "xrow.h"
 #include "scoped_guard.h"
 #include "txn_limbo.h"
+#include "journal.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -772,19 +774,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 	} while (!applier_last_row(rows)->is_commit);
 }
 
-static int
-applier_txn_rollback_cb(struct trigger *trigger, void *event)
+static void
+applier_rollback_by_wal_io(void)
 {
-	(void) trigger;
-	struct txn *txn = (struct txn *) event;
-	/*
-	 * Synchronous transaction rollback due to receiving a
-	 * ROLLBACK entry is a normal event and requires no
-	 * special handling.
-	 */
-	if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
-		return 0;
-
 	/*
 	 * Setup shared applier diagnostic area.
 	 *
@@ -793,19 +785,32 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 	 * diag use per-applier diag instead all the time
 	 * (which actually already present in the structure).
 	 *
-	 * But remember that transactions are asynchronous
-	 * and rollback may happen a way latter after it
-	 * passed to the journal engine.
+	 * But remember that WAL writes are asynchronous and
+	 * rollback may happen a way later after it was passed to
+	 * the journal engine.
 	 */
 	diag_set(ClientError, ER_WAL_IO);
 	diag_set_error(&replicaset.applier.diag,
 		       diag_last_error(diag_get()));
 
-	/* Broadcast the rollback event across all appliers. */
-	trigger_run(&replicaset.applier.on_rollback, event);
-
+	/* Broadcast the rollback across all appliers. */
+	trigger_run(&replicaset.applier.on_rollback, NULL);
 	/* Rollback applier vclock to the committed one. */
 	vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+}
+
+static int
+applier_txn_rollback_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	struct txn *txn = (struct txn *) event;
+	/*
+	 * Synchronous transaction rollback due to receiving a
+	 * ROLLBACK entry is a normal event and requires no
+	 * special handling.
+	 */
+	if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK)
+		applier_rollback_by_wal_io();
 	return 0;
 }
 
@@ -841,6 +846,110 @@ applier_unlock(struct latch *latch)
 	latch_unlock(latch);
 }
 
+struct synchro_entry {
+	/** Encoded form of a synchro record. */
+	struct synchro_body_bin	body_bin;
+
+	/** xrow to write, used by the journal engine. */
+	struct xrow_header row;
+
+	/**
+	 * The journal entry itself. Note since
+	 * it has unsized array it must be the
+	 * last entry in the structure.
+	 */
+	struct journal_entry journal_entry;
+};
+
+static void
+synchro_entry_delete(struct synchro_entry *entry)
+{
+	free(entry);
+}
+
+/**
+ * Async write journal completion.
+ */
+static void
+apply_synchro_row_cb(struct journal_entry *entry)
+{
+	assert(entry->complete_data != NULL);
+	struct synchro_entry *synchro_entry =
+		(struct synchro_entry *)entry->complete_data;
+	if (entry->res < 0)
+		applier_rollback_by_wal_io();
+	else
+		trigger_run(&replicaset.applier.on_wal_write, NULL);
+
+	synchro_entry_delete(synchro_entry);
+}
+
+/**
+ * Allocate a new synchro_entry to be passed to
+ * the journal engine in async write way.
+ */
+static struct synchro_entry *
+synchro_entry_new(struct xrow_header *applier_row,
+		  struct synchro_request *req)
+{
+	struct synchro_entry *entry;
+	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
+
+	/*
+	 * For simplicity we use malloc here but
+	 * probably should provide some cache similar
+	 * to txn cache.
+	 */
+	entry = (struct synchro_entry *)malloc(size);
+	if (entry == NULL) {
+		diag_set(OutOfMemory, size, "malloc", "synchro_entry");
+		return NULL;
+	}
+
+	struct journal_entry *journal_entry = &entry->journal_entry;
+	struct synchro_body_bin *body_bin = &entry->body_bin;
+	struct xrow_header *row = &entry->row;
+
+	journal_entry->rows[0] = row;
+
+	xrow_encode_synchro(row, body_bin, req);
+
+	row->lsn = applier_row->lsn;
+	row->replica_id = applier_row->replica_id;
+
+	journal_entry_create(journal_entry, 1, xrow_approx_len(row),
+			     apply_synchro_row_cb, entry);
+	return entry;
+}
+
+/** Process a synchro request. */
+static int
+apply_synchro_row(struct xrow_header *row)
+{
+	assert(iproto_type_is_synchro_request(row->type));
+
+	struct synchro_request req;
+	if (xrow_decode_synchro(row, &req) != 0)
+		goto err;
+
+	if (txn_limbo_process(&txn_limbo, &req))
+		goto err;
+
+	struct synchro_entry *entry;
+	entry = synchro_entry_new(row, &req);
+	if (entry == NULL)
+		goto err;
+
+	if (journal_write_async(&entry->journal_entry) != 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		goto err;
+	}
+	return 0;
+err:
+	diag_log();
+	return -1;
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -876,13 +985,26 @@ applier_apply_tx(struct stailq *rows)
 		}
 	}
 
+	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
+		/*
+		 * Synchro messages are not transactions, in terms
+		 * of DML. Always sent and written isolated from
+		 * each other.
+		 */
+		assert(first_row == last_row);
+		if (apply_synchro_row(first_row) != 0)
+			diag_raise();
+		goto success;
+	}
+
 	/**
 	 * Explicitly begin the transaction so that we can
 	 * control fiber->gc life cycle and, in case of apply
 	 * conflict safely access failed xrow object and allocate
 	 * IPROTO_NOP on gc.
 	 */
-	struct txn *txn = txn_begin();
+	struct txn *txn;
+	txn = txn_begin();
 	struct applier_tx_row *item;
 	if (txn == NULL) {
 		applier_unlock(latch);
@@ -951,6 +1073,7 @@ applier_apply_tx(struct stailq *rows)
 	if (txn_commit_async(txn) < 0)
 		goto fail;
 
+success:
 	/*
 	 * The transaction was sent to journal so promote vclock.
 	 *
@@ -1118,7 +1241,13 @@ applier_subscribe(struct applier *applier)
 
 	applier->lag = TIMEOUT_INFINITY;
 
-	/* Register triggers to handle WAL writes and rollbacks. */
+	/*
+	 * Register triggers to handle WAL writes and rollbacks.
+	 *
+	 * Note we use them for syncronous packets handling as well
+	 * thus when changing make sure that synchro handling won't
+	 * be broken.
+	 */
 	struct trigger on_wal_write;
 	trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
 	trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);



More information about the Tarantool-patches mailing list