[Tarantool-patches] [PATCH v4 07/12] txm: introduce memtx_story

Nikita Pettik korablev at tarantool.org
Tue Sep 15 17:33:57 MSK 2020


On 08 Sep 13:22, Aleksandr Lyapunov wrote:
> Memtx story is a part of a history of a value in space.
> It's a story about a tuple, from the point it was added to space
> to the point when it was deleted from the space.
> All stories are linked into a list of stories of the same key of
> each index.
> 
> Part of #4897
> ---
> +/**
> + * Temporary (allocated on region) struct that stores a conflicting TX.
> + */
> +struct memtx_tx_conflict
> +{
> +	/* The thansaction that will conflict us upon commit. */

-> transaction

> +}
> +
> +/**
> + * Scan a history starting by @stmt statement in @index for a visible tuple
> + * (prepared suits), returned via @visible_replaced.
> + * Collect a list of transactions that will abort current transaction if they
> + * are committed.
> + *
> + * @return 0 on success, -1 on memory error.
> + */
> +static int
> +memtx_tx_story_find_visible(struct memtx_story *story, struct txn_stmt *stmt,

-> ..find_visible_tuple?

> +			    uint32_t index, struct tuple **visible_replaced,
> +			    struct memtx_tx_conflict **collected_conflicts,
> +			    struct region *region)
> +{
> +	while (true) {
> +		if (!story->link[index].older.is_story) {
> +			/* The tuple is so old that we don't know its story. */
> +			*visible_replaced = story->link[index].older.tuple;
> +			assert(*visible_replaced == NULL ||
> +			       !(*visible_replaced)->is_dirty);
> +			break;
> +		}
> +		story = story->link[index].older.story;
> +		bool unused;
> +		if (memtx_tx_story_is_visible(story, stmt->txn,
> +					      visible_replaced, true, &unused))
> +			break;
> +
> +int
> +memtx_tx_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple,
> +			  struct tuple *new_tuple, enum dup_replace_mode mode,
> +			  struct tuple **result)
> +{
> +	assert(new_tuple != NULL || old_tuple != NULL);
> +	struct space *space = stmt->space;
> +	struct memtx_story *add_story = NULL;
> +	uint32_t add_story_linked = 0;
> +	struct memtx_story *del_story = NULL;
> +	bool del_story_created = false;
> +	struct region *region = &stmt->txn->region;
> +	size_t region_svp = region_used(region);
> +
> +	/*
> +	 * List of transactions that will conflict us once one of them
> +	 * become committed.
> +	 */
> +	struct memtx_tx_conflict *collected_conflicts = NULL;
> +
> +	/* Create add_story if necessary. */
> +	if (new_tuple != NULL) {
> +		add_story = memtx_tx_story_new_add_stmt(new_tuple, stmt);
> +		if (add_story == NULL)
> +			goto fail;
> +
> +		for (uint32_t i = 0; i < space->index_count; i++) {
> +			struct tuple *replaced;
> +			struct index *index = space->index[i];
> +			if (index_replace(index, NULL, new_tuple,
> +					  DUP_REPLACE_OR_INSERT,
> +					  &replaced) != 0)
> +				goto fail;
> +			memtx_tx_story_link_tuple(add_story, replaced, i);
> +			add_story_linked++;
> +
> +			struct tuple *visible_replaced = NULL;
> +			if (memtx_tx_story_find_visible(add_story, stmt, i,
> +							&visible_replaced,
> +							&collected_conflicts,
> +							region) != 0)
> +				goto fail;
> +
> +			uint32_t errcode;
> +			errcode = replace_check_dup(old_tuple, visible_replaced,
> +						    i == 0 ? mode : DUP_INSERT);
> +			if (errcode != 0) {
> +				if (space != NULL)

Why space can be null, if we anyway dereference it to get index_count?

> +					diag_set(ClientError, errcode,
> +						 index->def->name,
> +						 space_name(space));
> +				goto fail;
> +			}
> +
> +			if (i == 0)
> +				old_tuple = visible_replaced;
> +		}
> +void
> +memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
> +{
> +	if (stmt->add_story != NULL) {
> +		assert(stmt->add_story->tuple == stmt->new_tuple);
> +		struct memtx_story *story = stmt->add_story;
> +
> +		for (uint32_t i = 0; i < story->index_count; i++) {
> +			struct memtx_story_link *link = &story->link[i];
> +			if (link->newer_story == NULL) {
> +				struct tuple *unused;
> +				struct index *index = stmt->space->index[i];
> +				struct tuple *was = memtx_tx_story_older_tuple(link);
> +				if (index_replace(index, story->tuple, was,
> +						  DUP_INSERT, &unused) != 0) {
> +					diag_log();
> +					unreachable();
> +					panic("failed to rollback change");
> +				}
> +			} else {
> +				struct memtx_story *newer = link->newer_story;
> +				assert(newer->link[i].older.is_story);
> +				assert(newer->link[i].older.story == story);
> +				memtx_tx_story_unlink(newer, i);
> +				if (link->older.is_story) {
> +					struct memtx_story *to = link->older.story;
> +					memtx_tx_story_link_story(newer, to, i);
> +				} else {
> +					struct tuple *to = link->older.tuple;
> +					memtx_tx_story_link_tuple(newer, to, i);
> +				}
> +			}
> +			memtx_tx_story_unlink(story, i);
> +		}
> +		stmt->add_story->add_stmt = NULL;
> +		memtx_tx_story_delete(stmt->add_story);
> +		stmt->add_story = NULL;
> +		tuple_unref(stmt->new_tuple);
> +	}
> +
> +	if (stmt->del_story != NULL) {
> +		struct memtx_story *story = stmt->del_story;
> +
> +		struct txn_stmt **prev = &story->del_stmt;
> +		while (*prev != stmt) {
> +			prev = &(*prev)->next_in_del_list;
> +			assert(*prev != NULL);
> +		}

Some parts of this function are not covered by tests. For instance:

diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c
index 1b546378a..e2c612a95 100644
--- a/src/box/memtx_tx.c
+++ b/src/box/memtx_tx.c
@@ -793,6 +793,7 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
                                        panic("failed to rollback change");
                                }
                        } else {
+                               assert(0);
                                struct memtx_story *newer = link->newer_story;
                                assert(newer->link[i].older.is_story);
                                assert(newer->link[i].older.story == story);
@@ -818,6 +819,7 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
 
                struct txn_stmt **prev = &story->del_stmt;
                while (*prev != stmt) {
+                       assert(0);
                        prev = &(*prev)->next_in_del_list;
                        assert(*prev != NULL);
                }

And tests are passing.

> +		*prev = stmt->next_in_del_list;
> +		stmt->next_in_del_list = NULL;
> +
> +		stmt->del_story->del_stmt = NULL;
> +		stmt->del_story = NULL;
> +	}
> +}
> +
> +void
> +memtx_tx_history_prepare_stmt(struct txn_stmt *stmt)
> +{
> +	assert(stmt->txn->psn != 0);
> +
> +	/* Move story to the past to prepared stories. */
> +
> +	struct memtx_story *story = stmt->add_story;
> +	uint32_t index_count = story == NULL ? 0 : story->index_count;
> +	/*
> +	 * Note that if stmt->add_story == NULL, the index_count is set to 0,
> +	 * and we will not enter the loop.
> +	 */
> +	for (uint32_t i = 0; i < index_count; ) {
> +		if (!story->link[i].older.is_story) {
> +			/* tuple is old. */
> +			i++;
> +			continue;
> +		}
> +		bool old_story_is_prepared = false;
> +		struct memtx_story *old_story = story->link[i].older.story;
> +		if (old_story->del_psn != 0) {
> +			/* if psn is set, the change is prepared. */
> +			old_story_is_prepared = true;
> +		} else if (old_story->add_psn != 0) {
> +			/* if psn is set, the change is prepared. */
> +			old_story_is_prepared = true;
> +		} else if (old_story->add_stmt == NULL) {
> +			/* ancient. */
> +			old_story_is_prepared = true;
> +		} else if (old_story->add_stmt->txn == stmt->txn) {
> +			/* added by us. */
> +		}
> +
> +		if (old_story_is_prepared) {
> +			struct tx_read_tracker *tracker;
> +			rlist_foreach_entry(tracker, &old_story->reader_list,
> +					    in_reader_list) {
> +				if (tracker->reader == stmt->txn)
> +					continue;
> +				if (tracker->reader->status != TXN_INPROGRESS)
> +					continue;
> +				memtx_tx_handle_conflict(stmt->txn,
> +							 tracker->reader);
> +			}
> +			i++;
> +			continue;
> +		}
> +
> +		if (old_story->add_stmt->does_require_old_tuple || i != 0)
> +			old_story->add_stmt->txn->status = TXN_CONFLICTED;

This function still seems a bit hard to understand. Mb it is worth adding
more comments?

> +		/* Swap story and old story. */
> +		struct memtx_story_link *link = &story->link[i];
> +		if (link->newer_story == NULL) {
> +			/* we have to replace the tuple in index. */
> +			struct tuple *unused;
> +			struct index *index = stmt->space->index[i];
> +			if (index_replace(index, story->tuple, old_story->tuple,
> +					  DUP_INSERT, &unused) != 0) {
> +				diag_log();
> +				panic("failed to rollback change");
> +			}
> +		} else {
> +			struct memtx_story *newer = link->newer_story;
> +			assert(newer->link[i].older.is_story);
> +			assert(newer->link[i].older.story == story);
> +			memtx_tx_story_unlink(newer, i);
> +			memtx_tx_story_link_story(newer, old_story, i);
> +		}
> +struct tuple *
> +memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space,
> +			    struct tuple *tuple, uint32_t index,
> +			    uint32_t mk_index, bool is_prepared_ok)
> +{
> +	assert(tuple->is_dirty);
> +	struct memtx_story *story = memtx_tx_story_get(tuple);
> +	bool own_change = false;
> +	struct tuple *result = NULL;
> +
> +	while (true) {
> +		if (memtx_tx_story_is_visible(story, txn, &result,
> +					      is_prepared_ok, &own_change)) {
> +			break;
> +		}
> +		if (story->link[index].older.is_story) {
> +			story = story->link[index].older.story;
> +		} else {
> +			result = story->link[index].older.tuple;
> +			break;
> +		}
> +	}
> +	if (!own_change)
> +		memtx_tx_track_read(txn, space, tuple);
> +	(void)mk_index; /* TODO: multiindex */

Panic/assert/diag in case of multikey indexes?

> +	return result;
> +}
> +
> +
> +int
> +memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple)
> +{
> +	if (tuple == NULL)
> +		return 0;
> +	if (txn == NULL)
> +		return 0;
> +	if (space == NULL)
> +		return 0;

Space seems to be always != NULL. If so, let's replace it with an assertion.


> +	struct memtx_story *story;
> +	struct tx_read_tracker *tracker = NULL;
> +
> +	if (!tuple->is_dirty) {
> +		story = memtx_tx_story_new(space, tuple);
> +		if (story == NULL)
> +			return -1;
> +		size_t sz;
> +		tracker = region_alloc_object(&txn->region,
> +					      struct tx_read_tracker, &sz);
> +		if (tracker == NULL) {
> +			diag_set(OutOfMemory, sz, "tx region", "read_tracker");
> +			memtx_tx_story_delete(story);
> +			return -1;
> +		}
> +		tracker->reader = txn;
> +		tracker->story = story;
> +		rlist_add(&story->reader_list, &tracker->in_reader_list);
> +		rlist_add(&txn->read_set, &tracker->in_read_set);
> +		return 0;
> +	}
> +	story = memtx_tx_story_get(tuple);
> +
> +	struct rlist *r1 = story->reader_list.next;
> +	struct rlist *r2 = txn->read_set.next;
> +	while (r1 != &story->reader_list && r2 != &txn->read_set) {
> +		tracker = rlist_entry(r1, struct tx_read_tracker,
> +				      in_reader_list);
> +		assert(tracker->story == story);
> +		if (tracker->reader == txn)
> +			break;
> +		tracker = rlist_entry(r2, struct tx_read_tracker,
> +				      in_read_set);
> +		assert(tracker->reader == txn);
> +		if (tracker->story == story)
> +			break;
> +		tracker = NULL;
> +		r1 = r1->next;
> +		r2 = r2->next;
> +	}
> +	if (tracker != NULL) {
> +		/* Move to the beginning of a list for faster further lookups.*/
> +		rlist_del(&tracker->in_reader_list);
> +		rlist_del(&tracker->in_read_set);
> +	} else {
> +		size_t sz;
> +		tracker = region_alloc_object(&txn->region,
> +					      struct tx_read_tracker, &sz);
> +		if (tracker == NULL) {
> +			diag_set(OutOfMemory, sz, "tx region", "read_tracker");
> +			return -1;
> +		}
> +		tracker->reader = txn;
> +		tracker->story = story;
> +	}
> +	rlist_add(&story->reader_list, &tracker->in_reader_list);
> +	rlist_add(&txn->read_set, &tracker->in_read_set);

This part is very similar to the one in memtx_tx_cause_conflict()
Mb it is worth to generalize this chunk (moving to a separate func or sort of)..?

> +	return 0;
> +}
> diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h
> index 6143a22..4670ebe 100644
> --- a/src/box/memtx_tx.h
> +++ b/src/box/memtx_tx.h
> @@ -31,6 +31,12 @@
>   */
>  
>  #include <stdbool.h>
> +#include <stddef.h>
> +#include <stdint.h>

Redundant includes (?)..

> +#include "small/rlist.h"
> +#include "index.h"
> +#include "tuple.h"
>  
>  #include "small/rlist.h"
>  
> diff --git a/src/box/txn.h b/src/box/txn.h
> index f957d1e..ba818d0 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -36,6 +36,7 @@
>  #include "trigger.h"
>  #include "fiber.h"
>  #include "space.h"
> +#include "tuple.h"

Redundant include x2
  
>  #if defined(__cplusplus)
>  extern "C" {
> @@ -172,6 +173,27 @@ struct txn_stmt {
>  	struct space *space;
>  	struct tuple *old_tuple;
>  	struct tuple *new_tuple;
> +	/**
> +	 * If new_tuple != NULL and this transaction was not prepared,
> +	 * this member holds added story of the new_tuple.
> +	 */
> +	struct memtx_story *add_story;
> +	/**
> +	 * If new_tuple == NULL and this transaction was not prepared,
> +	 * this member holds added story of the old_tuple.
> +	 */
> +	struct memtx_story *del_story;
> +	/**
> 


More information about the Tarantool-patches mailing list