[tarantool-patches] [PATCH v4 6/9] wal: introduce a journal entry finalization callback

Георгий Кириченко georgy at tarantool.org
Thu Jun 20 23:22:14 MSK 2019


On Thursday, June 20, 2019 5:08:48 PM MSK Vladimir Davydov wrote:
> On Thu, Jun 20, 2019 at 12:23:13AM +0300, Georgy Kirichenko wrote:
> > Finalize a transaction through a journal entry callback, so that
> > transaction processing doesn't rely on the fiber schedule. This also
> > enforces the transaction finalization order, as triggers might fail.
> > 
> > Prerequisites: #1254
> > ---
> > 
> >  src/box/alter.cc  |   5 +++
> >  src/box/box.cc    |   7 ++-
> >  src/box/journal.c |  10 ++++-
> >  src/box/journal.h |  12 ++++-
> >  src/box/txn.c     | 112 ++++++++++++++++++++++++++++++----------------
> >  src/box/vy_log.c  |   3 +-
> >  src/box/wal.c     |  20 ++++++++-
> >  7 files changed, 122 insertions(+), 47 deletions(-)
> 
> In general, looks fine to me. A few minor comments below.
> 
> > diff --git a/src/box/alter.cc b/src/box/alter.cc
> > index a37a68ce4..aa6a79264 100644
> > --- a/src/box/alter.cc
> > +++ b/src/box/alter.cc
> > @@ -3558,6 +3558,11 @@ unlock_after_dd(struct trigger *trigger, void *event)
> > 
> >  {
> >  
> >  	(void) trigger;
> >  	(void) event;
> > 
> > +	/*
> > +	 * A trigger could be processed by the wal scheduler fiber
> > +	 * so steal the latch first.
> > +	 */
> 
> Not "could be", but "is processed", I guess.
There are two cases: for a non-yielding journal (wal none or recovery) the
trigger runs in the same fiber, but for a yielding one it runs from a tx_prio
callback.
> 
> > +	latch_steal(&schema_lock);
> > 
> >  	latch_unlock(&schema_lock);
> >  
> >  }
> > 
> > diff --git a/src/box/box.cc b/src/box/box.cc
> > index 5e5cd2b08..2994363ab 100644
> > --- a/src/box/box.cc
> > +++ b/src/box/box.cc
> > @@ -309,10 +309,13 @@ struct recovery_journal {
> > 
> >   */
> >  
> >  static int64_t
> >  recovery_journal_write(struct journal *base,
> > 
> > -		       struct journal_entry * /* entry */)
> > +		       struct journal_entry *entry)
> > 
> >  {
> >  
> >  	struct recovery_journal *journal = (struct recovery_journal *) base;
> > 
> > -	return vclock_sum(journal->vclock);
> > +	entry->res = vclock_sum(journal->vclock);
> > +	if (entry->on_done_cb)
> > +		entry->on_done_cb(entry, entry->on_done_cb_data);
> 
> This can't be - on_done_cb must always be set.
> 
> Note, although vy_log uses journal_entry, they don't go through this
> path.
Ok, so I will add an assert instead of the condition.
> 
> > +	return entry->res;
> 
> Shouldn't it return 0? BTW it looks like you need to update the comment
> to journal_write().
Accepted
> 
> >  }
> >  
> >  static inline void
> > 
> > diff --git a/src/box/journal.c b/src/box/journal.c
> > index fe13fb6ee..eb0db9af2 100644
> > --- a/src/box/journal.c
> > +++ b/src/box/journal.c
> > @@ -41,7 +41,9 @@ static int64_t
> > 
> >  dummy_journal_write(struct journal *journal, struct journal_entry *entry)
> >  {
> >  
> >  	(void) journal;
> > 
> > -	(void) entry;
> > +	entry->res = 0;
> > +	if (entry->on_done_cb)
> > +		entry->on_done_cb(entry, entry->on_done_cb_data);
> > 
> >  	return 0;
> >  
> >  }
> > 
> > @@ -53,7 +55,9 @@ static struct journal dummy_journal = {
> > 
> >  struct journal *current_journal = &dummy_journal;
> >  
> >  struct journal_entry *
> > 
> > -journal_entry_new(size_t n_rows, struct region *region)
> > +journal_entry_new(size_t n_rows, struct region *region,
> > +		  void (*on_done_cb)(struct journal_entry *entry, void *data),
> > +		  void *on_done_cb_data)
> > 
> >  {
> >  
> >  	struct journal_entry *entry;
> > 
> > @@ -70,6 +74,8 @@ journal_entry_new(size_t n_rows, struct region *region)
> > 
> >  	entry->n_rows = n_rows;
> >  	entry->res = -1;
> >  	entry->fiber = fiber();
> > 
> > +	entry->on_done_cb = on_done_cb;
> > +	entry->on_done_cb_data = on_done_cb_data;
> > 
> >  	return entry;
> >  
> >  }
> > 
> > diff --git a/src/box/journal.h b/src/box/journal.h
> > index 8ac32ee5e..b704b5c67 100644
> > --- a/src/box/journal.h
> > +++ b/src/box/journal.h
> > @@ -58,6 +58,14 @@ struct journal_entry {
> > 
> >  	 * The fiber issuing the request.
> >  	 */
> >  	
> >  	struct fiber *fiber;
> > 
> > +	/**
> > +	 * A journal entry completion callback.
> > +	 */
> 
> I think we should elaborate the comment. When is this callback called?
> Is it called on failure? Is it called from the same fiber or some other?
Ok
> 
> > +	void (*on_done_cb)(struct journal_entry *entry, void *data);
> 
> Let's please add a typedef for on_done_cb - you use its signature pretty
> often.
Ok
> 
> > +	/**
> > +	 * A journal entry completion callback argument.
> > +	 */
> > +	void *on_done_cb_data;
> > 
> >  	/**
> >  	
> >  	 * Approximate size of this request when encoded.
> >  	 */
> > 
> > @@ -80,7 +88,9 @@ struct region;
> > 
> >   * @return NULL if out of memory, fiber diagnostics area is set
> >   */
> >  
> >  struct journal_entry *
> > 
> > -journal_entry_new(size_t n_rows, struct region *region);
> > +journal_entry_new(size_t n_rows, struct region *region,
> > +		  void (*on_done_cb)(struct journal_entry *entry, void *data),
> > +		  void *on_done_cb_data);
> > 
> >  /**
> >  
> >   * An API for an abstract journal for all transactions of this
> > 
> > diff --git a/src/box/txn.c b/src/box/txn.c
> > index 21f7e98b4..52e16f3e6 100644
> > --- a/src/box/txn.c
> > +++ b/src/box/txn.c
> > 
> > @@ -337,6 +337,66 @@ fail:
> >  	return -1;
> >  
> >  }
> > 
> > +/**
> > + * Complete transaction processing.
> > + */
> > +static void
> > +txn_complete(struct txn *txn)
> > +{
> > +	if (txn->signature < 0) {
> > +		if (txn->engine)
> > +			engine_rollback(txn->engine, txn);
> > +		/*
> > +		 * Some of triggers require for in_txn variable is set so
> > +		 * restore it for time a trigger is in progress.
> > +		 */
> > +		fiber_set_txn(fiber(), txn);
> > +		/* Rollback triggers must not throw. */
> > +		if (txn->has_triggers &&
> > +		    trigger_run(&txn->on_rollback, txn) != 0) {
> > +			diag_log();
> > +			unreachable();
> > +			panic("rollback trigger failed");
> > +		}
> > +		fiber_set_txn(fiber(), NULL);
> > +
> > +		return;
> > +	}
> > +	/*
> > +	 * Engine can be NULL if transaction contains IPROTO_NOP
> > +	 * statements only.
> > +	 */
> > +	if (txn->engine != NULL)
> > +		engine_commit(txn->engine, txn);
> > +	/*
> > +	 * Some of triggers require for in_txn variable is set so
> > +	 * restore it for time a trigger is in progress.
> > +	 */
> > +	fiber_set_txn(fiber(), txn);
> 
> copy-and-paste... Maybe add a helper function for running triggers?
Accepted
> 
> > +	/*
> > +	 * The transaction is in the binary log. No action below
> > +	 * may throw. In case an error has happened, there is
> > +	 * no other option but terminate.
> > +	 */
> > +	if (txn->has_triggers &&
> > +	    trigger_run(&txn->on_commit, txn) != 0) {
> > +		diag_log();
> > +		unreachable();
> > +		panic("commit trigger failed");
> > +	}
> > +
> > +	fiber_set_txn(fiber(), NULL);
> > +}
> > +
> > +static void
> > +txn_entry_done_cb(struct journal_entry *entry, void *data)
> > +{
> > +	struct txn *txn = (struct txn *)data;
> > +	txn->signature = entry->res;
> > +	txn_complete(txn);
> > +}
> > +
> > +
> > 
> >  static int64_t
> >  txn_write_to_wal(struct txn *txn)
> >  {
> > 
> > @@ -418,31 +472,20 @@ txn_commit(struct txn *txn)
> > 
> >  	}
> >  	trigger_clear(&txn->fiber_on_stop);
> > 
> > +	fiber_set_txn(fiber(), NULL);
> > 
> >  	if (txn->n_new_rows + txn->n_applier_rows > 0) {
> >  	
> >  		txn->signature = txn_write_to_wal(txn);
> >  		if (txn->signature < 0)
> >  		
> >  			return -1;
> > 
> > +	} else {
> > +		/*
> > +		 * However there is noting to write to wal a completion
> 
> s/noting/nothing
> 
> punctuation marks missing
> 
> > +		 * should be fired.
> > +		 */
> > +		txn->signature = 0;
> > +		txn_complete(txn);
> > 
> >  	}
> > 
> > -	/*
> > -	 * Engine can be NULL if transaction contains IPROTO_NOP
> > -	 * statements only.
> > -	 */
> > -	if (txn->engine != NULL)
> > -		engine_commit(txn->engine, txn);
> > -	/*
> > -	 * The transaction is in the binary log. No action below
> > -	 * may throw. In case an error has happened, there is
> > -	 * no other option but terminate.
> > -	 */
> > -	if (txn->has_triggers &&
> > -	    trigger_run(&txn->on_commit, txn) != 0) {
> > -		diag_log();
> > -		unreachable();
> > -		panic("commit trigger failed");
> > -	}
> > -
> > 
> > -	fiber_set_txn(fiber(), NULL);
> > 
> >  	txn_free(txn);
> >  	return 0;
> >  
> >  fail:
> > diff --git a/src/box/wal.c b/src/box/wal.c
> > index 71f6dbb5c..62b6391fd 100644
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -263,6 +263,8 @@ tx_schedule_f(va_list ap)
> > 
> >  			struct journal_entry *req =
> >  			
> >  				stailq_shift_entry(&writer->schedule_queue,
> >  				
> >  						   struct journal_entry, fifo);
> > 
> > +			if (req->on_done_cb != NULL)
> > +				req->on_done_cb(req, req->on_done_cb_data);
> > 
> >  			fiber_wakeup(req->fiber);
> >  		
> >  		}
> >  		writer->is_in_rollback = false;
> > 
> > @@ -1158,7 +1160,12 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> > 
> >  {
> >  
> >  	struct wal_writer *writer = (struct wal_writer *) journal;
> > 
> > -	ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
> > +	ERROR_INJECT(ERRINJ_WAL_IO, {
> > +		entry->res = -1;
> > +		if (entry->on_done_cb != NULL)
> > +			entry->on_done_cb(entry, entry->on_done_cb_data);
> > +		return -1;
> > +	});
> > 
> >  	if (writer->is_in_rollback) {
> >  	
> >  		/*
> > 
> > @@ -1171,6 +1178,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> > 
> >  		say_error("Aborting transaction %llu during "
> >  		
> >  			  "cascading rollback",
> >  			  vclock_sum(&writer->vclock));
> > 
> > +		entry->res = -1;
> > +		if (entry->on_done_cb != NULL)
> > +			entry->on_done_cb(entry, entry->on_done_cb_data);
> 
> Could you please add 'goto fail' so as not to duplicate code.
> 
> I would also add a helper function journal_entry_complete() that would
> call the callback - would look neater that way IMO.
Accepted
> 
> >  		return -1;
> >  	
> >  	}
> > 
> > @@ -1185,6 +1195,9 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> > 
> >  		if (batch == NULL) {
> >  		
> >  			diag_set(OutOfMemory, sizeof(struct wal_msg),
> >  			
> >  				 "region", "struct wal_msg");
> > 
> > +			entry->res = -1;
> > +			if (entry->on_done_cb != NULL)
> > +				entry->on_done_cb(entry, entry->on_done_cb_data);
> > 
> >  			return -1;
> >  		
> >  		}
> >  		wal_msg_create(batch);
> > 
> > @@ -1222,7 +1235,10 @@ wal_write_in_wal_mode_none(struct journal *journal,
> > 
> >  		       entry->rows + entry->n_rows);
> >  	
> >  	vclock_merge(&writer->vclock, &vclock_diff);
> >  	vclock_copy(&replicaset.vclock, &writer->vclock);
> > 
> > -	return vclock_sum(&writer->vclock);
> > +	entry->res = vclock_sum(&writer->vclock);
> > +	if (entry->on_done_cb)
> > +		entry->on_done_cb(entry, entry->on_done_cb_data);
> > +	return entry->res;
> > 
> >  }

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: This is a digitally signed message part.
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20190620/d52e5fa3/attachment.sig>


More information about the Tarantool-patches mailing list