[PATCH] memtx: free tuples asynchronously when primary index is dropped
Konstantin Osipov
kostja at tarantool.org
Mon May 21 00:31:18 MSK 2018
* Vladimir Davydov <vdavydov.dev at gmail.com> [18/05/20 18:07]:
> When a memtx space is dropped or truncated, we have to unreference all
> tuples stored in it. Currently, we do it synchronously, thus blocking
> the tx thread. If a space is big, tx thread may remain blocked for
> several seconds, which is unacceptable. This patch makes drop/truncate
> hand actual work to a background fiber.
>
> Before this patch, drop of a space with 10M 64-byte records took more
> than 0.5 seconds. After this patch, it takes less than 1 millisecond.
>
> Closes #3408
This is a duplicate of https://github.com/tarantool/tarantool/issues/444
It's OK to push.
You can test it either by using error injection or by adding metrics.
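For instance, here is a rough sketch of the error injection route (not
part of the patch; the injection name ERRINJ_MEMTX_DELAY_GC is made up
and would first have to be registered in ERRINJ_LIST in errinj.h): a
hook in the gc fiber loop freezes collection while the injection is
enabled, so a test can observe the queue being drained asynchronously.

	#include "errinj.h"	/* errinj(); the new id is hypothetical */

	static int
	memtx_engine_gc_f(va_list va)
	{
		struct memtx_engine *memtx = va_arg(va, struct memtx_engine *);
		while (!fiber_is_cancelled()) {
			if (stailq_empty(&memtx->gc_queue)) {
				fiber_yield_timeout(TIMEOUT_INFINITY);
				continue;
			}
			/*
			 * Test hook: stall collection while the (made-up)
			 * injection is set, without losing any tasks.
			 */
			while (errinj(ERRINJ_MEMTX_DELAY_GC,
				      ERRINJ_BOOL)->bparam)
				fiber_sleep(0.001);
			struct memtx_gc_task *task;
			task = stailq_shift_entry(&memtx->gc_queue,
						  struct memtx_gc_task, link);
			task->func(task);
		}
		return 0;
	}

A test could then enable it from Lua with
box.error.injection.set('ERRINJ_MEMTX_DELAY_GC', true), drop the space,
check that memory is still in use, clear the injection and check that
the tuples get freed.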
The choice of the constant (128 iterations of the iterator loop per
yield) is puzzling. Did you do any math? How much does a fiber yield
cost?
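If you haven't measured it, here is a minimal sketch of a
microbenchmark (assuming Tarantool's fiber.h and clock.h; run it as a
fiber in the tx thread, e.g. via fiber_start(fiber_new("bench",
bench_yield_f))). It times the same fiber_sleep(0) primitive the patch
uses to yield:

	#include <stdio.h>
	#include "fiber.h"	/* fiber_sleep() */
	#include "clock.h"	/* clock_monotonic(), seconds as double */

	static int
	bench_yield_f(va_list ap)
	{
		(void)ap;
		enum { N = 1000000 };
		double start = clock_monotonic();
		for (int i = 0; i < N; i++)
			fiber_sleep(0);	/* yield to the scheduler and back */
		double stop = clock_monotonic();
		printf("average yield cost: %.0f ns\n",
		       (stop - start) * 1e9 / N);
		return 0;
	}

Whatever the number turns out to be, the trade-off is simple: with N
iterations per yield, the per-tuple overhead is roughly yield_cost / N,
while each slice delays the tx loop by N * unref_cost. So the constant
deserves at least a comment justifying it.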
> https://github.com/tarantool/tarantool/issues/3408
> https://github.com/tarantool/tarantool/commits/gh-3408-memtx-async-index-destruction
>
> src/box/memtx_bitset.c | 5 +--
> src/box/memtx_engine.c | 91 ++++++++++++++++++++++----------------------------
> src/box/memtx_engine.h | 40 ++++++++++++++++------
> src/box/memtx_hash.c | 53 ++++++++++++++++++++++++++---
> src/box/memtx_hash.h | 2 ++
> src/box/memtx_rtree.c | 5 +--
> src/box/memtx_tree.c | 53 ++++++++++++++++++++++++++---
> src/box/memtx_tree.h | 1 +
> 8 files changed, 176 insertions(+), 74 deletions(-)
>
> diff --git a/src/box/memtx_bitset.c b/src/box/memtx_bitset.c
> index 7b87560b..2322af37 100644
> --- a/src/box/memtx_bitset.c
> +++ b/src/box/memtx_bitset.c
> @@ -457,9 +457,9 @@ memtx_bitset_index_count(struct index *base, enum iterator_type type,
> static const struct index_vtab memtx_bitset_index_vtab = {
> /* .destroy = */ memtx_bitset_index_destroy,
> /* .commit_create = */ generic_index_commit_create,
> - /* .abort_create = */ memtx_index_abort_create,
> + /* .abort_create = */ generic_index_abort_create,
> /* .commit_modify = */ generic_index_commit_modify,
> - /* .commit_drop = */ memtx_index_commit_drop,
> + /* .commit_drop = */ generic_index_commit_drop,
> /* .update_def = */ generic_index_update_def,
> /* .depends_on_pk = */ generic_index_depends_on_pk,
> /* .def_change_requires_rebuild = */
> @@ -486,6 +486,7 @@ static const struct index_vtab memtx_bitset_index_vtab = {
> struct memtx_bitset_index *
> memtx_bitset_index_new(struct memtx_engine *memtx, struct index_def *def)
> {
> + assert(def->iid > 0);
> assert(!def->opts.is_unique);
>
> memtx_index_arena_init();
> diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
> index 388cc4fe..ee3afaec 100644
> --- a/src/box/memtx_engine.c
> +++ b/src/box/memtx_engine.c
> @@ -35,6 +35,7 @@
> #include <small/small.h>
> #include <small/mempool.h>
>
> +#include "fiber.h"
> #include "coio_file.h"
> #include "tuple.h"
> #include "txn.h"
> @@ -928,6 +929,23 @@ static const struct engine_vtab memtx_engine_vtab = {
> /* .check_space_def = */ memtx_engine_check_space_def,
> };
>
> +static int
> +memtx_engine_gc_f(va_list va)
> +{
> + struct memtx_engine *memtx = va_arg(va, struct memtx_engine *);
> + while (!fiber_is_cancelled()) {
> + if (stailq_empty(&memtx->gc_queue)) {
> + fiber_yield_timeout(TIMEOUT_INFINITY);
> + continue;
> + }
> + struct memtx_gc_task *task;
> + task = stailq_shift_entry(&memtx->gc_queue,
> + struct memtx_gc_task, link);
> + task->func(task);
> + }
> + return 0;
> +}
> +
> struct memtx_engine *
> memtx_engine_new(const char *snap_dirname, bool force_recovery,
> uint64_t tuple_arena_max_size, uint32_t objsize_min,
> @@ -945,18 +963,34 @@ memtx_engine_new(const char *snap_dirname, bool force_recovery,
> xdir_create(&memtx->snap_dir, snap_dirname, SNAP, &INSTANCE_UUID);
> memtx->snap_dir.force_recovery = force_recovery;
>
> - if (xdir_scan(&memtx->snap_dir) != 0) {
> - xdir_destroy(&memtx->snap_dir);
> - free(memtx);
> - return NULL;
> - }
> + if (xdir_scan(&memtx->snap_dir) != 0)
> + goto fail;
> +
> + stailq_create(&memtx->gc_queue);
> + memtx->gc_fiber = fiber_new("memtx.gc", memtx_engine_gc_f);
> + if (memtx->gc_fiber == NULL)
> + goto fail;
>
> memtx->state = MEMTX_INITIALIZED;
> memtx->force_recovery = force_recovery;
>
> memtx->base.vtab = &memtx_engine_vtab;
> memtx->base.name = "memtx";
> +
> + fiber_start(memtx->gc_fiber, memtx);
> return memtx;
> +fail:
> + xdir_destroy(&memtx->snap_dir);
> + free(memtx);
> + return NULL;
> +}
> +
> +void
> +memtx_engine_schedule_gc(struct memtx_engine *memtx,
> + struct memtx_gc_task *task)
> +{
> + stailq_add_tail_entry(&memtx->gc_queue, task, link);
> + fiber_wakeup(memtx->gc_fiber);
> }
>
> void
> @@ -1063,53 +1097,6 @@ memtx_index_extent_reserve(int num)
> return 0;
> }
>
> -static void
> -memtx_index_prune(struct index *index)
> -{
> - if (index->def->iid > 0)
> - return;
> -
> - /*
> - * Tuples stored in a memtx space are referenced by the
> - * primary index so when the primary index is dropped we
> - * should delete them.
> - */
> - struct iterator *it = index_create_iterator(index, ITER_ALL, NULL, 0);
> - if (it == NULL)
> - goto fail;
> - int rc;
> - struct tuple *tuple;
> - while ((rc = iterator_next(it, &tuple)) == 0 && tuple != NULL)
> - tuple_unref(tuple);
> - iterator_delete(it);
> - if (rc != 0)
> - goto fail;
> -
> - return;
> -fail:
> - /*
> - * This function is called after WAL write so we have no
> - * other choice but panic in case of any error. The good
> - * news is memtx iterators do not fail so we should not
> - * normally get here.
> - */
> - diag_log();
> - unreachable();
> - panic("failed to drop index");
> -}
> -
> -void
> -memtx_index_abort_create(struct index *index)
> -{
> - memtx_index_prune(index);
> -}
> -
> -void
> -memtx_index_commit_drop(struct index *index)
> -{
> - memtx_index_prune(index);
> -}
> -
> bool
> memtx_index_def_change_requires_rebuild(struct index *index,
> const struct index_def *new_def)
> diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h
> index c1f2652c..2ce597b4 100644
> --- a/src/box/memtx_engine.h
> +++ b/src/box/memtx_engine.h
> @@ -37,12 +37,14 @@
>
> #include "engine.h"
> #include "xlog.h"
> +#include "salad/stailq.h"
>
> #if defined(__cplusplus)
> extern "C" {
> #endif /* defined(__cplusplus) */
>
> struct index;
> +struct fiber;
>
> /**
> * The state of memtx recovery process.
> @@ -101,8 +103,36 @@ struct memtx_engine {
> struct mempool hash_iterator_pool;
> /** Memory pool for bitset index iterator. */
> struct mempool bitset_iterator_pool;
> + /**
> + * Garbage collection fiber. Used for asynchronous
> + * destruction of dropped indexes.
> + */
> + struct fiber *gc_fiber;
> + /**
> + * Scheduled garbage collection tasks, linked by
> + * memtx_gc_task::link.
> + */
> + struct stailq gc_queue;
> };
>
> +struct memtx_gc_task;
> +typedef void (*memtx_gc_func)(struct memtx_gc_task *);
> +
> +/** Garbage collection task. */
> +struct memtx_gc_task {
> + /** Link in memtx_engine::gc_queue. */
> + struct stailq_entry link;
> + /** Function that will perform the task. */
> + memtx_gc_func func;
> +};
> +
> +/**
> + * Schedule a garbage collection task for execution.
> + */
> +void
> +memtx_engine_schedule_gc(struct memtx_engine *memtx,
> + struct memtx_gc_task *task);
> +
> struct memtx_engine *
> memtx_engine_new(const char *snap_dirname, bool force_recovery,
> uint64_t tuple_arena_max_size,
> @@ -151,16 +181,6 @@ memtx_index_extent_free(void *ctx, void *extent);
> int
> memtx_index_extent_reserve(int num);
>
> -/*
> - * The following two methods are used by all kinds of memtx indexes
> - * to delete tuples stored in the space when the primary index is
> - * destroyed.
> - */
> -void
> -memtx_index_abort_create(struct index *index);
> -void
> -memtx_index_commit_drop(struct index *index);
> -
> /**
> * Generic implementation of index_vtab::def_change_requires_rebuild,
> * common for all kinds of memtx indexes.
> diff --git a/src/box/memtx_hash.c b/src/box/memtx_hash.c
> index 24138f42..7e4c0474 100644
> --- a/src/box/memtx_hash.c
> +++ b/src/box/memtx_hash.c
> @@ -129,15 +129,57 @@ hash_iterator_eq(struct iterator *it, struct tuple **ret)
> /* {{{ MemtxHash -- implementation of all hashes. **********************/
>
> static void
> -memtx_hash_index_destroy(struct index *base)
> +memtx_hash_index_free(struct memtx_hash_index *index)
> {
> - struct memtx_hash_index *index = (struct memtx_hash_index *)base;
> light_index_destroy(index->hash_table);
> free(index->hash_table);
> free(index);
> }
>
> static void
> +memtx_hash_index_destroy_f(struct memtx_gc_task *task)
> +{
> + enum { YIELD_LOOPS = 128 };
> +
> + struct memtx_hash_index *index = container_of(task,
> + struct memtx_hash_index, gc_task);
> + struct light_index_core *hash = index->hash_table;
> +
> + struct light_index_iterator itr;
> + light_index_iterator_begin(hash, &itr);
> +
> + struct tuple **res;
> + unsigned int loops = 0;
> + while ((res = light_index_iterator_get_and_next(hash, &itr)) != NULL) {
> + tuple_unref(*res);
> + if (++loops % YIELD_LOOPS == 0)
> + fiber_sleep(0);
> + }
> + memtx_hash_index_free(index);
> +}
> +
> +static void
> +memtx_hash_index_destroy(struct index *base)
> +{
> + struct memtx_hash_index *index = (struct memtx_hash_index *)base;
> + struct memtx_engine *memtx = (struct memtx_engine *)base->engine;
> + if (index->gc_task.func != NULL) {
> + /*
> + * Primary index. We need to free all tuples stored
> + * in the index, which may take a while. Schedule a
> + * background task in order not to block tx thread.
> + */
> + memtx_engine_schedule_gc(memtx, &index->gc_task);
> + } else {
> + /*
> + * Secondary index. Destruction is fast, no need to
> + * hand over to background fiber.
> + */
> + memtx_hash_index_free(index);
> + }
> +}
> +
> +static void
> memtx_hash_index_update_def(struct index *base)
> {
> struct memtx_hash_index *index = (struct memtx_hash_index *)base;
> @@ -381,9 +423,9 @@ memtx_hash_index_create_snapshot_iterator(struct index *base)
> static const struct index_vtab memtx_hash_index_vtab = {
> /* .destroy = */ memtx_hash_index_destroy,
> /* .commit_create = */ generic_index_commit_create,
> - /* .abort_create = */ memtx_index_abort_create,
> + /* .abort_create = */ generic_index_abort_create,
> /* .commit_modify = */ generic_index_commit_modify,
> - /* .commit_drop = */ memtx_index_commit_drop,
> + /* .commit_drop = */ generic_index_commit_drop,
> /* .update_def = */ memtx_hash_index_update_def,
> /* .depends_on_pk = */ generic_index_depends_on_pk,
> /* .def_change_requires_rebuild = */
> @@ -443,6 +485,9 @@ memtx_hash_index_new(struct memtx_engine *memtx, struct index_def *def)
> memtx_index_extent_alloc, memtx_index_extent_free,
> NULL, index->base.def->key_def);
> index->hash_table = hash_table;
> +
> + if (def->iid == 0)
> + index->gc_task.func = memtx_hash_index_destroy_f;
> return index;
> }
>
> diff --git a/src/box/memtx_hash.h b/src/box/memtx_hash.h
> index bdadbd1e..99001079 100644
> --- a/src/box/memtx_hash.h
> +++ b/src/box/memtx_hash.h
> @@ -31,6 +31,7 @@
> * SUCH DAMAGE.
> */
> #include "index.h"
> +#include "memtx_engine.h"
>
> #if defined(__cplusplus)
> extern "C" {
> @@ -42,6 +43,7 @@ struct light_index_core;
> struct memtx_hash_index {
> struct index base;
> struct light_index_core *hash_table;
> + struct memtx_gc_task gc_task;
> };
>
> struct memtx_hash_index *
> diff --git a/src/box/memtx_rtree.c b/src/box/memtx_rtree.c
> index 653343a7..bcf8e4d4 100644
> --- a/src/box/memtx_rtree.c
> +++ b/src/box/memtx_rtree.c
> @@ -301,9 +301,9 @@ memtx_rtree_index_create_iterator(struct index *base, enum iterator_type type,
> static const struct index_vtab memtx_rtree_index_vtab = {
> /* .destroy = */ memtx_rtree_index_destroy,
> /* .commit_create = */ generic_index_commit_create,
> - /* .abort_create = */ memtx_index_abort_create,
> + /* .abort_create = */ generic_index_abort_create,
> /* .commit_modify = */ generic_index_commit_modify,
> - /* .commit_drop = */ memtx_index_commit_drop,
> + /* .commit_drop = */ generic_index_commit_drop,
> /* .update_def = */ generic_index_update_def,
> /* .depends_on_pk = */ generic_index_depends_on_pk,
> /* .def_change_requires_rebuild = */
> @@ -330,6 +330,7 @@ static const struct index_vtab memtx_rtree_index_vtab = {
> struct memtx_rtree_index *
> memtx_rtree_index_new(struct memtx_engine *memtx, struct index_def *def)
> {
> + assert(def->iid > 0);
> assert(def->key_def->part_count == 1);
> assert(def->key_def->parts[0].type == FIELD_TYPE_ARRAY);
> assert(def->opts.is_unique == false);
> diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
> index 073006b4..97452c5f 100644
> --- a/src/box/memtx_tree.c
> +++ b/src/box/memtx_tree.c
> @@ -301,15 +301,57 @@ memtx_tree_index_cmp_def(struct memtx_tree_index *index)
> }
>
> static void
> -memtx_tree_index_destroy(struct index *base)
> +memtx_tree_index_free(struct memtx_tree_index *index)
> {
> - struct memtx_tree_index *index = (struct memtx_tree_index *)base;
> memtx_tree_destroy(&index->tree);
> free(index->build_array);
> free(index);
> }
>
> static void
> +memtx_tree_index_destroy_f(struct memtx_gc_task *task)
> +{
> + enum { YIELD_LOOPS = 128 };
> +
> + struct memtx_tree_index *index = container_of(task,
> + struct memtx_tree_index, gc_task);
> + struct memtx_tree *tree = &index->tree;
> +
> + unsigned int loops = 0;
> + struct memtx_tree_iterator itr;
> + for (itr = memtx_tree_iterator_first(tree);
> + !memtx_tree_iterator_is_invalid(&itr);
> + memtx_tree_iterator_next(tree, &itr)) {
> + struct tuple *tuple = *memtx_tree_iterator_get_elem(tree, &itr);
> + tuple_unref(tuple);
> + if (++loops % YIELD_LOOPS == 0)
> + fiber_sleep(0);
> + }
> + memtx_tree_index_free(index);
> +}
> +
> +static void
> +memtx_tree_index_destroy(struct index *base)
> +{
> + struct memtx_tree_index *index = (struct memtx_tree_index *)base;
> + struct memtx_engine *memtx = (struct memtx_engine *)base->engine;
> + if (index->gc_task.func != NULL) {
> + /*
> + * Primary index. We need to free all tuples stored
> + * in the index, which may take a while. Schedule a
> + * background task in order not to block tx thread.
> + */
> + memtx_engine_schedule_gc(memtx, &index->gc_task);
> + } else {
> + /*
> + * Secondary index. Destruction is fast, no need to
> + * hand over to background fiber.
> + */
> + memtx_tree_index_free(index);
> + }
> +}
> +
> +static void
> memtx_tree_index_update_def(struct index *base)
> {
> struct memtx_tree_index *index = (struct memtx_tree_index *)base;
> @@ -590,9 +632,9 @@ memtx_tree_index_create_snapshot_iterator(struct index *base)
> static const struct index_vtab memtx_tree_index_vtab = {
> /* .destroy = */ memtx_tree_index_destroy,
> /* .commit_create = */ generic_index_commit_create,
> - /* .abort_create = */ memtx_index_abort_create,
> + /* .abort_create = */ generic_index_abort_create,
> /* .commit_modify = */ generic_index_commit_modify,
> - /* .commit_drop = */ memtx_index_commit_drop,
> + /* .commit_drop = */ generic_index_commit_drop,
> /* .update_def = */ memtx_tree_index_update_def,
> /* .depends_on_pk = */ memtx_tree_index_depends_on_pk,
> /* .def_change_requires_rebuild = */
> @@ -643,5 +685,8 @@ memtx_tree_index_new(struct memtx_engine *memtx, struct index_def *def)
> memtx_tree_create(&index->tree, cmp_def,
> memtx_index_extent_alloc,
> memtx_index_extent_free, NULL);
> +
> + if (def->iid == 0)
> + index->gc_task.func = memtx_tree_index_destroy_f;
> return index;
> }
> diff --git a/src/box/memtx_tree.h b/src/box/memtx_tree.h
> index 28b6fbd6..2e7acde2 100644
> --- a/src/box/memtx_tree.h
> +++ b/src/box/memtx_tree.h
> @@ -97,6 +97,7 @@ struct memtx_tree_index {
> struct memtx_tree tree;
> struct tuple **build_array;
> size_t build_array_size, build_array_alloc_size;
> + struct memtx_gc_task gc_task;
> };
>
> struct memtx_tree_index *
> --
> 2.11.0
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov