[Tarantool-patches] [PATCH v3 11/13] txm: introduce txm_story

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Jul 17 01:25:58 MSK 2020


Thanks for the patch!

Here I continue the review I didn't finish yesterday.

The patch has changed on the branch. I took the version
I just fetched from it and pasted below among with my
comments inlined.

See 24 comments below. But tbh I have many many more. The patch
broke my brain. I fill physical pain in my head, and I didn't even
review it whole. This patch is really really hard to understad.

>     txm: introduce txm_story
>     
>     TXM story is a part of a history of a value in space.
>     It's a story about a tuple, from the point it was added to space
>     to the point when it was deleted from the space.
>     All stories are linked into a list of stories of the same key of
>     each index.
>     
>     Part of #4897
> 
> diff --git a/src/box/txn.c b/src/box/txn.c
> index bbea5e8ac..3d6180631 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -37,6 +37,28 @@
>  #include "xrow.h"
>  #include "errinj.h"
>  #include "iproto_constants.h"
> +#include "small/mempool.h"
> +
> +static uint32_t
> +txm_story_key_hash(const struct tuple *a)
> +{
> +	uintptr_t u = (uintptr_t)a;
> +	if (sizeof(uintptr_t) <= sizeof(uint32_t))
> +		return u;
> +	else
> +		return u ^ (u >> 32);
> +}
> +
> +#define mh_name _history
> +#define mh_key_t struct tuple *
> +#define mh_node_t struct txm_story *
> +#define mh_arg_t int
> +#define mh_hash(a, arg) (txm_story_key_hash((*(a))->tuple))
> +#define mh_hash_key(a, arg) (txm_story_key_hash(a))
> +#define mh_cmp(a, b, arg) ((*(a))->tuple != (*(b))->tuple)
> +#define mh_cmp_key(a, b, arg) ((a) != (*(b))->tuple)
> +#define MH_SOURCE
> +#include "salad/mhash.h"
>  
>  struct tx_manager
>  {
> @@ -48,6 +70,14 @@ struct tx_manager
>  	 * so the list is ordered by rv_psn.
>  	 */
>  	struct rlist read_view_txs;
> +	/** Mempools for tx_story objects with difference index count. */
> +	struct mempool txm_story_pool[BOX_INDEX_MAX];
> +	/** Hash table tuple -> txm_story of that tuple. */
> +	struct mh_history_t *history;
> +	/** List of all txm_story objects. */
> +	struct rlist all_stories;
> +	/** Iterator that sequentially traverses all txm_story objects. */
> +	struct rlist *traverse_all_stories;
>  };
>  
>  /** That's a definition, see declaration for description. */
> @@ -146,6 +176,9 @@ txn_stmt_new(struct region *region)
>  	stmt->space = NULL;
>  	stmt->old_tuple = NULL;
>  	stmt->new_tuple = NULL;
> +	stmt->add_story = NULL;
> +	stmt->del_story = NULL;
> +	stmt->next_in_del_list = NULL;
>  	stmt->engine_savepoint = NULL;
>  	stmt->row = NULL;
>  	stmt->has_triggers = false;
> @@ -1295,11 +1328,23 @@ void
>  tx_manager_init()
>  {
>  	rlist_create(&txm.read_view_txs);
> +	for (size_t i = 0; i < BOX_INDEX_MAX; i++) {
> +		size_t item_size = sizeof(struct txm_story) +
> +			i * sizeof(struct txm_story_link);
> +		mempool_create(&txm.txm_story_pool[i],
> +			       cord_slab_cache(), item_size);
> +	}
> +	txm.history = mh_history_new();
> +	rlist_create(&txm.all_stories);
> +	txm.traverse_all_stories = &txm.all_stories;
>  }
>  
>  void
>  tx_manager_free()
>  {
> +	mh_history_delete(txm.history);
> +	for (size_t i = 0; i < BOX_INDEX_MAX; i++)
> +		mempool_destroy(&txm.txm_story_pool[i]);
>  }
>  
>  int
> @@ -1346,3 +1391,759 @@ txm_cause_conflict(struct txn *breaker, struct txn *victim)
>  	rlist_add(&victim->conflicted_by_list, &tracker->in_conflicted_by_list);
>  	return 0;
>  }
> +
> +/**
> + * Creates new story and links it with the @tuple.
> + * @return story on success, NULL on error (diag is set).
> + */
> +static struct txm_story *
> +txm_story_new(struct space *space, struct tuple *tuple)
> +{
> +	assert(!tuple->is_dirty);
> +	uint32_t index_count = space->index_count;
> +	assert(index_count < BOX_INDEX_MAX);
> +	struct mempool *pool = &txm.txm_story_pool[index_count];
> +	struct txm_story *story = (struct txm_story *)mempool_alloc(pool);
> +	if (story == NULL) {
> +		size_t item_size = sizeof(struct txm_story) +
> +			index_count * sizeof(struct txm_story_link);
> +		diag_set(OutOfMemory, item_size, "tx_manager", "tx story");
> +		return story;
> +	}
> +	story->tuple = tuple;
> +
> +	const struct txm_story **put_story = (const struct txm_story **)&story;
> +	struct txm_story **empty = NULL;
> +	mh_int_t pos = mh_history_put(txm.history, put_story, &empty, 0);
> +	if (pos == mh_end(txm.history)) {
> +		mempool_free(pool, story);
> +		diag_set(OutOfMemory, pos + 1,
> +			 "tx_manager", "tx history hash table");
> +		return NULL;
> +	}
> +	tuple->is_dirty = true;
> +	tuple_ref(tuple);
> +
> +	story->index_count = index_count;
> +	story->add_stmt = NULL;
> +	story->add_psn = 0;
> +	story->del_stmt = NULL;
> +	story->del_psn = 0;
> +	rlist_create(&story->reader_list);
> +	rlist_add_tail(&txm.all_stories, &story->in_all_stories);
> +	memset(story->link, 0, sizeof(story->link[0]) * index_count);
> +	return story;
> +}
> +
> +static void
> +txm_story_delete(struct txm_story *story);
> +
> +/**
> + * Creates new story of a @tuple that was added by @stmt.
> + * @return story on success, NULL on error (diag is set).
> + */
> +static struct txm_story *
> +txm_story_new_add_stmt(struct tuple *tuple, struct txn_stmt *stmt)
> +{
> +	struct txm_story *res = txm_story_new(stmt->space, tuple);
> +	if (res == NULL)
> +		return NULL;
> +	res->add_stmt = stmt;
> +	assert(stmt->add_story == NULL);
> +	stmt->add_story = res;
> +	return res;
> +}
> +
> +/**
> + * Creates new story of a @tuple that was deleted by @stmt.
> + * @return story on success, NULL on error (diag is set).
> + */
> +static struct txm_story *
> +txm_story_new_del_stmt(struct tuple *tuple, struct txn_stmt *stmt)
> +{
> +	struct txm_story *res = txm_story_new(stmt->space, tuple);
> +	if (res == NULL)
> +		return NULL;
> +	res->del_stmt = stmt;
> +	assert(stmt->del_story == NULL);
> +	stmt->del_story = res;
> +	return res;
> +}
> +
> +/**
> + * Undo txm_story_new_add_stmt.
> + */
> +static void
> +txm_story_delete_add_stmt(struct txm_story *story)
> +{
> +	story->add_stmt->add_story = NULL;
> +	story->add_stmt = NULL;
> +	txm_story_delete(story);
> +}
> +
> +/**
> + * Undo txm_story_new_del_stmt.
> + */
> +static void
> +txm_story_delete_del_stmt(struct txm_story *story)
> +{
> +	story->del_stmt->del_story = NULL;
> +	story->del_stmt = NULL;
> +	txm_story_delete(story);
> +}
> +
> +
> +/**
> + * Find a story of a @tuple. The story expected to be present (assert).
> + */
> +static struct txm_story *
> +txm_story_get(struct tuple *tuple)
> +{
> +	assert(tuple->is_dirty);
> +
> +	mh_int_t pos = mh_history_find(txm.history, tuple, 0);
> +	assert(pos != mh_end(txm.history));
> +	return *mh_history_node(txm.history, pos);
> +}
> +
> +/**
> + * Get the older tuple, extracting it from older story if necessary.
> + */
> +static struct tuple *
> +txm_story_older_tuple(struct txm_story_link *link)
> +{
> +	return link->older.is_story ? link->older.story->tuple
> +				    : link->older.tuple;
> +}
> +
> +/**
> + * Link a @story with older story in @index (in both directions).
> + */
> +static void
> +txm_story_link_story(struct txm_story *story, struct txm_story *older_story,
> +		     uint32_t index)
> +{
> +	assert(older_story != NULL);
> +	struct txm_story_link *link = &story->link[index];
> +	/* Must be unlinked. */
> +	assert(!link->older.is_story);
> +	assert(link->older.tuple == NULL);
> +	link->older.is_story = true;
> +	link->older.story = older_story;
> +	older_story->link[index].newer_story = story;
> +}
> +
> +/**
> + * Link a @story with older tuple in @index. In case if the tuple is dirty -
> + * find and link with the corresponding story.
> + */
> +static void
> +txm_story_link_tuple(struct txm_story *story, struct tuple *older_tuple,
> +                     uint32_t index)
> +{
> +	struct txm_story_link *link = &story->link[index];
> +	/* Must be unlinked. */
> +	assert(!link->older.is_story);
> +	assert(link->older.tuple == NULL);
> +	if (older_tuple == NULL)
> +		return;
> +	if (older_tuple->is_dirty) {
> +		txm_story_link_story(story, txm_story_get(older_tuple), index);
> +		return;
> +	}
> +	link->older.tuple = older_tuple;
> +	tuple_ref(link->older.tuple);
> +}
> +
> +/**
> + * Unlink a @story with older story/tuple in @index.
> + */
> +static void
> +txm_story_unlink(struct txm_story *story, uint32_t index)
> +{
> +	struct txm_story_link *link = &story->link[index];
> +	if (link->older.is_story) {
> +		link->older.story->link[index].newer_story = NULL;
> +	} else if (link->older.tuple != NULL) {
> +		tuple_unref(link->older.tuple);
> +		link->older.tuple = NULL;
> +	}
> +	link->older.is_story = false;
> +	link->older.tuple = NULL;
> +}
> +
> +/**
> + * Check if a @story is visible for transaction @txn. Return visible tuple to
> + * @visible_tuple (can be set to NULL).
> + * @param is_prepared_ok - whether prepared (not committed) change is acceptable.
> + * @param own_change - return true if the change was made by @txn itself.
> + * @return true if the story is visible, false otherwise.
> + */
> +static bool
> +txm_story_is_visible(struct txm_story *story, struct txn *txn,
> +		     struct tuple **visible_tuple, bool is_prepared_ok,
> +		     bool *own_change)

1. The fact that this functions is called 'is_visible', but returns 3!!!
values, including one tuple, is very wrong. The name is simply misleading.
It does not just check something and returns bool. It looks for something,
does some heavy actions. 'is' definitely does not fit here.

> +{
> +	*own_change = false;
> +	*visible_tuple = NULL;
> +
> +	int64_t rv_psn = INT64_MAX;
> +	if (txn != NULL && txn->rv_psn != 0)
> +		rv_psn = txn->rv_psn;
> +
> +	struct txn_stmt *dels = story->del_stmt;
> +	while (dels != NULL) {
> +		if (dels->txn == txn) {
> +			/* Tuple is deleted by us (@txn). */
> +			*own_change = true;
> +			return true;
> +		}
> +		dels = dels->next_in_del_list;
> +	}
> +	if (is_prepared_ok && story->del_psn != 0 && story->del_psn < rv_psn) {
> +		/* Tuple is deleted by prepared TX. */

2. What if story->del_stmt is NULL? It would mean the transaction is already
committed. But still this condition won't pass. The one below will pass,
but looks strange. Like if there are transactions, which need *only* prepared,
but not committed tuples. I would propose this:

	if (story->del_psn != 0 && story->del_psn < rv_psn) {
		/* Check if already committed. */
		if (story->del_stmt == NULL)
			return true;
		/* The stmt is prepared, because psn is set, and the stmt is still here. */
		if (is_prepared_ok)
			return true;
	}

Also I don't really like that we need to check committed/prepared/in-progress
state by these indirect attributes. Would be better to have a state or flags in
the story object.

> +		return true;
> +	}
> +	if (story->del_psn != 0 && story->del_stmt == NULL &&
> +	    story->del_psn < rv_psn) {
> +		/* Tuple is deleted by committed TX. */
> +		return true;
> +	}
> +
> +	if (story->add_stmt != NULL && story->add_stmt->txn == txn) {
> +		/* Tuple is added by us (@txn). */
> +		*visible_tuple = story->tuple;
> +		*own_change = true;
> +		return true;
> +	}
> +	if (is_prepared_ok && story->add_psn != 0 && story->add_psn < rv_psn) {

3. The same problem as for the comment above.

These checks for del and add stmt could be generalized if you
would store psn and stmt as a struct. Like this:

	struct txm_story_stmt {
		int64_t psn;
		struct txn_stmt *txn_stmt;
	};

Then you add two of them to the story object:

	struct txm_story {
		struct txm_story_stmt add_stmt;
		struct txm_story_stmt del_stmt;
		...
	};

Then you can have functions with meaningful names to do the
checks you do here.

	static inline bool
	txm_story_stmt_is_prepared(struct txm_story_stmt *stmt)
	{
		return stmt->psn > 0 && stmt->txn_stmt != NULL;
	}

	static inline bool
	txm_story_stmt_is_committed(struct txm_story_stmt *stmt)
	{
		return stmt->psn > 0 && stmt->txn_stmt == NULL;
	}

And more. And use them like this:

	static bool
	txm_story_is_visible(...)
	{
		...
		if (txm_story_stmt_is_committed(&story->del_stmt))
			return true;
		if (txm_story_stmt_is_committed(&story->add_stmt))
			return true;
		if (!is_prepared_ok)
			return false;
		if (txm_story_stmt_is_prepared(&story->del_stmt))
			return true;
		if (txm_story_stmt_is_prepared(&story->add_stmt))
			return true;
		...
	}

> +		/* Tuple is added by another prepared TX. */
> +		*visible_tuple = story->tuple;
> +		return true;
> +	}
> +	if (story->add_psn != 0 && story->add_stmt == NULL &&
> +	    story->add_psn < rv_psn) {
> +		/* Tuple is added by committed TX. */
> +		*visible_tuple = story->tuple;
> +		return true;
> +	}
> +	if (story->add_psn == 0 && story->add_stmt == NULL) {
> +		/* added long time ago. */
> +		*visible_tuple = story->tuple;
> +		return true;
> +	}
> +	return false;
> +}
> +
> +/**
> + * Temporary (allocated on region) struct that stores a conflicting TX.
> + */
> +struct txn_conflict
> +{
> +	struct txn *other_txn;
> +	struct txn_conflict *next;
> +};
> +
> +/**
> + * Save @other_txn in list with head @coflicts_head. New list node is allocated
> + * on @region.
> + * @return 0 on success, -1 on memory error.
> + */
> +static int
> +txm_save_conflict(struct txn *other_txn, struct txn_conflict **coflicts_head,

4. coflicts_head -> conflicts_head. In the comment too.

> +		  struct region *region)
> +{
> +	size_t err_size;
> +	struct txn_conflict *next_conflict;
> +	next_conflict = region_alloc_object(region, struct txn_conflict,
> +					    &err_size);
> +	if (next_conflict == NULL) {
> +		diag_set(OutOfMemory, err_size, "txn_region", "txn conflict");
> +		return -1;
> +	}
> +	next_conflict->other_txn = other_txn;
> +	next_conflict->next = *coflicts_head;
> +	*coflicts_head = next_conflict;

5. Why can't you just return the new head? Or NULL in case of OOM. This
is how we usually do functions which allocate something.

> +	return 0;
> +}
> +
> +/**
> + * Scan a history starting by @stmt statement in @index for a visible tuple
> + * (prepared suits), returned via @visible_replaced.
> + * Collect a list of transactions that will abort current transaction if they
> + * are committed.
> + *
> + * @return 0 on success, -1 on memory error.
> + */
> +static int
> +txm_story_find_visible(struct txn_stmt *stmt, struct txm_story *story,

6. If a function is a method of a struct, it should take the struct in the
first argument.

> +		       uint32_t index, struct tuple **visible_replaced,

7. Why is it called visible_replaced instead of just 'result'? What 'replaced'
means?

> +		       struct txn_conflict **collected_conflicts,
> +		       struct region *region)
> +{
> +	while (true) {
> +		if (!story->link[index].older.is_story) {
> +			/*
> +			 * the tuple is so old that we doesn't

8. the -> The. doesn't -> don't.

> +			 * know its story.
> +			 */
> +			*visible_replaced = story->link[index].older.tuple;
> +			assert(*visible_replaced == NULL ||
> +			       !(*visible_replaced)->is_dirty);
> +			break;
> +		}
> +		story = story->link[index].older.story;
> +		bool unused;
> +		if (txm_story_is_visible(story, stmt->txn, visible_replaced,
> +					 true, &unused))
> +			break;
> +
> +		/*
> +		 * We skip the story but once the story is committed
> +		 * before out TX that may cause conflict.
> +		 * The conflict will be unavoidable if this statement
> +		 * relies on old_tuple. If not (it's a replace),
> +		 * the conflict will take place only for secondary
> +		 * index if the story will not be overwritten in primary
> +		 * index.
> +		 */
> +		bool cross_conflict = false;
> +		if (stmt->does_require_old_tuple) {
> +			cross_conflict = true;
> +		} else if (index != 0) {
> +			struct txm_story *look_up = story;
> +			cross_conflict = true;
> +			while (look_up->link[0].newer_story != NULL) {
> +				struct txm_story *over;
> +				over = look_up->link[0].newer_story;
> +				if (over->add_stmt->txn == stmt->txn) {
> +					cross_conflict = false;
> +					break;
> +				}
> +				look_up = over;
> +			}
> +		}
> +		if (cross_conflict) {
> +			if (txm_save_conflict(story->add_stmt->txn,
> +					      collected_conflicts,
> +					      region) != 0)
> +				return -1;
> +
> +		}
> +	}
> +	return 0;
> +}
> +
> +int
> +txm_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple,
> +		     struct tuple *new_tuple, enum dup_replace_mode mode,
> +		     struct tuple **result)
> +{
> +	assert(new_tuple != NULL || old_tuple != NULL);
> +	struct space *space = stmt->space;
> +	struct txm_story *add_story = NULL;
> +	uint32_t add_story_linked = 0;
> +	struct txm_story *del_story = NULL;
> +	bool del_story_created = false;
> +	struct region *region = &stmt->txn->region;
> +	size_t region_svp = region_used(region);
> +
> +	/*
> +	 * List of transactions that will conflict us once one of them
> +	 * become committed.
> +	 */
> +	struct txn_conflict *collected_conflicts = NULL;
> +
> +	/* Create add_story if necessary. */
> +	if (new_tuple != NULL) {
> +		add_story = txm_story_new_add_stmt(new_tuple, stmt);
> +		if (add_story == NULL)
> +			goto fail;
> +
> +		for (uint32_t i = 0; i < space->index_count; i++) {
> +			struct tuple *replaced;
> +			struct index *index = space->index[i];
> +			if (index_replace(index, NULL, new_tuple,
> +					  DUP_REPLACE_OR_INSERT,
> +					  &replaced) != 0)

9. Why does this function change indexes? Why txn.c accesses index API
directly in the first place? The function is called txm_history_add_stmt,
but actually it behaves like space:replace().

> +				goto fail;
> +			txm_story_link_tuple(add_story, replaced, i);
> +			add_story_linked++;
> +
> +			struct tuple *visible_replaced = NULL;
> +			if (txm_story_find_visible(stmt, add_story, i,
> +						   &visible_replaced,
> +						   &collected_conflicts,
> +						   region) != 0)
> +				goto fail;
> +
> +			uint32_t errcode;
> +			errcode = replace_check_dup(old_tuple, visible_replaced,
> +						    i == 0 ? mode : DUP_INSERT);
> +			if (errcode != 0) {
> +				struct space *sp = stmt->space;

10. You already have 'space' variable, it stores the same value.

> +				if (sp != NULL)
> +					diag_set(ClientError, errcode,
> +						 sp->index[i]->def->name,

11. You already have 'index' variable. You can use 'index->def->name'.

> +						 space_name(sp));
> +				goto fail;
> +			}
> +
> +			if (i == 0) {
> +				old_tuple = visible_replaced;
> +			}

12. This cycle's body remained a complete mistery to me, in general. Every
line raises questions.

> +		}
> +	}
> +
> +	/* Create del_story if necessary. */
> +	struct tuple *del_tuple = NULL;
> +	if (new_tuple != NULL) {

13. You have exactly the same condition 'new_tuple != NULL' a few lines above.
Why can't the code below be inside this diff?

> +		struct txm_story_link *link = &add_story->link[0];
> +		if (link->older.is_story) {
> +			del_story = link->older.story;
> +			del_tuple = del_story->tuple;
> +		} else {
> +			del_tuple = link->older.tuple;
> +		}
> +	} else {
> +		del_tuple = old_tuple;

14. I don't understand why del_tuple is not equal to old_tuple
always.

> +	}
> +	if (del_tuple != NULL && del_story == NULL) {
> +		if (del_tuple->is_dirty) {
> +			del_story = txm_story_get(del_tuple);
> +		} else {
> +			del_story = txm_story_new_del_stmt(del_tuple, stmt);
> +			if (del_story == NULL)
> +				goto fail;
> +			del_story_created = true;
> +		}
> +	}
> +	if (new_tuple != NULL && del_story_created) {
> +		for (uint32_t i = 0; i < add_story->index_count; i++) {

15. Why do you use sometimes story->index_count, sometimes space->index_count?
They should be equal here.

> +			struct txm_story_link *link = &add_story->link[i];
> +			if (link->older.is_story)
> +				continue;
> +			if (link->older.tuple == del_tuple) {
> +				txm_story_unlink(add_story, i);
> +				txm_story_link_story(add_story, del_story, i);
> +			}
> +		}
> +	}
> +	if (del_story != NULL && !del_story_created) {
> +		stmt->next_in_del_list = del_story->del_stmt;
> +		del_story->del_stmt = stmt;
> +		stmt->del_story = del_story;
> +	}
> +
> +	/* Purge found conflicts. */
> +	while (collected_conflicts != NULL) {
> +		if (txm_cause_conflict(collected_conflicts->other_txn,
> +				       stmt->txn) != 0)
> +			goto fail;
> +		collected_conflicts = collected_conflicts->next;

16. Why can't call txm_cause_conflict() right when a conflict is
discovered? Why do you need to collect all of them into a list
first?

> +	}
> +
> +	/*
> +	 * We now reference both new and old tuple because the stmt holds
> +	 * pointers to them.
> +	 */

17. Why do you need old and new tuple as separate arguments of this
function, if they are already stored in stmt? Also they were already
referenced when were added to stmt.

If you are talking about story->tuple, it is already referenced in
txm_story_new().

> +	if (stmt->new_tuple != NULL)
> +		tuple_ref(stmt->new_tuple);
> +	*result = old_tuple;

18. I don't see that old_tuple is added to stmt in all code paths.

> +	if (*result != NULL)
> +		tuple_ref(*result);
> +	return 0;
> +
> +fail:
> +	if (add_story != NULL) {
> +		while (add_story_linked > 0) {
> +			--add_story_linked;
> +			uint32_t i = add_story_linked;

19. This easily can be a for() loop. Why do you need while()?

> +
> +			struct index *index = space->index[i];
> +			struct txm_story_link *link = &add_story->link[i];
> +			struct tuple *was = txm_story_older_tuple(link);
> +			struct tuple *unused;
> +			if (index_replace(index, new_tuple, was,
> +					  DUP_INSERT,
> +					  &unused) != 0) {

20. What happens to 'unused' returned tuple? Who unrefs it when
it is removed from the primary index? The same for the similar
code in txm_history_rollback_stmt().

> +				diag_log();
> +				unreachable();
> +				panic("failed to rollback change");
> +			}
> +
> +			txm_story_unlink(stmt->add_story, i);
> +
> +		}
> +		txm_story_delete_add_stmt(stmt->add_story);
> +	}
> +
> +	if (del_story != NULL && del_story->del_stmt == stmt) {
> +		del_story->del_stmt = stmt->next_in_del_list;
> +		stmt->next_in_del_list = NULL;
> +	}
> +
> +	if (del_story_created)
> +		txm_story_delete_del_stmt(stmt->del_story);
> +	else
> +		stmt->del_story = NULL;
> +
> +	region_truncate(region, region_svp);
> +	return -1;
> +}
> +
> +void
> +txm_history_rollback_stmt(struct txn_stmt *stmt)
> +{
> +	if (stmt->add_story != NULL) {
> +		assert(stmt->add_story->tuple == stmt->new_tuple);
> +		struct txm_story *story = stmt->add_story;
> +
> +		for (uint32_t i = 0; i < story->index_count; i++) {
> +			struct txm_story_link *link = &story->link[i];
> +			if (link->newer_story == NULL) {
> +				struct tuple *unused;
> +				struct index *index = stmt->space->index[i];
> +				struct tuple *was = txm_story_older_tuple(link);
> +				if (index_replace(index, story->tuple, was,
> +						  DUP_INSERT, &unused) != 0) {
> +					diag_log();
> +					unreachable();
> +					panic("failed to rollback change");
> +				}
> +			} else {
> +				struct txm_story *newer = link->newer_story;
> +				assert(newer->link[i].older.is_story);
> +				assert(newer->link[i].older.story == story);
> +				txm_story_unlink(newer, i);
> +				if (link->older.is_story) {
> +					struct txm_story *to = link->older.story;
> +					txm_story_link_story(newer,to, i);

21. ',to,' -> ', to,'.

> +				} else {
> +					struct tuple *to = link->older.tuple;
> +					txm_story_link_tuple(newer, to, i);
> +				}
> +			}
> +			txm_story_unlink(story, i);
> +		}
> +		stmt->add_story->add_stmt = NULL;
> +		txm_story_delete(stmt->add_story);
> +		stmt->add_story = NULL;
> +		tuple_unref(stmt->new_tuple);

22. I don't understand why this function contols new_ and old_ tuple in
txn_stmt. They have their own destructor in txn_stmt_destroy(), their own
refs, lifetime. It does not look right that you break the encapsulation
here.

> +	}
> +
> +	if (stmt->del_story != NULL) {
> +		struct txm_story *story = stmt->del_story;
> +
> +		struct txn_stmt **prev = &story->del_stmt;
> +		while (*prev != stmt) {
> +			prev = &(*prev)->next_in_del_list;
> +			assert(*prev != NULL);
> +		}
> +		*prev = stmt->next_in_del_list;
> +		stmt->next_in_del_list = NULL;
> +
> +		stmt->del_story->del_stmt = NULL;
> +		stmt->del_story = NULL;
> +	}
> +}
> +
> +void
> +txm_history_prepare_stmt(struct txn_stmt *stmt)
> +{
> +	assert(stmt->txn->psn != 0);
> +
> +	/* Move story to the past to prepared stories. */

23. Couldn't parse the comment.

> +
> +	struct txm_story *story = stmt->add_story;
> +	uint32_t index_count = story == NULL ? 0 : story->index_count;
> +	/*
> +	 * Note that if stmt->add_story == NULL, the index_count is set to 0,
> +	 * and we will not enter the loop.
> +	 */
> +	for (uint32_t i = 0; i < index_count; ) {
> +		if (!story->link[i].older.is_story) {
> +			/* tuple is old. */

24. Don't understand. How does it follow from the condition? And
why the 'tuple is old' prevents from doing the things below?

> +			i++;
> +			continue;
> +		}
> +		struct txm_story *old_story =
> +			story->link[i].older.story;
> +		if (old_story->del_psn != 0) {
> +			/* if psn is set, the change is prepared. */
> +			i++;
> +			continue;
> +		}
> +		if (old_story->add_psn != 0) {
> +			/* if psn is set, the change is prepared. */
> +			i++;
> +			continue;
> +		}
> +
> +		if (old_story->add_stmt == NULL) {
> +			/* ancient. */
> +			i++;
> +			continue;
> +		}
> +		if (old_story->add_stmt->txn == stmt->txn) {
> +			/* added by us. */
> +			i++;
> +			continue;
> +		}
> +
> +		if (old_story->add_stmt->does_require_old_tuple || i != 0)
> +			old_story->add_stmt->txn->status = TXN_CONFLICTED;
> +
> +		/* Swap story and old story. */
> +		struct txm_story_link *link = &story->link[i];
> +		if (link->newer_story == NULL) {
> +			/* we have to replace the tuple in index. */
> +			struct tuple *unused;
> +			struct index *index = stmt->space->index[i];
> +			if (index_replace(index, story->tuple, old_story->tuple,
> +					  DUP_INSERT, &unused) != 0) {
> +				diag_log();
> +				panic("failed to rollback change");
> +			}
> +		} else {
> +			struct txm_story *newer = link->newer_story;
> +			assert(newer->link[i].older.is_story);
> +			assert(newer->link[i].older.story == story);
> +			txm_story_unlink(newer, i);
> +			txm_story_link_story(newer, old_story, i);
> +		}
> +
> +		txm_story_unlink(story, i);
> +		if (old_story->link[i].older.is_story) {
> +			struct txm_story *to =
> +				old_story->link[i].older.story;
> +			txm_story_unlink(old_story, i);
> +			txm_story_link_story(story, to, i);
> +		} else {
> +			struct tuple *to =
> +				old_story->link[i].older.tuple;
> +			txm_story_unlink(old_story, i);
> +			txm_story_link_tuple(story, to, i);
> +		}
> +
> +		txm_story_link_story(old_story, story, i);
> +
> +		if (i == 0) {
> +			assert(stmt->del_story == old_story);
> +			assert(story->link[0].older.is_story ||
> +			       story->link[0].older.tuple == NULL);
> +
> +			struct txn_stmt *dels = old_story->del_stmt;
> +			assert(dels != NULL);
> +			do {
> +				if (dels->txn != stmt->txn)
> +					dels->txn->status = TXN_CONFLICTED;
> +				dels->del_story = NULL;
> +				struct txn_stmt *next = dels->next_in_del_list;
> +				dels->next_in_del_list = NULL;
> +				dels = next;
> +			} while (dels != NULL);
> +			old_story->del_stmt = NULL;
> +
> +			if (story->link[0].older.is_story) {
> +				struct txm_story *oldest_story =
> +					story->link[0].older.story;
> +				dels = oldest_story->del_stmt;
> +				while (dels != NULL) {
> +					assert(dels->txn != stmt->txn);
> +					dels->del_story = NULL;
> +					struct txn_stmt *next =
> +						dels->next_in_del_list;
> +					dels->next_in_del_list = NULL;
> +					dels = next;
> +				}
> +				oldest_story->del_stmt = stmt;
> +				stmt->del_story = oldest_story;
> +			}
> +		}
> +	}
> +	if (stmt->add_story != NULL)
> +		stmt->add_story->add_psn = stmt->txn->psn;
> +
> +	if (stmt->del_story != NULL)
> +		stmt->del_story->del_psn = stmt->txn->psn;
> +}


More information about the Tarantool-patches mailing list