[PATCH 1.7 2/4] Replace stailq_splice with stailq_cut_tail

Vladimir Davydov vdavydov.dev at gmail.com
Tue Jan 23 13:06:12 MSK 2018


stailq_splice(head1, item, head2) moves elements from list 'head1'
starting from 'item' to list 'head2'. To follow the protocol, it needs
to know the element previous to 'item' in 'head1' so as to make it the
new last element of 'head1'. To achieve that, it has to loop over
'head1', which is inefficient. Actually, wherever we use this function,
we know in advance the element stailq_splice() has to look up, but we
still pass the next one, making its life difficult and obscuring the
code at the caller's side.

For example, look at how stailq_splice() is used in txn.c:

        if (stmt == NULL) {
                rollback_stmts = txn->stmts;
                stailq_create(&txn->stmts);
        } else {
                stailq_create(&rollback_stmts);
                stmt = stailq_next_entry(stmt, next);
                stailq_splice(&txn->stmts, &stmt->next, &rollback_stmts);
        }

while under the hood stailq_splice() has to loop over the list to find 'stmt' again:

        stailq_splice(struct stailq *head1, struct stailq_entry *elem,
                      struct stailq *head2)
        {
                if (elem) {
                        *head2->last = elem;
                        head2->last = head1->last;
                        head1->last = &head1->first;
                        while (*head1->last != elem)
                                head1->last = &(*head1->last)->next;
                        *head1->last = NULL;
                }
        }

This is utterly preposterous.

Let's replace stailq_splice() with a new function with the same signature,
but with slightly different semantics: move all elements from list
'head1' starting from the element *following* 'item' to list 'head2'; if
'item' is NULL, move all elements from 'head1' to 'head2'. This greatly
simplifies the code for both parties, as the callee doesn't have to loop
any more while the caller doesn't have to handle the case when 'item' is
NULL.

Also, let's change the name of this function, because stailq_splice()
is rather confusing: after all, this function tears a list in two
first and only then splices the tail with another list. Let's remove the
'splice' part altogether (anyway, there's another function for splicing
lists - stailq_concat()) and call it stailq_cut_tail().
---
 src/box/txn.c           |  9 +--------
 src/box/vy_log.c        | 19 +++++++------------
 src/box/vy_tx.c         |  9 +--------
 src/box/wal.cc          | 25 ++++++++++---------------
 src/lib/salad/stailq.h  | 28 ++++++++++++++++++----------
 test/unit/stailq.c      | 44 +++++++++++++++++++++++++++++++++++++++++---
 test/unit/stailq.result | 33 ++++++++++++++++++++++++++++++++-
 7 files changed, 110 insertions(+), 57 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index b09df8c6..5d5b579b 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -484,14 +484,7 @@ box_txn_rollback_to_savepoint(box_txn_savepoint_t *svp)
 		return -1;
 	}
 	struct stailq rollback_stmts;
-	if (stmt == NULL) {
-		rollback_stmts = txn->stmts;
-		stailq_create(&txn->stmts);
-	} else {
-		stailq_create(&rollback_stmts);
-		stmt = stailq_next_entry(stmt, next);
-		stailq_splice(&txn->stmts, &stmt->next, &rollback_stmts);
-	}
+	stailq_cut_tail(&txn->stmts, svp->stmt, &rollback_stmts);
 	stailq_reverse(&rollback_stmts);
 	stailq_foreach_entry(stmt, &rollback_stmts, next) {
 		engine_rollback_statement(txn->engine, txn, stmt);
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 25a9cfad..55fef9d3 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -147,7 +147,10 @@ struct vy_log {
 	int tx_size;
 	/** Start of the current transaction in the pool, for rollback */
 	size_t tx_svp;
-	/** First record of the current transaction. */
+	/**
+	 * Last record in the queue at the time when the current
+	 * transaction was started. Used for rollback.
+	 */
 	struct stailq_entry *tx_begin;
 	/**
 	 * Flag set if vy_log_write() failed.
@@ -802,7 +805,6 @@ vy_log_flush(void)
 	region_reset(&vy_log.pool);
 	stailq_create(&vy_log.tx);
 	vy_log.tx_size = 0;
-	vy_log.tx_svp = 0;
 	return 0;
 }
 
@@ -1167,7 +1169,8 @@ void
 vy_log_tx_begin(void)
 {
 	latch_lock(&vy_log.latch);
-	vy_log.tx_begin = NULL;
+	vy_log.tx_begin = stailq_last(&vy_log.tx);
+	vy_log.tx_svp = region_used(&vy_log.pool);
 	vy_log.tx_failed = false;
 	say_verbose("begin vylog transaction");
 }
@@ -1225,12 +1228,10 @@ done:
 	return 0;
 
 rollback:
-	stailq_create(&rollback);
-	stailq_splice(&vy_log.tx, vy_log.tx_begin, &rollback);
+	stailq_cut_tail(&vy_log.tx, vy_log.tx_begin, &rollback);
 	region_truncate(&vy_log.pool, vy_log.tx_svp);
 	vy_log.tx_size = 0;
 	vy_log.tx_svp = 0;
-	vy_log.tx_begin = NULL;
 	say_verbose("rollback vylog transaction");
 	latch_unlock(&vy_log.latch);
 	return -1;
@@ -1254,8 +1255,6 @@ vy_log_write(const struct vy_log_record *record)
 {
 	assert(latch_owner(&vy_log.latch) == fiber());
 
-	size_t svp = region_used(&vy_log.pool);
-
 	struct vy_log_record *tx_record = vy_log_record_dup(&vy_log.pool,
 							    record);
 	if (tx_record == NULL) {
@@ -1268,10 +1267,6 @@ vy_log_write(const struct vy_log_record *record)
 
 	stailq_add_tail_entry(&vy_log.tx, tx_record, in_tx);
 	vy_log.tx_size++;
-	if (vy_log.tx_begin == NULL) {
-		vy_log.tx_begin = &tx_record->in_tx;
-		vy_log.tx_svp = svp;
-	}
 }
 
 /**
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 42ba5157..4f9b1e42 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -681,15 +681,8 @@ vy_tx_rollback_to_savepoint(struct vy_tx *tx, void *svp)
 {
 	assert(tx->state == VINYL_TX_READY);
 	struct stailq_entry *last = svp;
-	/* Start from the first statement after the savepoint. */
-	last = last == NULL ? stailq_first(&tx->log) : stailq_next(last);
-	if (last == NULL) {
-		/* Empty transaction or no changes after the savepoint. */
-		return;
-	}
 	struct stailq tail;
-	stailq_create(&tail);
-	stailq_splice(&tx->log, last, &tail);
+	stailq_cut_tail(&tx->log, last, &tail);
 	/* Rollback statements in LIFO order. */
 	stailq_reverse(&tail);
 	struct txv *v, *tmp;
diff --git a/src/box/wal.cc b/src/box/wal.cc
index 0c4dd3de..c1238c84 100644
--- a/src/box/wal.cc
+++ b/src/box/wal.cc
@@ -629,7 +629,8 @@ wal_write_to_disk(struct cmsg *msg)
 	/*
 	 * Iterate over requests (transactions)
 	 */
-	struct journal_entry *entry, *last_commit_entry = NULL;
+	struct journal_entry *entry;
+	struct stailq_entry *last_committed = NULL;
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
 		wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
 		entry->res = vclock_sum(&writer->vclock);
@@ -637,14 +638,13 @@ wal_write_to_disk(struct cmsg *msg)
 		if (rc < 0)
 			goto done;
 		if (rc > 0)
-			last_commit_entry = entry;
+			last_committed = &entry->fifo;
 		/* rc == 0: the write is buffered in xlog_tx */
 	}
 	if (xlog_flush(l) < 0)
 		goto done;
 
-	last_commit_entry = stailq_last_entry(&wal_msg->commit,
-					      struct journal_entry, fifo);
+	last_committed = stailq_last(&wal_msg->commit);
 
 done:
 	struct error *error = diag_last_error(diag_get());
@@ -660,20 +660,15 @@ done:
 	 * nothing, and need to start rollback from the first
 	 * request. Otherwise we rollback from the first request.
 	 */
-	struct journal_entry *rollback_entry = last_commit_entry ?
-		stailq_next_entry(last_commit_entry, fifo) :
-		stailq_first_entry(&wal_msg->commit, struct journal_entry,
-				   fifo);
-	if (rollback_entry) {
-		/* Update status of the successfully committed requests. */
-		for (entry = rollback_entry; entry != NULL;
-		     entry = stailq_next_entry(entry, fifo)) {
+	struct stailq rollback;
+	stailq_cut_tail(&wal_msg->commit, last_committed, &rollback);
 
+	if (!stailq_empty(&rollback)) {
+		/* Update status of the successfully committed requests. */
+		stailq_foreach_entry(entry, &rollback, fifo)
 			entry->res = -1;
-		}
 		/* Rollback unprocessed requests */
-		stailq_splice(&wal_msg->commit, &rollback_entry->fifo,
-			      &wal_msg->rollback);
+		stailq_concat(&wal_msg->rollback, &rollback);
 		wal_writer_begin_rollback(writer);
 	}
 	fiber_gc();
diff --git a/src/lib/salad/stailq.h b/src/lib/salad/stailq.h
index a4656421..0f51ddb9 100644
--- a/src/lib/salad/stailq.h
+++ b/src/lib/salad/stailq.h
@@ -158,18 +158,26 @@ stailq_reverse(struct stailq *head)
 	}
 }
 
-/** Concat all members of head1 starting from elem to the end of head2. */
+/**
+ * Move elements of list @head starting from @last->next to
+ * list @tail (all elements if @last is NULL). Old contents of
+ * @tail are discarded, so it need not be initialized. If nothing
+ * is moved, @tail is left as a valid empty list.
+ */
 static inline void
-stailq_splice(struct stailq *head1, struct stailq_entry *elem,
-	      struct stailq *head2)
+stailq_cut_tail(struct stailq *head, struct stailq_entry *last,
+		struct stailq *tail)
 {
-	if (elem) {
-		*head2->last = elem;
-		head2->last = head1->last;
-		head1->last = &head1->first;
-		while (*head1->last != elem)
-			head1->last = &(*head1->last)->next;
-		*head1->last = NULL;
+	if (last != NULL) {
+		tail->first = last->next;
+		tail->last = last->next != NULL ? head->last : &tail->first;
+		head->last = &last->next;
+		last->next = NULL;
+	} else {
+		tail->first = head->first;
+		tail->last = head->first != NULL ? head->last : &tail->first;
+		head->first = NULL;
+		head->last = &head->first;
 	}
 }
 
diff --git a/test/unit/stailq.c b/test/unit/stailq.c
index be957334..12f05a0c 100644
--- a/test/unit/stailq.c
+++ b/test/unit/stailq.c
@@ -3,7 +3,7 @@
 #include <stdarg.h>
 #include "unit.h"
 
-#define PLAN		37
+#define PLAN		68
 
 #define ITEMS		7
 
@@ -15,7 +15,7 @@ struct test {
 
 static struct test items[ITEMS];
 
-static struct stailq head;
+static struct stailq head, tail;
 
 int
 main(void)
@@ -72,6 +72,44 @@ main(void)
 		is(it, items + i, "element (foreach_entry) %d", i);
 		i++;
 	}
+
+	stailq_create(&head);
+	for (i = 0; i < ITEMS; i++) {
+		items[i].no = ITEMS - i;
+		stailq_add_tail_entry(&head, &items[i], next);
+	}
+	stailq_cut_tail(&head, NULL, &tail);
+	ok(stailq_empty(&head), "head is empty after cut at first");
+	i = 0;
+	stailq_foreach_entry(it, &tail, next) {
+		is(it, items + i, "tail element after cut at first %d", i);
+		i++;
+	}
+	stailq_concat(&head, &tail);
+	stailq_cut_tail(&head, stailq_last(&head), &tail);
+	ok(stailq_empty(&tail), "tail is empty after cut at last");
+	i = 0;
+	stailq_foreach_entry(it, &head, next) {
+		is(it, items + i, "head element after cut at last %d", i);
+		i++;
+	}
+	stailq_concat(&head, &tail);
+	stailq_cut_tail(&head, &items[3].next, &tail);
+	i = 0;
+	stailq_foreach_entry(it, &head, next) {
+		is(it, items + i, "head element after cut at middle %d", i);
+		i++;
+	}
+	stailq_foreach_entry(it, &tail, next) {
+		is(it, items + i, "tail element after cut at middle %d", i);
+		i++;
+	}
+	stailq_concat(&head, &tail);
+	ok(stailq_empty(&tail), "tail is empty after concat");
+	i = 0;
+	stailq_foreach_entry(it, &head, next) {
+		is(it, items + i, "head element after concat %d", i);
+		i++;
+	}
 	return check_plan();
 }
-
diff --git a/test/unit/stailq.result b/test/unit/stailq.result
index debebf52..78d3e721 100644
--- a/test/unit/stailq.result
+++ b/test/unit/stailq.result
@@ -1,4 +1,4 @@
-1..37
+1..68
 ok 1 - list is empty
 ok 2 - list is empty after reverse
 ok 3 - first item
@@ -36,3 +36,34 @@ ok 34 - element (foreach_entry) 3
 ok 35 - element (foreach_entry) 4
 ok 36 - element (foreach_entry) 5
 ok 37 - element (foreach_entry) 6
+ok 38 - head is empty after cut at first
+ok 39 - tail element after cut at first 0
+ok 40 - tail element after cut at first 1
+ok 41 - tail element after cut at first 2
+ok 42 - tail element after cut at first 3
+ok 43 - tail element after cut at first 4
+ok 44 - tail element after cut at first 5
+ok 45 - tail element after cut at first 6
+ok 46 - tail is empty after cut at last
+ok 47 - head element after cut at last 0
+ok 48 - head element after cut at last 1
+ok 49 - head element after cut at last 2
+ok 50 - head element after cut at last 3
+ok 51 - head element after cut at last 4
+ok 52 - head element after cut at last 5
+ok 53 - head element after cut at last 6
+ok 54 - head element after cut at middle 0
+ok 55 - head element after cut at middle 1
+ok 56 - head element after cut at middle 2
+ok 57 - head element after cut at middle 3
+ok 58 - tail element after cut at middle 4
+ok 59 - tail element after cut at middle 5
+ok 60 - tail element after cut at middle 6
+ok 61 - tail is empty after concat
+ok 62 - head element after concat 0
+ok 63 - head element after concat 1
+ok 64 - head element after concat 2
+ok 65 - head element after concat 3
+ok 66 - head element after concat 4
+ok 67 - head element after concat 5
+ok 68 - head element after concat 6
-- 
2.11.0




More information about the Tarantool-patches mailing list