[tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback
    Vladimir Davydov 
    vdavydov.dev at gmail.com
       
    Thu Jun 20 17:08:48 MSK 2019
    
    
  
On Thu, Jun 20, 2019 at 12:23:13AM +0300, Georgy Kirichenko wrote:
> Finalize a transaction through a journal entry callback, so that transaction
> processing doesn't rely on the fiber schedule. This also enforces the
> transaction finalization order, as triggers might fail.
> 
> Prerequisites: #1254
> ---
>  src/box/alter.cc  |   5 +++
>  src/box/box.cc    |   7 ++-
>  src/box/journal.c |  10 ++++-
>  src/box/journal.h |  12 ++++-
>  src/box/txn.c     | 112 ++++++++++++++++++++++++++++++----------------
>  src/box/vy_log.c  |   3 +-
>  src/box/wal.c     |  20 ++++++++-
>  7 files changed, 122 insertions(+), 47 deletions(-)
In general, looks fine to me. A few minor comments below.
> 
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index a37a68ce4..aa6a79264 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -3558,6 +3558,11 @@ unlock_after_dd(struct trigger *trigger, void *event)
>  {
>  	(void) trigger;
>  	(void) event;
> +	/*
> +	 * A trigger could be processed by the wal scheduler fiber
> +	 * so steal the latch first.
> +	 */
Not "could be", but "is processed", I guess.
> +	latch_steal(&schema_lock);
>  	latch_unlock(&schema_lock);
>  }
>  
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 5e5cd2b08..2994363ab 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -309,10 +309,13 @@ struct recovery_journal {
>   */
>  static int64_t
>  recovery_journal_write(struct journal *base,
> -		       struct journal_entry * /* entry */)
> +		       struct journal_entry *entry)
>  {
>  	struct recovery_journal *journal = (struct recovery_journal *) base;
> -	return vclock_sum(journal->vclock);
> +	entry->res = vclock_sum(journal->vclock);
> +	if (entry->on_done_cb)
> +		entry->on_done_cb(entry, entry->on_done_cb_data);
This can't happen - on_done_cb must always be set, so the check is redundant.
Note that although vy_log uses journal_entry, its entries don't go through
this path.
> +	return entry->res;
Shouldn't it return 0? BTW it looks like you need to update the comment
to journal_write().
>  }
>  
>  static inline void
> diff --git a/src/box/journal.c b/src/box/journal.c
> index fe13fb6ee..eb0db9af2 100644
> --- a/src/box/journal.c
> +++ b/src/box/journal.c
> @@ -41,7 +41,9 @@ static int64_t
>  dummy_journal_write(struct journal *journal, struct journal_entry *entry)
>  {
>  	(void) journal;
> -	(void) entry;
> +	entry->res = 0;
> +	if (entry->on_done_cb)
> +		entry->on_done_cb(entry, entry->on_done_cb_data);
>  	return 0;
>  }
>  
> @@ -53,7 +55,9 @@ static struct journal dummy_journal = {
>  struct journal *current_journal = &dummy_journal;
>  
>  struct journal_entry *
> -journal_entry_new(size_t n_rows, struct region *region)
> +journal_entry_new(size_t n_rows, struct region *region,
> +		  void (*on_done_cb)(struct journal_entry *entry, void *data),
> +		  void *on_done_cb_data)
>  {
>  	struct journal_entry *entry;
>  
> @@ -70,6 +74,8 @@ journal_entry_new(size_t n_rows, struct region *region)
>  	entry->n_rows = n_rows;
>  	entry->res = -1;
>  	entry->fiber = fiber();
> +	entry->on_done_cb = on_done_cb;
> +	entry->on_done_cb_data = on_done_cb_data;
>  	return entry;
>  }
>  
> diff --git a/src/box/journal.h b/src/box/journal.h
> index 8ac32ee5e..b704b5c67 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -58,6 +58,14 @@ struct journal_entry {
>  	 * The fiber issuing the request.
>  	 */
>  	struct fiber *fiber;
> +	/**
> +	 * A journal entry completion callback.
> +	 */
I think we should elaborate the comment. When is this callback called?
Is it called on failure? Is it called from the same fiber or some other?
> +	void (*on_done_cb)(struct journal_entry *entry, void *data);
Let's please add a typedef for on_done_cb - you use its signature pretty
often.
> +	/**
> +	 * A journal entry completion callback argument.
> +	 */
> +	void *on_done_cb_data;
>  	/**
>  	 * Approximate size of this request when encoded.
>  	 */
> @@ -80,7 +88,9 @@ struct region;
>   * @return NULL if out of memory, fiber diagnostics area is set
>   */
>  struct journal_entry *
> -journal_entry_new(size_t n_rows, struct region *region);
> +journal_entry_new(size_t n_rows, struct region *region,
> +		  void (*on_done_cb)(struct journal_entry *entry, void *data),
> +		  void *on_done_cb_data);
>  
>  /**
>   * An API for an abstract journal for all transactions of this
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 21f7e98b4..52e16f3e6 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -337,6 +337,66 @@ fail:
>  	return -1;
>  }
>  
> +/**
> + * Complete transaction processing.
> + */
> +static void
> +txn_complete(struct txn *txn)
> +{
> +	if (txn->signature < 0) {
> +		if (txn->engine)
> +			engine_rollback(txn->engine, txn);
> +		/*
> +		 * Some of triggers require for in_txn variable is set so
> +		 * restore it for time a trigger is in progress.
> +		 */
> +		fiber_set_txn(fiber(), txn);
> +		/* Rollback triggers must not throw. */
> +		if (txn->has_triggers &&
> +		    trigger_run(&txn->on_rollback, txn) != 0) {
> +			diag_log();
> +			unreachable();
> +			panic("rollback trigger failed");
> +		}
> +		fiber_set_txn(fiber(), NULL);
> +
> +		return;
> +	}
> +	/*
> +	 * Engine can be NULL if transaction contains IPROTO_NOP
> +	 * statements only.
> +	 */
> +	if (txn->engine != NULL)
> +		engine_commit(txn->engine, txn);
> +	/*
> +	 * Some of triggers require for in_txn variable is set so
> +	 * restore it for time a trigger is in progress.
> +	 */
> +	fiber_set_txn(fiber(), txn);
Copy-and-paste... Maybe add a helper function for running triggers?
> +	/*
> +	 * The transaction is in the binary log. No action below
> +	 * may throw. In case an error has happened, there is
> +	 * no other option but terminate.
> +	 */
> +	if (txn->has_triggers &&
> +	    trigger_run(&txn->on_commit, txn) != 0) {
> +		diag_log();
> +		unreachable();
> +		panic("commit trigger failed");
> +	}
> +
> +	fiber_set_txn(fiber(), NULL);
> +}
> +
> +static void
> +txn_entry_done_cb(struct journal_entry *entry, void *data)
> +{
> +	struct txn *txn = (struct txn *)data;
> +	txn->signature = entry->res;
> +	txn_complete(txn);
> +}
> +
> +
>  static int64_t
>  txn_write_to_wal(struct txn *txn)
>  {
> @@ -418,31 +472,20 @@ txn_commit(struct txn *txn)
>  	}
>  	trigger_clear(&txn->fiber_on_stop);
>  
> +	fiber_set_txn(fiber(), NULL);
>  	if (txn->n_new_rows + txn->n_applier_rows > 0) {
>  		txn->signature = txn_write_to_wal(txn);
>  		if (txn->signature < 0)
>  			return -1;
> +	} else {
> +		/*
> +		 * However there is noting to write to wal a completion
s/noting/nothing/
Punctuation marks are missing, too.
> +		 * should be fired.
> +		 */
> +		txn->signature = 0;
> +		txn_complete(txn);
>  	}
> -	/*
> -	 * Engine can be NULL if transaction contains IPROTO_NOP
> -	 * statements only.
> -	 */
> -	if (txn->engine != NULL)
> -		engine_commit(txn->engine, txn);
> -	/*
> -	 * The transaction is in the binary log. No action below
> -	 * may throw. In case an error has happened, there is
> -	 * no other option but terminate.
> -	 */
> -	if (txn->has_triggers &&
> -	    trigger_run(&txn->on_commit, txn) != 0) {
> -		diag_log();
> -		unreachable();
> -		panic("commit trigger failed");
> -	}
> -
>  
> -	fiber_set_txn(fiber(), NULL);
>  	txn_free(txn);
>  	return 0;
>  fail:
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 71f6dbb5c..62b6391fd 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -263,6 +263,8 @@ tx_schedule_f(va_list ap)
>  			struct journal_entry *req =
>  				stailq_shift_entry(&writer->schedule_queue,
>  						   struct journal_entry, fifo);
> +			if (req->on_done_cb != NULL)
> +				req->on_done_cb(req, req->on_done_cb_data);
>  			fiber_wakeup(req->fiber);
>  		}
>  		writer->is_in_rollback = false;
> @@ -1158,7 +1160,12 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  {
>  	struct wal_writer *writer = (struct wal_writer *) journal;
>  
> -	ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
> +	ERROR_INJECT(ERRINJ_WAL_IO, {
> +		entry->res = -1;
> +		if (entry->on_done_cb != NULL)
> +			entry->on_done_cb(entry, entry->on_done_cb_data);
> +		return -1;
> +	});
>  
>  	if (writer->is_in_rollback) {
>  		/*
> @@ -1171,6 +1178,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  		say_error("Aborting transaction %llu during "
>  			  "cascading rollback",
>  			  vclock_sum(&writer->vclock));
> +		entry->res = -1;
> +		if (entry->on_done_cb != NULL)
> +			entry->on_done_cb(entry, entry->on_done_cb_data);
Could you please add 'goto fail' so as not to duplicate code?
I would also add a helper function journal_entry_complete() that would
call the callback - it would look neater that way, IMO.
>  		return -1;
>  	}
>  
> @@ -1185,6 +1195,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  		if (batch == NULL) {
>  			diag_set(OutOfMemory, sizeof(struct wal_msg),
>  				 "region", "struct wal_msg");
> +			entry->res = -1;
> +			if (entry->on_done_cb != NULL)
> +				entry->on_done_cb(entry, entry->on_done_cb_data);
>  			return -1;
>  		}
>  		wal_msg_create(batch);
> @@ -1222,7 +1235,10 @@ wal_write_in_wal_mode_none(struct journal *journal,
>  		       entry->rows + entry->n_rows);
>  	vclock_merge(&writer->vclock, &vclock_diff);
>  	vclock_copy(&replicaset.vclock, &writer->vclock);
> -	return vclock_sum(&writer->vclock);
> +	entry->res = vclock_sum(&writer->vclock);
> +	if (entry->on_done_cb)
> +		entry->on_done_cb(entry, entry->on_done_cb_data);
> +	return entry->res;
>  }
    
    
More information about the Tarantool-patches
mailing list