[tarantool-patches] Re: [PATCH v5 1/4] memtx: rework memtx_tree to store arbitrary nodes

Kirill Shcherbatov kshcherbatov at tarantool.org
Mon Mar 11 19:53:48 MSK 2019


Hi! Thank you for the review. I've addressed your comments for this commit
and merged the changes. The up-to-date version of the patch is on the branch.

===================================================

Reworked the memtx_tree class to use the memtx_tree_data structure
as a tree node. This makes it possible to extend the node with a
service field to implement tuple hints and multikey indexes in
subsequent patches.

Needed for #3961
---
 src/box/memtx_tree.c | 245 ++++++++++++++++++++++++-------------------
 1 file changed, 138 insertions(+), 107 deletions(-)

diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
index 250df7f2d..d5b42efb6 100644
--- a/src/box/memtx_tree.c
+++ b/src/box/memtx_tree.c
@@ -50,30 +50,37 @@ struct memtx_tree_key_data {
 };
 
 /**
- * BPS tree element vs key comparator.
- * Defined in header in order to allow compiler to inline it.
- * @param tuple - tuple to compare.
- * @param key_data - key to compare with.
- * @param def - key definition.
- * @retval 0  if tuple == key in terms of def.
- * @retval <0 if tuple < key in terms of def.
- * @retval >0 if tuple > key in terms of def.
+ * Struct that is used as an element in the BPS tree definition.
  */
-static int
-memtx_tree_compare_key(const struct tuple *tuple,
-		       const struct memtx_tree_key_data *key_data,
-		       struct key_def *def)
+struct memtx_tree_data {
+	/* Tuple that this node represents. */
+	struct tuple *tuple;
+};
+
+/**
+ * Test whether BPS tree elements are identical i.e. represent
+ * the same tuple at the same position in the tree.
+ * @param a - First BPS tree element to compare.
+ * @param b - Second BPS tree element to compare.
+ * @retval true - When elements a and b are identical.
+ * @retval false - Otherwise.
+ */
+static bool
+memtx_tree_data_identical(const struct memtx_tree_data *a,
+			  const struct memtx_tree_data *b)
 {
-	return tuple_compare_with_key(tuple, key_data->key,
-				      key_data->part_count, def);
+	return a->tuple == b->tuple;
 }
 
 #define BPS_TREE_NAME memtx_tree
 #define BPS_TREE_BLOCK_SIZE (512)
 #define BPS_TREE_EXTENT_SIZE MEMTX_EXTENT_SIZE
-#define BPS_TREE_COMPARE(a, b, arg) tuple_compare(a, b, arg)
-#define BPS_TREE_COMPARE_KEY(a, b, arg) memtx_tree_compare_key(a, b, arg)
-#define bps_tree_elem_t struct tuple *
+#define BPS_TREE_COMPARE(a, b, arg)\
+	tuple_compare((&a)->tuple, (&b)->tuple, arg)
+#define BPS_TREE_COMPARE_KEY(a, b, arg)\
+	tuple_compare_with_key((&a)->tuple, (b)->key, (b)->part_count, arg)
+#define BPS_TREE_IDENTICAL(a, b) memtx_tree_data_identical(&a, &b)
+#define bps_tree_elem_t struct memtx_tree_data
 #define bps_tree_key_t struct memtx_tree_key_data *
 #define bps_tree_arg_t struct key_def *
 
@@ -84,6 +91,7 @@ memtx_tree_compare_key(const struct tuple *tuple,
 #undef BPS_TREE_EXTENT_SIZE
 #undef BPS_TREE_COMPARE
 #undef BPS_TREE_COMPARE_KEY
+#undef BPS_TREE_IDENTICAL
 #undef bps_tree_elem_t
 #undef bps_tree_key_t
 #undef bps_tree_arg_t
@@ -91,7 +99,7 @@ memtx_tree_compare_key(const struct tuple *tuple,
 struct memtx_tree_index {
 	struct index base;
 	struct memtx_tree tree;
-	struct tuple **build_array;
+	struct memtx_tree_data *build_array;
 	size_t build_array_size, build_array_alloc_size;
 	struct memtx_gc_task gc_task;
 	struct memtx_tree_iterator gc_iterator;
@@ -102,8 +110,10 @@ struct memtx_tree_index {
 static int
 memtx_tree_qcompare(const void* a, const void *b, void *c)
 {
-	return tuple_compare(*(struct tuple **)a,
-		*(struct tuple **)b, (struct key_def *)c);
+	const struct memtx_tree_data *data_a = a;
+	const struct memtx_tree_data *data_b = b;
+	struct key_def *key_def = c;
+	return tuple_compare(data_a->tuple, data_b->tuple, key_def);
 }
 
 /* {{{ MemtxTree Iterators ****************************************/
@@ -114,7 +124,7 @@ struct tree_iterator {
 	struct memtx_tree_iterator tree_iterator;
 	enum iterator_type type;
 	struct memtx_tree_key_data key_data;
-	struct tuple *current_tuple;
+	struct memtx_tree_data current;
 	/** Memory pool the iterator was allocated from. */
 	struct mempool *pool;
 };
@@ -137,8 +147,9 @@ static void
 tree_iterator_free(struct iterator *iterator)
 {
 	struct tree_iterator *it = tree_iterator(iterator);
-	if (it->current_tuple != NULL)
-		tuple_unref(it->current_tuple);
+	struct tuple *tuple = it->current.tuple;
+	if (tuple != NULL)
+		tuple_unref(tuple);
 	mempool_free(it->pool, it);
 }
 
@@ -153,25 +164,28 @@ tree_iterator_dummie(struct iterator *iterator, struct tuple **ret)
 static int
 tree_iterator_next(struct iterator *iterator, struct tuple **ret)
 {
-	struct tuple **res;
 	struct tree_iterator *it = tree_iterator(iterator);
-	assert(it->current_tuple != NULL);
-	struct tuple **check = memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
-	if (check == NULL || *check != it->current_tuple)
+	assert(it->current.tuple != NULL);
+	struct memtx_tree_data *check =
+		memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+	if (check == NULL || !memtx_tree_data_identical(check, &it->current)) {
 		it->tree_iterator =
-			memtx_tree_upper_bound_elem(it->tree, it->current_tuple,
+			memtx_tree_upper_bound_elem(it->tree, it->current,
 						    NULL);
-	else
+	} else {
 		memtx_tree_iterator_next(it->tree, &it->tree_iterator);
-	tuple_unref(it->current_tuple);
-	it->current_tuple = NULL;
-	res = memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+	}
+	tuple_unref(it->current.tuple);
+	struct memtx_tree_data *res =
+		memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
 	if (res == NULL) {
 		iterator->next = tree_iterator_dummie;
+		it->current.tuple = NULL;
 		*ret = NULL;
 	} else {
-		*ret = it->current_tuple = *res;
-		tuple_ref(it->current_tuple);
+		*ret = res->tuple;
+		tuple_ref(*ret);
+		it->current = *res;
 	}
 	return 0;
 }
@@ -180,22 +194,25 @@ static int
 tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
 {
 	struct tree_iterator *it = tree_iterator(iterator);
-	assert(it->current_tuple != NULL);
-	struct tuple **check = memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
-	if (check == NULL || *check != it->current_tuple)
+	assert(it->current.tuple != NULL);
+	struct memtx_tree_data *check =
+		memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+	if (check == NULL || !memtx_tree_data_identical(check, &it->current)) {
 		it->tree_iterator =
-			memtx_tree_lower_bound_elem(it->tree, it->current_tuple,
-						    NULL);
+			memtx_tree_lower_bound_elem(it->tree, it->current, NULL);
+	}
 	memtx_tree_iterator_prev(it->tree, &it->tree_iterator);
-	tuple_unref(it->current_tuple);
-	it->current_tuple = NULL;
-	struct tuple **res = memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+	tuple_unref(it->current.tuple);
+	struct memtx_tree_data *res =
+		memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
 	if (!res) {
 		iterator->next = tree_iterator_dummie;
+		it->current.tuple = NULL;
 		*ret = NULL;
 	} else {
-		*ret = it->current_tuple = *res;
-		tuple_ref(it->current_tuple);
+		*ret = res->tuple;
+		tuple_ref(*ret);
+		it->current = *res;
 	}
 	return 0;
 }
@@ -204,27 +221,30 @@ static int
 tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
 {
 	struct tree_iterator *it = tree_iterator(iterator);
-	assert(it->current_tuple != NULL);
-	struct tuple **check = memtx_tree_iterator_get_elem(it->tree,
-						&it->tree_iterator);
-	if (check == NULL || *check != it->current_tuple)
+	assert(it->current.tuple != NULL);
+	struct memtx_tree_data *check =
+		memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+	if (check == NULL || !memtx_tree_data_identical(check, &it->current)) {
 		it->tree_iterator =
-			memtx_tree_upper_bound_elem(it->tree, it->current_tuple,
-						    NULL);
-	else
+			memtx_tree_upper_bound_elem(it->tree, it->current, NULL);
+	} else {
 		memtx_tree_iterator_next(it->tree, &it->tree_iterator);
-	tuple_unref(it->current_tuple);
-	it->current_tuple = NULL;
-	struct tuple **res = memtx_tree_iterator_get_elem(it->tree,
-						&it->tree_iterator);
+	}
+	tuple_unref(it->current.tuple);
+	struct memtx_tree_data *res =
+		memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
 	/* Use user key def to save a few loops. */
-	if (!res || memtx_tree_compare_key(*res, &it->key_data,
-					   it->index_def->key_def) != 0) {
+	if (res == NULL ||
+	    tuple_compare_with_key(res->tuple, it->key_data.key,
+				   it->key_data.part_count,
+				   it->index_def->key_def) != 0) {
 		iterator->next = tree_iterator_dummie;
+		it->current.tuple = NULL;
 		*ret = NULL;
 	} else {
-		*ret = it->current_tuple = *res;
-		tuple_ref(it->current_tuple);
+		*ret = res->tuple;
+		tuple_ref(*ret);
+		it->current = *res;
 	}
 	return 0;
 }
@@ -233,26 +253,29 @@ static int
 tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
 {
 	struct tree_iterator *it = tree_iterator(iterator);
-	assert(it->current_tuple != NULL);
-	struct tuple **check = memtx_tree_iterator_get_elem(it->tree,
-						&it->tree_iterator);
-	if (check == NULL || *check != it->current_tuple)
+	assert(it->current.tuple != NULL);
+	struct memtx_tree_data *check =
+		memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
+	if (check == NULL || !memtx_tree_data_identical(check, &it->current)) {
 		it->tree_iterator =
-			memtx_tree_lower_bound_elem(it->tree, it->current_tuple,
-						    NULL);
+			memtx_tree_lower_bound_elem(it->tree, it->current, NULL);
+	}
 	memtx_tree_iterator_prev(it->tree, &it->tree_iterator);
-	tuple_unref(it->current_tuple);
-	it->current_tuple = NULL;
-	struct tuple **res = memtx_tree_iterator_get_elem(it->tree,
-						&it->tree_iterator);
+	tuple_unref(it->current.tuple);
+	struct memtx_tree_data *res =
+		memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
 	/* Use user key def to save a few loops. */
-	if (!res || memtx_tree_compare_key(*res, &it->key_data,
-					   it->index_def->key_def) != 0) {
+	if (res == NULL ||
+	    tuple_compare_with_key(res->tuple, it->key_data.key,
+				   it->key_data.part_count,
+				   it->index_def->key_def) != 0) {
 		iterator->next = tree_iterator_dummie;
+		it->current.tuple = NULL;
 		*ret = NULL;
 	} else {
-		*ret = it->current_tuple = *res;
-		tuple_ref(it->current_tuple);
+		*ret = res->tuple;
+		tuple_ref(*ret);
+		it->current = *res;
 	}
 	return 0;
 }
@@ -260,7 +283,7 @@ tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
 static void
 tree_iterator_set_next_method(struct tree_iterator *it)
 {
-	assert(it->current_tuple != NULL);
+	assert(it->current.tuple != NULL);
 	switch (it->type) {
 	case ITER_EQ:
 		it->base.next = tree_iterator_next_equal;
@@ -294,7 +317,7 @@ tree_iterator_start(struct iterator *iterator, struct tuple **ret)
 	const struct memtx_tree *tree = it->tree;
 	enum iterator_type type = it->type;
 	bool exact = false;
-	assert(it->current_tuple == NULL);
+	assert(it->current.tuple == NULL);
 	if (it->key_data.key == 0) {
 		if (iterator_type_is_reverse(it->type))
 			it->tree_iterator = memtx_tree_iterator_last(tree);
@@ -331,12 +354,13 @@ tree_iterator_start(struct iterator *iterator, struct tuple **ret)
 		}
 	}
 
-	struct tuple **res = memtx_tree_iterator_get_elem(it->tree,
-						&it->tree_iterator);
+	struct memtx_tree_data *res =
+		memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
 	if (!res)
 		return 0;
-	*ret = it->current_tuple = *res;
-	tuple_ref(it->current_tuple);
+	*ret = res->tuple;
+	tuple_ref(*ret);
+	it->current = *res;
 	tree_iterator_set_next_method(it);
 	return 0;
 }
@@ -390,9 +414,10 @@ memtx_tree_index_gc_run(struct memtx_gc_task *task, bool *done)
 
 	unsigned int loops = 0;
 	while (!memtx_tree_iterator_is_invalid(itr)) {
-		struct tuple *tuple = *memtx_tree_iterator_get_elem(tree, itr);
+		struct memtx_tree_data *res =
+			memtx_tree_iterator_get_elem(tree, itr);
 		memtx_tree_iterator_next(tree, itr);
-		tuple_unref(tuple);
+		tuple_unref(res->tuple);
 		if (++loops >= YIELD_LOOPS) {
 			*done = false;
 			return;
@@ -470,8 +495,8 @@ static int
 memtx_tree_index_random(struct index *base, uint32_t rnd, struct tuple **result)
 {
 	struct memtx_tree_index *index = (struct memtx_tree_index *)base;
-	struct tuple **res = memtx_tree_random(&index->tree, rnd);
-	*result = res != NULL ? *res : NULL;
+	struct memtx_tree_data *res = memtx_tree_random(&index->tree, rnd);
+	*result = res != NULL ? res->tuple : NULL;
 	return 0;
 }
 
@@ -494,8 +519,8 @@ memtx_tree_index_get(struct index *base, const char *key,
 	struct memtx_tree_key_data key_data;
 	key_data.key = key;
 	key_data.part_count = part_count;
-	struct tuple **res = memtx_tree_find(&index->tree, &key_data);
-	*result = res != NULL ? *res : NULL;
+	struct memtx_tree_data *res = memtx_tree_find(&index->tree, &key_data);
+	*result = res != NULL ? res->tuple : NULL;
 	return 0;
 }
 
@@ -506,11 +531,14 @@ memtx_tree_index_replace(struct index *base, struct tuple *old_tuple,
 {
 	struct memtx_tree_index *index = (struct memtx_tree_index *)base;
 	if (new_tuple) {
-		struct tuple *dup_tuple = NULL;
+		struct memtx_tree_data new_data;
+		new_data.tuple = new_tuple;
+		struct memtx_tree_data dup_data;
+		dup_data.tuple = NULL;
 
 		/* Try to optimistically replace the new_tuple. */
-		int tree_res = memtx_tree_insert(&index->tree,
-						 new_tuple, &dup_tuple);
+		int tree_res = memtx_tree_insert(&index->tree, new_data,
+						 &dup_data);
 		if (tree_res) {
 			diag_set(OutOfMemory, MEMTX_EXTENT_SIZE,
 				 "memtx_tree_index", "replace");
@@ -518,24 +546,26 @@ memtx_tree_index_replace(struct index *base, struct tuple *old_tuple,
 		}
 
 		uint32_t errcode = replace_check_dup(old_tuple,
-						     dup_tuple, mode);
+						     dup_data.tuple, mode);
 		if (errcode) {
-			memtx_tree_delete(&index->tree, new_tuple);
-			if (dup_tuple)
-				memtx_tree_insert(&index->tree, dup_tuple, 0);
+			memtx_tree_delete(&index->tree, new_data);
+			if (dup_data.tuple != NULL)
+				memtx_tree_insert(&index->tree, dup_data, NULL);
 			struct space *sp = space_cache_find(base->def->space_id);
 			if (sp != NULL)
 				diag_set(ClientError, errcode, base->def->name,
 					 space_name(sp));
 			return -1;
 		}
-		if (dup_tuple) {
-			*result = dup_tuple;
+		if (dup_data.tuple != NULL) {
+			*result = dup_data.tuple;
 			return 0;
 		}
 	}
 	if (old_tuple) {
-		memtx_tree_delete(&index->tree, old_tuple);
+		struct memtx_tree_data old_data;
+		old_data.tuple = old_tuple;
+		memtx_tree_delete(&index->tree, old_data);
 	}
 	*result = old_tuple;
 	return 0;
@@ -580,7 +610,7 @@ memtx_tree_index_create_iterator(struct index *base, enum iterator_type type,
 	it->index_def = base->def;
 	it->tree = &index->tree;
 	it->tree_iterator = memtx_tree_invalid_iterator();
-	it->current_tuple = NULL;
+	it->current.tuple = NULL;
 	return (struct iterator *)it;
 }
 
@@ -598,8 +628,8 @@ memtx_tree_index_reserve(struct index *base, uint32_t size_hint)
 	struct memtx_tree_index *index = (struct memtx_tree_index *)base;
 	if (size_hint < index->build_array_alloc_size)
 		return 0;
-	struct tuple **tmp = (struct tuple **)realloc(index->build_array,
-						      size_hint * sizeof(*tmp));
+	struct memtx_tree_data *tmp =
+		realloc(index->build_array, size_hint * sizeof(*tmp));
 	if (tmp == NULL) {
 		diag_set(OutOfMemory, size_hint * sizeof(*tmp),
 			 "memtx_tree_index", "reserve");
@@ -615,20 +645,20 @@ memtx_tree_index_build_next(struct index *base, struct tuple *tuple)
 {
 	struct memtx_tree_index *index = (struct memtx_tree_index *)base;
 	if (index->build_array == NULL) {
-		index->build_array = (struct tuple **)malloc(MEMTX_EXTENT_SIZE);
+		index->build_array = malloc(MEMTX_EXTENT_SIZE);
 		if (index->build_array == NULL) {
 			diag_set(OutOfMemory, MEMTX_EXTENT_SIZE,
 				 "memtx_tree_index", "build_next");
 			return -1;
 		}
 		index->build_array_alloc_size =
-			MEMTX_EXTENT_SIZE / sizeof(struct tuple*);
+			MEMTX_EXTENT_SIZE / sizeof(index->build_array[0]);
 	}
 	assert(index->build_array_size <= index->build_array_alloc_size);
 	if (index->build_array_size == index->build_array_alloc_size) {
 		index->build_array_alloc_size = index->build_array_alloc_size +
 					index->build_array_alloc_size / 2;
-		struct tuple **tmp = (struct tuple **)
+		struct memtx_tree_data *tmp =
 			realloc(index->build_array,
 				index->build_array_alloc_size * sizeof(*tmp));
 		if (tmp == NULL) {
@@ -638,7 +668,9 @@ memtx_tree_index_build_next(struct index *base, struct tuple *tuple)
 		}
 		index->build_array = tmp;
 	}
-	index->build_array[index->build_array_size++] = tuple;
+	struct memtx_tree_data *elem =
+		&index->build_array[index->build_array_size++];
+	elem->tuple = tuple;
 	return 0;
 }
 
@@ -648,8 +680,7 @@ memtx_tree_index_end_build(struct index *base)
 	struct memtx_tree_index *index = (struct memtx_tree_index *)base;
 	struct key_def *cmp_def = memtx_tree_index_cmp_def(index);
 	qsort_arg(index->build_array, index->build_array_size,
-		  sizeof(struct tuple *),
-		  memtx_tree_qcompare, cmp_def);
+		  sizeof(index->build_array[0]), memtx_tree_qcompare, cmp_def);
 	memtx_tree_build(&index->tree, index->build_array,
 			 index->build_array_size);
 
@@ -682,12 +713,12 @@ tree_snapshot_iterator_next(struct snapshot_iterator *iterator, uint32_t *size)
 	assert(iterator->free == tree_snapshot_iterator_free);
 	struct tree_snapshot_iterator *it =
 		(struct tree_snapshot_iterator *)iterator;
-	struct tuple **res = memtx_tree_iterator_get_elem(it->tree,
-						&it->tree_iterator);
+	struct memtx_tree_data *res =
+		memtx_tree_iterator_get_elem(it->tree, &it->tree_iterator);
 	if (res == NULL)
 		return NULL;
 	memtx_tree_iterator_next(it->tree, &it->tree_iterator);
-	return tuple_data_range(*res, size);
+	return tuple_data_range(res->tuple, size);
 }
 
 /**
-- 
2.21.0



More information about the Tarantool-patches mailing list