[tarantool-patches] Re: [PATCH v4 4/4] box: introduce functional indexes

Kirill Shcherbatov kshcherbatov at tarantool.org
Thu Jul 25 14:18:13 MSK 2019


Kostya, please consider this API:

==================================================

diff --git a/src/box/key_list.h b/src/box/key_list.h
new file mode 100644
index 000000000..3208f8d48
--- /dev/null
+++ b/src/box/key_list.h
@@ -0,0 +1,102 @@
+#ifndef TARANTOOL_BOX_KEY_LIST_H_INCLUDED
+#define TARANTOOL_BOX_KEY_LIST_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdbool.h>
+#include <inttypes.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct key_def;
+struct tuple;
+
+/**
+ * A function to allocate a key value to be returned by the
+ * key_list_iterator_next() method.
+ */
+typedef void *(*key_allocator_t)(struct tuple *tuple, const char *key,
+				 uint32_t key_sz);
+
+/**
+ * An iterator over the key data returned by a functional index
+ * function; each key is validated against the given key
+ * definition when required.
+ */
+struct key_list_iterator {
+	/** The ancestor tuple. */
+	struct tuple *tuple;
+	/** The pointer to currently processed key. */
+	const char *data;
+	/** The pointer to the end of extracted key_data. */
+	const char *data_end;
+	/**
+	 * The sequential functional index key definition that
+	 * describes the format of the keys returned by the
+	 * functional index function.
+	 */
+	struct key_def *key_def;
+	/** Whether iterator must validate processed keys. */
+	bool validate;
+	/** The method to allocate returned keys. */
+	key_allocator_t key_allocator;
+};
+
+/**
+ * Initialize an iterator over the keys returned by a
+ * functional index function.
+ * Execute the function specified in the given functional index
+ * key definition (the functional index function) and initialize
+ * a new iterator over the returned MsgPack array of keys. Each
+ * key is a MsgPack array as well.
+ *
+ * Returns 0 on success, -1 otherwise.
+ * Uses the fiber region to allocate memory.
+ */
+int
+key_list_iterator_create(struct key_list_iterator *it, struct tuple *tuple,
+			 struct key_def *key_def, bool validate,
+			 key_allocator_t key_allocator, uint32_t *key_count);
+
+/**
+ * Perform an iterator step and update the iterator state.
+ * Set @a value to the next key, or to NULL when the iterator
+ * is exhausted.
+ *
+ * Returns 0 on success. On error returns -1 and sets the
+ * corresponding diag message.
+ */
+int
+key_list_iterator_next(struct key_list_iterator *it, void **value);
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#endif /* TARANTOOL_BOX_KEY_LIST_H_INCLUDED */

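To make the intended usage concrete, here is a minimal sketch of
how a consumer is expected to drive this iterator, given a tuple
and the index cmp_def (error handling is trimmed; use_key() is an
illustrative placeholder, and the allocator is the trivial
key_hint_plain() helper from the memtx_tree.c part below):

	uint32_t key_count;
	struct key_list_iterator it;
	if (key_list_iterator_create(&it, tuple, cmp_def, true,
				     key_hint_plain, &key_count) != 0)
		return -1;
	void *key;
	while (key_list_iterator_next(&it, &key) == 0 && key != NULL) {
		/* Each key is a validated MsgPack array of parts. */
		use_key(key);
	}
	/* key == NULL with rc == 0 means the iterator is exhausted. */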
==================================================

diff --git a/src/box/key_list.c b/src/box/key_list.c
new file mode 100644
index 000000000..004aee9fe
--- /dev/null
+++ b/src/box/key_list.c
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "key_list.h"
+
+#include "errcode.h"
+#include "diag.h"
+#include "func.h"
+#include "fiber.h"
+#include "key_def.h"
+#include "port.h"
+#include "tt_static.h"
+#include "tuple.h"
+#include "tuple_compare.h"
+
+int
+key_list_iterator_create(struct key_list_iterator *it, struct tuple *tuple,
+			 struct key_def *key_def, bool validate,
+			 key_allocator_t key_allocator, uint32_t *key_count)
+{
+	it->key_def = key_def;
+	it->validate = validate;
+	it->tuple = tuple;
+	it->key_allocator = key_allocator;
+
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	struct func *func = key_def->func_index_func;
+
+	struct port out_port, in_port;
+	port_tuple_create(&in_port);
+	port_tuple_add(&in_port, tuple);
+	int rc = func_call(func, &in_port, &out_port);
+	port_destroy(&in_port);
+	if (rc != 0)
+		goto error;
+	uint32_t key_data_sz;
+	const char *key_data = port_get_msgpack(&out_port, &key_data_sz);
+	port_destroy(&out_port);
+	if (key_data == NULL)
+		goto error;
+
+	it->data_end = key_data + key_data_sz;
+	assert(mp_typeof(*key_data) == MP_ARRAY);
+	if (mp_decode_array(&key_data) != 1) {
+		diag_set(ClientError, ER_FUNC_INDEX_FUNC,
+			 func->def->name, "too many values were returned");
+		goto error;
+	}
+	if (key_def->is_multikey) {
+		if (mp_typeof(*key_data) != MP_ARRAY) {
+			diag_set(ClientError, ER_FUNC_INDEX_FUNC,
+				 func->def->name,
+				 "multikey function must not return a scalar");
+			goto error;
+		}
+		*key_count = mp_decode_array(&key_data);
+	} else {
+		*key_count = 1;
+	}
+	it->data = key_data;
+	return 0;
+error:
+	region_truncate(region, region_svp);
+	diag_set(ClientError, ER_FUNC_INDEX_FUNC, func->def->name,
+		 diag_last_error(diag_get())->errmsg);
+	return -1;
+}
+
+int
+key_list_iterator_next(struct key_list_iterator *it, void **value)
+{
+	assert(it->data <= it->data_end);
+	if (it->data == it->data_end) {
+		*value = NULL;
+		return 0;
+	}
+	const char *key = it->data;
+	if (!it->validate) {
+		mp_next(&it->data);
+		assert(it->data <= it->data_end);
+		*value = it->key_allocator(it->tuple, key, it->data - key);
+		return *value != NULL ? 0 : -1;
+	}
+
+	if (mp_typeof(*it->data) != MP_ARRAY) {
+		diag_set(ClientError, ER_FUNC_INDEX_FUNC,
+			 it->key_def->func_index_func->def->name,
+			 "returned key type is invalid");
+		return -1;
+	}
+	const char *rptr = key;
+	uint32_t part_count = mp_decode_array(&rptr);
+	if (part_count != it->key_def->part_count_for_func_index) {
+		const char *error_msg =
+			tt_sprintf(tnt_errcode_desc(ER_EXACT_MATCH),
+				   it->key_def->part_count_for_func_index,
+				   part_count);
+		diag_set(ClientError, ER_FUNC_INDEX_FUNC,
+			 it->key_def->func_index_func->def->name, error_msg);
+		return -1;
+	}
+	const char *key_end;
+	if (key_validate_parts(it->key_def, rptr, part_count, true,
+			       &key_end) != 0) {
+		diag_set(ClientError, ER_FUNC_INDEX_FUNC,
+			 it->key_def->func_index_func->def->name,
+			 diag_last_error(diag_get())->errmsg);
+		return -1;
+	}
+
+	it->data = key_end;
+	*value = it->key_allocator(it->tuple, key, key_end - key);
+	return *value != NULL ? 0 : -1;
+}

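For reference, the MsgPack layout that key_list_iterator_create()
expects from the port is sketched below with the msgpuck encoding
helpers, for a hypothetical multikey definition whose function
returns two single-part string keys (buffer management elided):

	char buf[64], *w = buf;
	w = mp_encode_array(w, 1);	/* exactly one returned value: */
	w = mp_encode_array(w, 2);	/* an array of two keys; */
	w = mp_encode_array(w, 1);	/* key #1 is an array of parts, */
	w = mp_encode_str(w, "abc", 3);
	w = mp_encode_array(w, 1);	/* key #2 likewise. */
	w = mp_encode_str(w, "def", 3);

For a non-multikey definition the nested array of keys is omitted
and the single returned value is the key itself.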
==================================================

diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
index a9c1871db..77fcaf839 100644
--- a/src/box/memtx_tree.c
+++ b/src/box/memtx_tree.c
@@ -35,6 +35,7 @@
 #include "errinj.h"
 #include "memory.h"
 #include "fiber.h"
+#include "key_list.h"
 #include "tuple.h"
 #include <third_party/qsort_arg.h>
 #include <small/mempool.h>
@@ -769,6 +770,201 @@ memtx_tree_index_replace_multikey(struct index *base, struct tuple *old_tuple,
 	return 0;
 }
 
+/** Return a pointer to the key as-is, without copying. */
+static void *
+key_hint_plain(struct tuple *tuple, const char *key, uint32_t key_sz)
+{
+	(void) tuple;
+	(void) key_sz;
+	return (void *)key;
+}
+
+static void *
+key_hint_new(struct tuple *tuple, const char *key, uint32_t key_sz)
+{
+	struct tuple_chunk *chunk = tuple_chunk_new(tuple, key_sz);
+	if (chunk == NULL)
+		return NULL;
+	memcpy(chunk->data, key, key_sz);
+	return chunk->data;
+}
+
+static void
+key_hint_delete(struct tuple *tuple, hint_t hint)
+{
+	struct tuple_chunk *chunk =
+		container_of((typeof(chunk->data) *)hint,
+			     struct tuple_chunk, data);
+	tuple_chunk_delete(tuple, chunk);
+}
+
+/** Release memory allocated for key_hint. */
+static void
+key_hint_destroy(struct memtx_tree_data *data)
+{
+	key_hint_delete(data->tuple, data->hint);
+}
+
+/**
+ * The journal of a multikey functional index replace operation.
+ * It is required to roll back an incomplete action and restore
+ * the original key_hint(s), as well as to commit a completed
+ * replace action and destroy the key_hint(s) that became
+ * unused.
+ */
+struct journal_entry {
+	/** An inserted record copy. */
+	struct memtx_tree_data inserted;
+	/** A replaced record copy. */
+	struct memtx_tree_data replaced;
+};
+
+/**
+ * Roll back a sequence of memtx_tree_index_replace_multikey_one
+ * insertions for a functional index. The routine uses the given
+ * journal to return the index object to its original state.
+ */
+static void
+memtx_tree_index_replace_functional_rollback(struct memtx_tree_index *index,
+						struct journal_entry *journal,
+						int journal_sz)
+{
+	for (int i = 0; i < journal_sz; i++) {
+		if (journal[i].replaced.tuple != NULL) {
+			memtx_tree_insert(&index->tree, journal[i].replaced,
+					  NULL);
+		} else {
+			memtx_tree_delete_value(&index->tree,
+						journal[i].inserted, NULL);
+		}
+		key_hint_destroy(&journal[i].inserted);
+	}
+}
+
+/**
+ * Commit a sequence of memtx_tree_index_replace_multikey_one
+ * insertions for a functional index. The routine uses the given
+ * operations journal to release memory that is no longer used.
+ */
+static void
+memtx_tree_index_replace_functional_commit(struct memtx_tree_index *index,
+						struct journal_entry *journal,
+						int journal_sz)
+{
+	(void) index;
+	for (int i = 0; i < journal_sz; i++) {
+		if (journal[i].replaced.tuple == NULL)
+			continue;
+		key_hint_destroy(&journal[i].replaced);
+	}
+}
+
+/**
+ * @sa memtx_tree_index_replace_multikey().
+ * Use the functional index function from the key definition
+ * to build a key list. Each returned key is then reallocated
+ * in the engine's memory as a key_hint object and is used as
+ * a comparison hint.
+ * To control the key_hint(s) life cycle in case of a
+ * functional index, a tiny journal object is allocated on the
+ * region. It allows restoring the original nodes with their
+ * original key_hint(s) pointers in case of failure and
+ * releasing the unused hints of replaced items in case of
+ * success.
+ */
+static int
+memtx_tree_index_replace_functional(struct index *base, struct tuple *old_tuple,
+			struct tuple *new_tuple, enum dup_replace_mode mode,
+			struct tuple **result)
+{
+	struct memtx_tree_index *index = (struct memtx_tree_index *)base;
+	struct key_def *cmp_def = memtx_tree_cmp_def(&index->tree);
+	assert(key_def_is_for_func_index(cmp_def));
+
+	int rc = -1;
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+
+	*result = NULL;
+	uint32_t key_cnt;
+	struct key_list_iterator it;
+	if (new_tuple != NULL) {
+		if (key_list_iterator_create(&it, new_tuple, cmp_def, true,
+					     key_hint_new, &key_cnt) != 0)
+			goto end;
+
+		int journal_idx = 0;
+		struct journal_entry *journal =
+			region_alloc(region, key_cnt * sizeof(*journal));
+		if (journal == NULL) {
+			diag_set(OutOfMemory, key_cnt * sizeof(*journal),
+				 "region", "journal");
+			goto end;
+		}
+
+		int err = 0;
+		void *key;
+		while ((err = key_list_iterator_next(&it, &key)) == 0 &&
+		       key != NULL) {
+			/* Perform insertion, log it in journal. */
+			bool is_multikey_conflict;
+			journal[journal_idx].replaced.tuple = NULL;
+			journal[journal_idx].inserted.tuple = new_tuple;
+			journal[journal_idx].inserted.hint = (hint_t)key;
+			err = memtx_tree_index_replace_multikey_one(index,
+						old_tuple, new_tuple, mode,
+						(hint_t)key,
+						&journal[journal_idx].replaced,
+						&is_multikey_conflict);
+			if (err != 0)
+				break;
+			/*
+			 * Reset the 'replaced' record of the
+			 * journal entry because the original
+			 * node must not be restored in case of
+			 * a multikey conflict.
+			 */
+			if (is_multikey_conflict)
+				journal[journal_idx].replaced.tuple = NULL;
+			else if (journal[journal_idx].replaced.tuple != NULL)
+				*result = journal[journal_idx].replaced.tuple;
+
+			++journal_idx;
+		}
+		if (key != NULL || err != 0) {
+			memtx_tree_index_replace_functional_rollback(index,
+							journal, journal_idx);
+			goto end;
+		}
+		if (*result != NULL) {
+			assert(old_tuple == NULL || old_tuple == *result);
+			old_tuple = *result;
+		}
+		memtx_tree_index_replace_functional_commit(index,
+						journal, journal_idx);
+	}
+	if (old_tuple != NULL) {
+		if (key_list_iterator_create(&it, old_tuple, cmp_def, false,
+					     key_hint_plain, &key_cnt) != 0)
+			goto end;
+		struct memtx_tree_data data, deleted_data;
+		data.tuple = old_tuple;
+		void *key;
+		while (key_list_iterator_next(&it, &key) == 0 && key != NULL) {
+			data.hint = (hint_t) key;
+			deleted_data.tuple = NULL;
+			memtx_tree_delete_value(&index->tree, data,
+						&deleted_data);
+			if (deleted_data.tuple != NULL) {
+				/*
+				/*
+				 * Release the related hint on
+				 * successful node deletion.
+				 */
+				key_hint_destroy(&deleted_data);
+			}
+		}
+		assert(key == NULL);
+	}
+	rc = 0;
+end:
+	region_truncate(region, region_svp);
+	return rc;
+}
+
 static struct iterator *
 memtx_tree_index_create_iterator(struct index *base, enum iterator_type type,
 				 const char *key, uint32_t part_count)
@@ -900,13 +1096,47 @@ memtx_tree_index_build_next_multikey(struct index *base, struct tuple *tuple)
 	return 0;
 }
 
+static int
+memtx_tree_index_build_next_functional(struct index *base, struct tuple *tuple)
+{
+	struct memtx_tree_index *index = (struct memtx_tree_index *)base;
+	struct key_def *cmp_def = memtx_tree_cmp_def(&index->tree);
+	assert(key_def_is_for_func_index(cmp_def));
+
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+
+	uint32_t key_cnt;
+	struct key_list_iterator it;
+	if (key_list_iterator_create(&it, tuple, cmp_def, false, key_hint_new,
+				     &key_cnt) != 0)
+		return -1;
+
+	void *key;
+	uint32_t insert_idx = index->build_array_size;
+	while (key_list_iterator_next(&it, &key) == 0 && key != NULL) {
+		if (memtx_tree_index_build_array_append(index, tuple,
+							(hint_t)key) != 0)
+			goto error;
+	}
+	assert(key == NULL);
+	region_truncate(region, region_svp);
+	return 0;
+error:
+	for (uint32_t i = insert_idx; i < index->build_array_size; i++)
+		key_hint_destroy(&index->build_array[i]);
+	region_truncate(region, region_svp);
+	return -1;
+}
+
 /**
  * Process build_array of specified index and remove duplicates
  * of equal tuples (in terms of index's cmp_def and have same
  * tuple pointer). The build_array is expected to be sorted.
  */
 static void
-memtx_tree_index_build_array_deduplicate(struct memtx_tree_index *index)
+memtx_tree_index_build_array_deduplicate(struct memtx_tree_index *index,
+			void (*data_destructor)(struct memtx_tree_data *data))
 {
 	if (index->build_array_size == 0)
 		return;
@@ -924,10 +1154,18 @@ memtx_tree_index_build_array_deduplicate(struct memtx_tree_index *index)
 			if (++w_idx == r_idx)
 				continue;
-			index->build_array[w_idx] = index->build_array[r_idx];
+			SWAP(index->build_array[w_idx],
+			     index->build_array[r_idx]);
 		}
 		r_idx++;
 	}
+	/*
+	 * The swap above moves every duplicate past the last
+	 * kept element, so the duplicates may be destroyed
+	 * without touching hints still referenced by the kept
+	 * entries (a plain copy would alias such hints).
+	 */
+	if (data_destructor != NULL) {
+		for (size_t i = w_idx + 1; i < index->build_array_size; i++)
+			data_destructor(&index->build_array[i]);
+	}
 	index->build_array_size = w_idx + 1;
 }
 
 static void
@@ -945,7 +1183,9 @@ memtx_tree_index_end_build(struct index *base)
 		 * the following memtx_tree_build assumes that
 		 * all keys are unique.
 		 */
-		memtx_tree_index_build_array_deduplicate(index);
+		memtx_tree_index_build_array_deduplicate(index, NULL);
+	} else if (key_def_is_for_func_index(cmp_def)) {
+		memtx_tree_index_build_array_deduplicate(index,
+							 key_hint_destroy);
 	}
 	memtx_tree_build(&index->tree, index->build_array,
 			 index->build_array_size);
@@ -1072,6 +1312,72 @@ static const struct index_vtab memtx_tree_index_multikey_vtab = {
 	/* .end_build = */ memtx_tree_index_end_build,
 };
 
+static const struct index_vtab memtx_tree_index_functional_vtab = {
+	/* .destroy = */ memtx_tree_index_destroy,
+	/* .commit_create = */ generic_index_commit_create,
+	/* .abort_create = */ generic_index_abort_create,
+	/* .commit_modify = */ generic_index_commit_modify,
+	/* .commit_drop = */ generic_index_commit_drop,
+	/* .update_def = */ memtx_tree_index_update_def,
+	/* .depends_on_pk = */ memtx_tree_index_depends_on_pk,
+	/* .def_change_requires_rebuild = */
+		memtx_index_def_change_requires_rebuild,
+	/* .size = */ memtx_tree_index_size,
+	/* .bsize = */ memtx_tree_index_bsize,
+	/* .min = */ generic_index_min,
+	/* .max = */ generic_index_max,
+	/* .random = */ memtx_tree_index_random,
+	/* .count = */ memtx_tree_index_count,
+	/* .get = */ memtx_tree_index_get,
+	/* .replace = */ memtx_tree_index_replace_functional,
+	/* .create_iterator = */ memtx_tree_index_create_iterator,
+	/* .create_snapshot_iterator = */
+		memtx_tree_index_create_snapshot_iterator,
+	/* .stat = */ generic_index_stat,
+	/* .compact = */ generic_index_compact,
+	/* .reset_stat = */ generic_index_reset_stat,
+	/* .begin_build = */ memtx_tree_index_begin_build,
+	/* .reserve = */ memtx_tree_index_reserve,
+	/* .build_next = */ memtx_tree_index_build_next_functional,
+	/* .end_build = */ memtx_tree_index_end_build,
+};
+
+/**
+ * A disabled index vtab provides safe dummy methods for an
+ * 'inactive' index. It is required to perform a fault-tolerant
+ * recovery from a snapshot in case of a functional index
+ * (because the key definition is not completely initialized at
+ * that moment).
+ */
+static const struct index_vtab memtx_tree_index_disabled_vtab = {
+	/* .destroy = */ memtx_tree_index_destroy,
+	/* .commit_create = */ generic_index_commit_create,
+	/* .abort_create = */ generic_index_abort_create,
+	/* .commit_modify = */ generic_index_commit_modify,
+	/* .commit_drop = */ generic_index_commit_drop,
+	/* .update_def = */ generic_index_update_def,
+	/* .depends_on_pk = */ generic_index_depends_on_pk,
+	/* .def_change_requires_rebuild = */
+		generic_index_def_change_requires_rebuild,
+	/* .size = */ generic_index_size,
+	/* .bsize = */ generic_index_bsize,
+	/* .min = */ generic_index_min,
+	/* .max = */ generic_index_max,
+	/* .random = */ generic_index_random,
+	/* .count = */ generic_index_count,
+	/* .get = */ generic_index_get,
+	/* .replace = */ disabled_index_replace,
+	/* .create_iterator = */ generic_index_create_iterator,
+	/* .create_snapshot_iterator = */
+		generic_index_create_snapshot_iterator,
+	/* .stat = */ generic_index_stat,
+	/* .compact = */ generic_index_compact,
+	/* .reset_stat = */ generic_index_reset_stat,
+	/* .begin_build = */ generic_index_begin_build,
+	/* .reserve = */ generic_index_reserve,
+	/* .build_next = */ disabled_index_build_next,
+	/* .end_build = */ generic_index_end_build,
+};
+
 struct index *
 memtx_tree_index_new(struct memtx_engine *memtx, struct index_def *def)
 {
@@ -1082,9 +1388,17 @@ memtx_tree_index_new(struct memtx_engine *memtx, struct index_def *def)
 			 "malloc", "struct memtx_tree_index");
 		return NULL;
 	}
-	const struct index_vtab *vtab = def->key_def->is_multikey ?
-					&memtx_tree_index_multikey_vtab :
-					&memtx_tree_index_vtab;
+	const struct index_vtab *vtab;
+	if (key_def_is_for_func_index(def->key_def)) {
+		if (def->key_def->func_index_func == NULL)
+			vtab = &memtx_tree_index_disabled_vtab;
+		else
+			vtab = &memtx_tree_index_functional_vtab;
+	} else if (def->key_def->is_multikey) {
+		vtab = &memtx_tree_index_multikey_vtab;
+	} else {
+		vtab = &memtx_tree_index_vtab;
+	}
 	if (index_create(&index->base, (struct engine *)memtx,
 			 vtab, def) != 0) {
 		free(index);


