From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp39.i.mail.ru (smtp39.i.mail.ru [94.100.177.99]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id C5291469719 for ; Tue, 15 Sep 2020 17:33:58 +0300 (MSK) Date: Tue, 15 Sep 2020 14:33:57 +0000 From: Nikita Pettik Message-ID: <20200915143357.GB23208@tarantool.org> References: <1599560532-27089-1-git-send-email-alyapunov@tarantool.org> <1599560532-27089-8-git-send-email-alyapunov@tarantool.org> MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Disposition: inline In-Reply-To: <1599560532-27089-8-git-send-email-alyapunov@tarantool.org> Subject: Re: [Tarantool-patches] [PATCH v4 07/12] txm: introduce memtx_story List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Aleksandr Lyapunov Cc: tarantool-patches@dev.tarantool.org On 08 Sep 13:22, Aleksandr Lyapunov wrote: > Memtx story is a part of a history of a value in space. > It's a story about a tuple, from the point it was added to space > to the point when it was deleted from the space. > All stories are linked into a list of stories of the same key of > each index. > > Part of #4897 > --- > +/** > + * Temporary (allocated on region) struct that stores a conflicting TX. > + */ > +struct memtx_tx_conflict > +{ > + /* The thansaction that will conflict us upon commit. */ -> transaction > +} > + > +/** > + * Scan a history starting by @stmt statement in @index for a visible tuple > + * (prepared suits), returned via @visible_replaced. > + * Collect a list of transactions that will abort current transaction if they > + * are committed. > + * > + * @return 0 on success, -1 on memory error. > + */ > +static int > +memtx_tx_story_find_visible(struct memtx_story *story, struct txn_stmt *stmt, -> ..find_visible_tuple? > + uint32_t index, struct tuple **visible_replaced, > + struct memtx_tx_conflict **collected_conflicts, > + struct region *region) > +{ > + while (true) { > + if (!story->link[index].older.is_story) { > + /* The tuple is so old that we don't know its story. */ > + *visible_replaced = story->link[index].older.tuple; > + assert(*visible_replaced == NULL || > + !(*visible_replaced)->is_dirty); > + break; > + } > + story = story->link[index].older.story; > + bool unused; > + if (memtx_tx_story_is_visible(story, stmt->txn, > + visible_replaced, true, &unused)) > + break; > + > +int > +memtx_tx_history_add_stmt(struct txn_stmt *stmt, struct tuple *old_tuple, > + struct tuple *new_tuple, enum dup_replace_mode mode, > + struct tuple **result) > +{ > + assert(new_tuple != NULL || old_tuple != NULL); > + struct space *space = stmt->space; > + struct memtx_story *add_story = NULL; > + uint32_t add_story_linked = 0; > + struct memtx_story *del_story = NULL; > + bool del_story_created = false; > + struct region *region = &stmt->txn->region; > + size_t region_svp = region_used(region); > + > + /* > + * List of transactions that will conflict us once one of them > + * become committed. > + */ > + struct memtx_tx_conflict *collected_conflicts = NULL; > + > + /* Create add_story if necessary. */ > + if (new_tuple != NULL) { > + add_story = memtx_tx_story_new_add_stmt(new_tuple, stmt); > + if (add_story == NULL) > + goto fail; > + > + for (uint32_t i = 0; i < space->index_count; i++) { > + struct tuple *replaced; > + struct index *index = space->index[i]; > + if (index_replace(index, NULL, new_tuple, > + DUP_REPLACE_OR_INSERT, > + &replaced) != 0) > + goto fail; > + memtx_tx_story_link_tuple(add_story, replaced, i); > + add_story_linked++; > + > + struct tuple *visible_replaced = NULL; > + if (memtx_tx_story_find_visible(add_story, stmt, i, > + &visible_replaced, > + &collected_conflicts, > + region) != 0) > + goto fail; > + > + uint32_t errcode; > + errcode = replace_check_dup(old_tuple, visible_replaced, > + i == 0 ? mode : DUP_INSERT); > + if (errcode != 0) { > + if (space != NULL) Why space can be null, if we anyway dereference it to get index_count? > + diag_set(ClientError, errcode, > + index->def->name, > + space_name(space)); > + goto fail; > + } > + > + if (i == 0) > + old_tuple = visible_replaced; > + } > +void > +memtx_tx_history_rollback_stmt(struct txn_stmt *stmt) > +{ > + if (stmt->add_story != NULL) { > + assert(stmt->add_story->tuple == stmt->new_tuple); > + struct memtx_story *story = stmt->add_story; > + > + for (uint32_t i = 0; i < story->index_count; i++) { > + struct memtx_story_link *link = &story->link[i]; > + if (link->newer_story == NULL) { > + struct tuple *unused; > + struct index *index = stmt->space->index[i]; > + struct tuple *was = memtx_tx_story_older_tuple(link); > + if (index_replace(index, story->tuple, was, > + DUP_INSERT, &unused) != 0) { > + diag_log(); > + unreachable(); > + panic("failed to rollback change"); > + } > + } else { > + struct memtx_story *newer = link->newer_story; > + assert(newer->link[i].older.is_story); > + assert(newer->link[i].older.story == story); > + memtx_tx_story_unlink(newer, i); > + if (link->older.is_story) { > + struct memtx_story *to = link->older.story; > + memtx_tx_story_link_story(newer, to, i); > + } else { > + struct tuple *to = link->older.tuple; > + memtx_tx_story_link_tuple(newer, to, i); > + } > + } > + memtx_tx_story_unlink(story, i); > + } > + stmt->add_story->add_stmt = NULL; > + memtx_tx_story_delete(stmt->add_story); > + stmt->add_story = NULL; > + tuple_unref(stmt->new_tuple); > + } > + > + if (stmt->del_story != NULL) { > + struct memtx_story *story = stmt->del_story; > + > + struct txn_stmt **prev = &story->del_stmt; > + while (*prev != stmt) { > + prev = &(*prev)->next_in_del_list; > + assert(*prev != NULL); > + } Some parts of this function are not covered by tests. For instance: diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c index 1b546378a..e2c612a95 100644 --- a/src/box/memtx_tx.c +++ b/src/box/memtx_tx.c @@ -793,6 +793,7 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt) panic("failed to rollback change"); } } else { + assert(0); struct memtx_story *newer = link->newer_story; assert(newer->link[i].older.is_story); assert(newer->link[i].older.story == story); @@ -818,6 +819,7 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt) struct txn_stmt **prev = &story->del_stmt; while (*prev != stmt) { + assert(0); prev = &(*prev)->next_in_del_list; assert(*prev != NULL); } And tests are passing. > + *prev = stmt->next_in_del_list; > + stmt->next_in_del_list = NULL; > + > + stmt->del_story->del_stmt = NULL; > + stmt->del_story = NULL; > + } > +} > + > +void > +memtx_tx_history_prepare_stmt(struct txn_stmt *stmt) > +{ > + assert(stmt->txn->psn != 0); > + > + /* Move story to the past to prepared stories. */ > + > + struct memtx_story *story = stmt->add_story; > + uint32_t index_count = story == NULL ? 0 : story->index_count; > + /* > + * Note that if stmt->add_story == NULL, the index_count is set to 0, > + * and we will not enter the loop. > + */ > + for (uint32_t i = 0; i < index_count; ) { > + if (!story->link[i].older.is_story) { > + /* tuple is old. */ > + i++; > + continue; > + } > + bool old_story_is_prepared = false; > + struct memtx_story *old_story = story->link[i].older.story; > + if (old_story->del_psn != 0) { > + /* if psn is set, the change is prepared. */ > + old_story_is_prepared = true; > + } else if (old_story->add_psn != 0) { > + /* if psn is set, the change is prepared. */ > + old_story_is_prepared = true; > + } else if (old_story->add_stmt == NULL) { > + /* ancient. */ > + old_story_is_prepared = true; > + } else if (old_story->add_stmt->txn == stmt->txn) { > + /* added by us. */ > + } > + > + if (old_story_is_prepared) { > + struct tx_read_tracker *tracker; > + rlist_foreach_entry(tracker, &old_story->reader_list, > + in_reader_list) { > + if (tracker->reader == stmt->txn) > + continue; > + if (tracker->reader->status != TXN_INPROGRESS) > + continue; > + memtx_tx_handle_conflict(stmt->txn, > + tracker->reader); > + } > + i++; > + continue; > + } > + > + if (old_story->add_stmt->does_require_old_tuple || i != 0) > + old_story->add_stmt->txn->status = TXN_CONFLICTED; This function still seems a bit hard to understand. Mb it is worth adding more comments? > + /* Swap story and old story. */ > + struct memtx_story_link *link = &story->link[i]; > + if (link->newer_story == NULL) { > + /* we have to replace the tuple in index. */ > + struct tuple *unused; > + struct index *index = stmt->space->index[i]; > + if (index_replace(index, story->tuple, old_story->tuple, > + DUP_INSERT, &unused) != 0) { > + diag_log(); > + panic("failed to rollback change"); > + } > + } else { > + struct memtx_story *newer = link->newer_story; > + assert(newer->link[i].older.is_story); > + assert(newer->link[i].older.story == story); > + memtx_tx_story_unlink(newer, i); > + memtx_tx_story_link_story(newer, old_story, i); > + } > +struct tuple * > +memtx_tx_tuple_clarify_slow(struct txn *txn, struct space *space, > + struct tuple *tuple, uint32_t index, > + uint32_t mk_index, bool is_prepared_ok) > +{ > + assert(tuple->is_dirty); > + struct memtx_story *story = memtx_tx_story_get(tuple); > + bool own_change = false; > + struct tuple *result = NULL; > + > + while (true) { > + if (memtx_tx_story_is_visible(story, txn, &result, > + is_prepared_ok, &own_change)) { > + break; > + } > + if (story->link[index].older.is_story) { > + story = story->link[index].older.story; > + } else { > + result = story->link[index].older.tuple; > + break; > + } > + } > + if (!own_change) > + memtx_tx_track_read(txn, space, tuple); > + (void)mk_index; /* TODO: multiindex */ Panic/assert/diag in case of multikey indexes? > + return result; > +} > + > + > +int > +memtx_tx_track_read(struct txn *txn, struct space *space, struct tuple *tuple) > +{ > + if (tuple == NULL) > + return 0; > + if (txn == NULL) > + return 0; > + if (space == NULL) > + return 0; Space seems to be always != NULL. If so, let's replace it with an assertion. > + struct memtx_story *story; > + struct tx_read_tracker *tracker = NULL; > + > + if (!tuple->is_dirty) { > + story = memtx_tx_story_new(space, tuple); > + if (story == NULL) > + return -1; > + size_t sz; > + tracker = region_alloc_object(&txn->region, > + struct tx_read_tracker, &sz); > + if (tracker == NULL) { > + diag_set(OutOfMemory, sz, "tx region", "read_tracker"); > + memtx_tx_story_delete(story); > + return -1; > + } > + tracker->reader = txn; > + tracker->story = story; > + rlist_add(&story->reader_list, &tracker->in_reader_list); > + rlist_add(&txn->read_set, &tracker->in_read_set); > + return 0; > + } > + story = memtx_tx_story_get(tuple); > + > + struct rlist *r1 = story->reader_list.next; > + struct rlist *r2 = txn->read_set.next; > + while (r1 != &story->reader_list && r2 != &txn->read_set) { > + tracker = rlist_entry(r1, struct tx_read_tracker, > + in_reader_list); > + assert(tracker->story == story); > + if (tracker->reader == txn) > + break; > + tracker = rlist_entry(r2, struct tx_read_tracker, > + in_read_set); > + assert(tracker->reader == txn); > + if (tracker->story == story) > + break; > + tracker = NULL; > + r1 = r1->next; > + r2 = r2->next; > + } > + if (tracker != NULL) { > + /* Move to the beginning of a list for faster further lookups.*/ > + rlist_del(&tracker->in_reader_list); > + rlist_del(&tracker->in_read_set); > + } else { > + size_t sz; > + tracker = region_alloc_object(&txn->region, > + struct tx_read_tracker, &sz); > + if (tracker == NULL) { > + diag_set(OutOfMemory, sz, "tx region", "read_tracker"); > + return -1; > + } > + tracker->reader = txn; > + tracker->story = story; > + } > + rlist_add(&story->reader_list, &tracker->in_reader_list); > + rlist_add(&txn->read_set, &tracker->in_read_set); This part is very similar to the one in memtx_tx_cause_conflict() Mb it is worth to generalize this chunk (moving to a separate func or sort of)..? > + return 0; > +} > diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h > index 6143a22..4670ebe 100644 > --- a/src/box/memtx_tx.h > +++ b/src/box/memtx_tx.h > @@ -31,6 +31,12 @@ > */ > > #include > +#include > +#include Redundant includes (?).. > +#include "small/rlist.h" > +#include "index.h" > +#include "tuple.h" > > #include "small/rlist.h" > > diff --git a/src/box/txn.h b/src/box/txn.h > index f957d1e..ba818d0 100644 > --- a/src/box/txn.h > +++ b/src/box/txn.h > @@ -36,6 +36,7 @@ > #include "trigger.h" > #include "fiber.h" > #include "space.h" > +#include "tuple.h" Redundant include x2 > #if defined(__cplusplus) > extern "C" { > @@ -172,6 +173,27 @@ struct txn_stmt { > struct space *space; > struct tuple *old_tuple; > struct tuple *new_tuple; > + /** > + * If new_tuple != NULL and this transaction was not prepared, > + * this member holds added story of the new_tuple. > + */ > + struct memtx_story *add_story; > + /** > + * If new_tuple == NULL and this transaction was not prepared, > + * this member holds added story of the old_tuple. > + */ > + struct memtx_story *del_story; > + /** >