[Tarantool-patches] [PATCH 8/8] replication: write and read CONFIRM entries

Leonid Vasiliev lvasiliev at tarantool.org
Sat Jun 20 18:06:28 MSK 2020


Hi! Thank you for the patch.
LGTM.
All of the comments below are optional; feel free to ignore them.

On 09.06.2020 15:20, Serge Petrenko wrote:
> Make txn_limbo write a CONFIRM entry as soon as a batch of entries
> receive their acks. CONFIRM entry is written to WAL and later replicated
> to all the replicas.
> 
> Now replicas put synchronous transactions into txn_limbo and wait for
> corresponding confirmation entries to arrive and end up in their WAL
> before committing the transactions.
> 
> Part-of #4847
> ---
>   src/box/applier.cc    | 81 ++++++++++++++++++++++++++++++++++++++++++-
>   src/box/box.cc        |  3 ++
>   src/box/errcode.h     |  1 +
>   src/box/relay.cc      | 13 ++++---
>   src/box/txn.c         | 75 ++++++++++++++++++++++++++++++---------
>   src/box/txn.h         | 23 ++++++++++++
>   src/box/txn_limbo.c   | 79 ++++++++++++++++++++++++++++++++++++-----
>   src/box/txn_limbo.h   |  6 ++++
>   test/box/error.result |  1 +
>   9 files changed, 252 insertions(+), 30 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index df48b4796..1dc977424 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -51,6 +51,7 @@
>   #include "txn.h"
>   #include "box.h"
>   #include "scoped_guard.h"
> +#include "txn_limbo.h"
>   
>   STRS(applier_state, applier_STATE);
>   
> @@ -214,6 +215,11 @@ apply_snapshot_row(struct xrow_header *row)
>   	struct txn *txn = txn_begin();
>   	if (txn == NULL)
>   		return -1;
> +	/*
> +	 * Do not wait for confirmation when fetching a snapshot.
> +	 * Master only sends confirmed rows during join.
> +	 */
> +	txn_force_async(txn);
>   	if (txn_begin_stmt(txn, space) != 0)
>   		goto rollback;
>   	/* no access checks here - applier always works with admin privs */
> @@ -249,10 +255,73 @@ process_nop(struct request *request)
>   	return txn_commit_stmt(txn, request);
>   }
>   
> +/*
> + * An on_commit trigger set on a txn containing a CONFIRM entry.
> + * Confirms some of the txs waiting in txn_limbo.

Nit: txn.h uses the "txns" spelling; you may want to use it here too (up to you).

> + */
> +static int
> +applier_on_confirm(struct trigger *trig, void *data)
> +{
> +	(void) trig;
> +	int64_t lsn = *(int64_t *)data;
> +	txn_limbo_read_confirm(&txn_limbo, lsn);
> +	return 0;
> +}
> +
> +static int
> +process_confirm(struct request *request)
> +{
> +	assert(request->header->type = IPROTO_CONFIRM);
> +	uint32_t replica_id;
> +	struct txn *txn = in_txn();
> +	int64_t *lsn = (int64_t *) region_alloc(&txn->region, sizeof(int64_t));
> +	if (lsn == NULL) {
> +		diag_set(OutOfMemory, sizeof(int64_t), "region_alloc", "lsn");
> +		return -1;
> +	}
> +	if (xrow_decode_confirm(request->header, &replica_id, lsn) != 0)
> +		return -1;
> +	/*
> +	 * on_commit trigger failure is not allowed, so check for
> +	 * instance id early.
> +	 */
> +	if (replica_id != txn_limbo.instance_id) {
> +		diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
> +			 txn_limbo.instance_id);
> +		return -1;
> +	}
> +
> +	/*
> +	 * Set an on_commit trigger which will perform the actual
> +	 * confirmation processing.
> +	 */
> +	struct trigger *trig = (struct trigger *)region_alloc(&txn->region,
> +							      sizeof(*trig));
> +	if (trig == NULL) {
> +		diag_set(OutOfMemory, sizeof(*trig), "region_alloc", "trig");
> +		return -1;
> +	}
> +	trigger_create(trig, applier_on_confirm, lsn, NULL);
> +
> +	if (txn_begin_stmt(txn, NULL) != 0)
> +		return -1;
> +
> +	if (txn_commit_stmt(txn, request) == 0) {
> +		txn_on_commit(txn, trig);
> +		return 0;
> +	} else {
> +		return -1;
> +	}

Do I understand correctly that this is the same trick as in process_nop(),
used to promote the vclock and ...? If so, maybe add a comment?

> +}
> +
>   static int
>   apply_row(struct xrow_header *row)
>   {
>   	struct request request;
> +	if (row->type == IPROTO_CONFIRM) {
> +		request.header = row;
> +		return process_confirm(&request);
> +	}
>   	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
>   		return -1;
>   	if (request.type == IPROTO_NOP)
> @@ -273,6 +342,11 @@ apply_final_join_row(struct xrow_header *row)
>   	struct txn *txn = txn_begin();
>   	if (txn == NULL)
>   		return -1;
> +	/*
> +	 * Do not wait for confirmation while processing final
> +	 * join rows. See apply_snapshot_row().
> +	 */
> +	txn_force_async(txn);
>   	if (apply_row(row) != 0) {
>   		txn_rollback(txn);
>   		fiber_gc();
> @@ -492,7 +566,12 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>   		applier->last_row_time = ev_monotonic_now(loop());
>   		if (iproto_type_is_dml(row.type)) {
>   			vclock_follow_xrow(&replicaset.vclock, &row);
> -			if (apply_final_join_row(&row) != 0)
> +			/*
> +			 * Confirms are ignored during join. All the
> +			 * data master sends us is valid.
> +			 */
> +			if (row.type != IPROTO_CONFIRM &&
> +			    apply_final_join_row(&row) != 0)
>   				diag_raise();
>   			if (++row_count % 100000 == 0)
>   				say_info("%.1fM rows received", row_count / 1e6);
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 64ac89975..792c3c394 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -342,6 +342,9 @@ static void
>   apply_wal_row(struct xstream *stream, struct xrow_header *row)
>   {
>   	struct request request;
> +	// TODO: process confirmation during recovery.
> +	if (row->type == IPROTO_CONFIRM)
> +		return;
>   	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
>   	if (request.type != IPROTO_NOP) {
>   		struct space *space = space_cache_find_xc(request.space_id);
> diff --git a/src/box/errcode.h b/src/box/errcode.h
> index 019c582af..3ba6866e5 100644
> --- a/src/box/errcode.h
> +++ b/src/box/errcode.h
> @@ -267,6 +267,7 @@ struct errcode_record {
>   	/*212 */_(ER_SEQUENCE_NOT_STARTED,		"Sequence '%s' is not started") \
>   	/*213 */_(ER_NO_SUCH_SESSION_SETTING,	"Session setting %s doesn't exist") \
>   	/*214 */_(ER_UNCOMMITTED_FOREIGN_SYNC_TXNS, "Found uncommitted sync transactions from other instance with id %u") \
> +	/*215 */_(ER_SYNC_MASTER_MISMATCH,	"CONFIRM message arrived for an unknown master id %d, expected %d") \
>   
>   /*
>    * !IMPORTANT! Please follow instructions at start of the file
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 333e91ea9..4df3c2f26 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -402,10 +402,14 @@ tx_status_update(struct cmsg *msg)
>   	vclock_copy(&status->relay->tx.vclock, &status->vclock);
>   	/*
>   	 * Let pending synchronous transactions know, which of
> -	 * them were successfully sent to the replica.
> +	 * them were successfully sent to the replica. Acks are
> +	 * collected only on the master. Other instances wait for
> +	 * master's CONFIRM message instead.
>   	 */
> -	txn_limbo_ack(&txn_limbo, status->relay->replica->id,
> -		     vclock_get(&status->vclock, instance_id));
> +	if (txn_limbo.instance_id == instance_id) {
> +		txn_limbo_ack(&txn_limbo, status->relay->replica->id,
> +			      vclock_get(&status->vclock, instance_id));
> +	}
>   	static const struct cmsg_hop route[] = {
>   		{relay_status_update, NULL}
>   	};
> @@ -766,7 +770,8 @@ static void
>   relay_send_row(struct xstream *stream, struct xrow_header *packet)
>   {
>   	struct relay *relay = container_of(stream, struct relay, stream);
> -	assert(iproto_type_is_dml(packet->type));
> +	assert(iproto_type_is_dml(packet->type) ||
> +	       packet->type == IPROTO_CONFIRM);
>   	if (packet->group_id == GROUP_LOCAL) {
>   		/*
>   		 * We do not relay replica-local rows to other
> diff --git a/src/box/txn.c b/src/box/txn.c
> index a65100b31..3b331fecc 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -36,6 +36,7 @@
>   #include <fiber.h>
>   #include "xrow.h"
>   #include "errinj.h"
> +#include "iproto_constants.h"
>   
>   double too_long_threshold;
>   
> @@ -81,7 +82,12 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
>   	 */
>   	struct space *space = stmt->space;
>   	row->group_id = space != NULL ? space_group_id(space) : 0;
> -	row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
> +	/*
> +	 * IPROTO_CONFIRM entries are supplementary and aren't
> +	 * valid dml requests. They're encoded manually.
> +	 */
> +	if (likely(row->type != IPROTO_CONFIRM))
> +		row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
>   	if (row->bodycnt < 0)
>   		return -1;
>   	stmt->row = row;
> @@ -321,8 +327,10 @@ txn_commit_stmt(struct txn *txn, struct request *request)
>   	 */
>   	struct txn_stmt *stmt = txn_current_stmt(txn);
>   
> -	/* Create WAL record for the write requests in non-temporary spaces.
> -	 * stmt->space can be NULL for IRPOTO_NOP.
> +	/*
> +	 * Create WAL record for the write requests in
> +	 * non-temporary spaces. stmt->space can be NULL for
> +	 * IRPOTO_NOP or IPROTO_CONFIRM.
>   	 */
>   	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
>   		if (txn_add_redo(txn, stmt, request) != 0)
> @@ -417,12 +425,12 @@ txn_run_rollback_triggers(struct txn *txn, struct rlist *triggers)
>   /**
>    * Complete transaction processing.
>    */
> -static void
> +void
>   txn_complete(struct txn *txn)
>   {
>   	/*
>   	 * Note, engine can be NULL if transaction contains
> -	 * IPROTO_NOP statements only.
> +	 * IPROTO_NOP or IPROTO_CONFIRM statements.
>   	 */
>   	if (txn->signature < 0) {
>   		/* Undo the transaction. */
> @@ -510,13 +518,6 @@ txn_journal_entry_new(struct txn *txn)
>   	struct xrow_header **remote_row = req->rows;
>   	struct xrow_header **local_row = req->rows + txn->n_applier_rows;
>   	bool is_sync = false;
> -	/*
> -	 * Only local transactions, originated from the master,
> -	 * can enter 'waiting for acks' state. It means, only
> -	 * author of the transaction can collect acks. Replicas
> -	 * consider it a normal async transaction so far.
> -	 */
> -	bool is_local = true;
>   
>   	stailq_foreach_entry(stmt, &txn->stmts, next) {
>   		if (stmt->has_triggers) {
> @@ -530,17 +531,18 @@ txn_journal_entry_new(struct txn *txn)
>   		if (stmt->row == NULL)
>   			continue;
>   
> -		if (stmt->row->replica_id == 0) {
> +		if (stmt->row->replica_id == 0)
>   			*local_row++ = stmt->row;
> -		} else {
> +		else
>   			*remote_row++ = stmt->row;
> -			is_local = false;
> -		}
>   
>   		req->approx_len += xrow_approx_len(stmt->row);
>   	}
> -	if (is_sync && is_local)
> +
> +	is_sync = is_sync && !txn_has_flag(txn, TXN_FORCE_ASYNC);
> +	if (is_sync) {
>   		txn_set_flag(txn, TXN_WAIT_ACK);
> +	}
>   
>   	assert(remote_row == req->rows + txn->n_applier_rows);
>   	assert(local_row == remote_row + txn->n_new_rows);
> @@ -601,6 +603,19 @@ txn_commit_nop(struct txn *txn)
>   	return false;
>   }
>   
> +/*
> + * A trigger called on tx rollback due to a failed WAL write,
> + * when tx is waiting for confirmation.
> + */
> +static int
> +txn_limbo_on_rollback(struct trigger *trig, void *data)
> +{
> +	(void) trig;
> +	struct txn_limbo_entry *entry = (struct txn_limbo_entry *) data;
> +	txn_limbo_abort(&txn_limbo, entry);
> +	return 0;
> +}
> +
>   int
>   txn_commit_async(struct txn *txn)
>   {
> @@ -632,16 +647,42 @@ txn_commit_async(struct txn *txn)
>   		return -1;
>   	}
>   
> +	bool is_sync = txn_has_flag(txn, TXN_WAIT_ACK);
> +	struct txn_limbo_entry *limbo_entry;
> +	if (is_sync) {
> +		/* See txn_commit(). */
> +		uint32_t origin_id = req->rows[0]->replica_id;
> +		int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
> +		limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
> +		if (limbo_entry == NULL) {
> +			txn_rollback(txn);
> +			txn_free(txn);
> +			return -1;
> +		}
> +		assert(lsn > 0);
> +		txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
> +	}
> +
>   	fiber_set_txn(fiber(), NULL);
>   	if (journal_write_async(req) != 0) {
>   		fiber_set_txn(fiber(), txn);
>   		txn_rollback(txn);
> +		txn_limbo_abort(&txn_limbo, limbo_entry);
>   
>   		diag_set(ClientError, ER_WAL_IO);
>   		diag_log();
>   		return -1;
>   	}
>   
> +	/*
> +	 * Set a trigger to abort waiting for confirm on WAL write
> +	 * failure.
> +	 */
> +	if (is_sync) {
> +		struct trigger trig;
> +		trigger_create(&trig, txn_limbo_on_rollback, limbo_entry, NULL);
> +		txn_on_rollback(txn, &trig);
> +	}
>   	return 0;
>   }
>   
> diff --git a/src/box/txn.h b/src/box/txn.h
> index 232cc07a8..e7705bb48 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -73,6 +73,13 @@ enum txn_flag {
>   	 * then finishes commit and returns success to a user.
>   	 */
>   	TXN_WAIT_ACK,
> +	/**
> +	 * A transaction mustn't wait for confirmation, even if it
> +	 * touches synchronous spaces. Needed for join stage on
> +	 * replica, when all the data coming from the master is
> +	 * already confirmed by design.
> +	 */
> +	TXN_FORCE_ASYNC,
>   };
>   
>   enum {
> @@ -257,6 +264,16 @@ txn_clear_flag(struct txn *txn, enum txn_flag flag)
>   	txn->flags &= ~(1 << flag);
>   }
>   
> +/**
> + * Force async mode for transaction. It won't wait for acks
> + * or confirmation.
> + */
> +static inline void
> +txn_force_async(struct txn *txn)
> +{
> +	txn_set_flag(txn, TXN_FORCE_ASYNC);
> +}
> +
>   /* Pointer to the current transaction (if any) */
>   static inline struct txn *
>   in_txn(void)
> @@ -278,6 +295,12 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
>   struct txn *
>   txn_begin(void);
>   
> +/**
> + * Complete transaction processing.
> + */
> +void
> +txn_complete(struct txn *txn);
> +
>   /**
>    * Commit a transaction.
>    * @pre txn == in_txn()
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index efb97a591..daec98317 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -128,12 +128,65 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>   		fiber_yield();
>   	fiber_set_cancellable(cancellable);
>   	// TODO: implement rollback.
> -	// TODO: implement confirm.
>   	assert(!entry->is_rollback);
> +	assert(entry->is_commit);
>   	txn_limbo_remove(limbo, entry);
>   	txn_clear_flag(txn, TXN_WAIT_ACK);
>   }
>   
> +/**
> + * Write a confirmation entry to WAL. After it's written all the
> + * transactions waiting for confirmation may be finished.
> + */
> +static int
> +txn_limbo_write_confirm(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> +{
> +	/* Prepare a confirm entry. */
> +	struct xrow_header row = {0};
> +	struct request request = {0};
> +	request.header = &row;
> +
> +	row.bodycnt = xrow_encode_confirm(&row, limbo->instance_id, entry->lsn);
> +	if (row.bodycnt < 0)
> +		return -1;
> +
> +	struct txn *txn = txn_begin();
> +	if (txn == NULL)
> +		return -1;
> +
> +	if (txn_begin_stmt(txn, NULL) != 0)
> +		goto rollback;
> +	if (txn_commit_stmt(txn, &request) != 0)
> +		goto rollback;
> +
> +	return txn_commit(txn);
> +rollback:
> +	txn_rollback(txn);
> +	return -1;
> +}
> +
> +void
> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
> +{
> +	assert(limbo->instance_id != REPLICA_ID_NIL &&
> +	       limbo->instance_id != instance_id);
> +	struct txn_limbo_entry *e, *tmp;
> +	rlist_foreach_entry_safe(e, &limbo->queue, in_queue, tmp) {
> +		if (e->lsn > lsn)
> +			break;
> +		assert(e->txn->fiber == NULL);
> +		e->is_commit = true;
> +		txn_limbo_remove(limbo, e);
> +		txn_clear_flag(e->txn, TXN_WAIT_ACK);
> +		/*
> +		 * txn_complete_async must've been called already,
> +		 * since CONFIRM always follows the tx in question.
> +		 * So, finish this tx processing right away.
> +		 */
> +		txn_complete(e->txn);
> +	}
> +}
> +
>   void
>   txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
>   {
> @@ -143,23 +196,33 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
>   	int64_t prev_lsn = vclock_get(&limbo->vclock, replica_id);
>   	vclock_follow(&limbo->vclock, replica_id, lsn);
>   	struct txn_limbo_entry *e;
> +	struct txn_limbo_entry *last_quorum = NULL;
>   	rlist_foreach_entry(e, &limbo->queue, in_queue) {
>   		if (e->lsn <= prev_lsn)
>   			continue;
>   		if (e->lsn > lsn)
>   			break;
>   		if (++e->ack_count >= replication_sync_quorum) {
> -			// TODO: better call complete() right
> -			// here. Appliers use async transactions,
> -			// and their txns don't have fibers to
> -			// wake up. That becomes actual, when
> -			// appliers will be supposed to wait for
> -			// 'confirm' message.
>   			e->is_commit = true;
> -			fiber_wakeup(e->txn->fiber);
> +			last_quorum = e;
>   		}
>   		assert(e->ack_count <= VCLOCK_MAX);
>   	}
> +	if (last_quorum != NULL) {
> +		if (txn_limbo_write_confirm(limbo, last_quorum) != 0) {
> +			// TODO: rollback.
> +			return;
> +		}
> +		/*
> +		 * Wakeup all the entries in direct order as soon
> +		 * as confirmation message is written to WAL.
> +		 */
> +		rlist_foreach_entry(e, &limbo->queue, in_queue) {
> +			fiber_wakeup(e->txn->fiber);
> +			if (e == last_quorum)
> +				break;
> +		}
> +	}
>   }
>   
>   void
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 1ad1c567a..de415cd97 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -160,6 +160,12 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
>   void
>   txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
>   
> +/**
> + * Confirm all the entries up to the given master's LSN.
> + */
> +void
> +txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
> +
>   void
>   txn_limbo_init();
>   
> diff --git a/test/box/error.result b/test/box/error.result
> index 69c471085..34ded3930 100644
> --- a/test/box/error.result
> +++ b/test/box/error.result
> @@ -433,6 +433,7 @@ t;
>    |   212: box.error.SEQUENCE_NOT_STARTED
>    |   213: box.error.NO_SUCH_SESSION_SETTING
>    |   214: box.error.UNCOMMITTED_FOREIGN_SYNC_TXNS
> + |   215: box.error.SYNC_MASTER_MISMATCH
>    | ...
>   
>   test_run:cmd("setopt delimiter ''");
> 


More information about the Tarantool-patches mailing list