Tarantool development patches archive
From: Vladislav Shpilevoy via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: tarantool-patches@dev.tarantool.org, olegrok@tarantool.org,
	yaroslav.dynnikov@tarantool.org
Subject: [Tarantool-patches] [PATCH 9/9] util: introduce binary heap data structure
Date: Wed, 10 Feb 2021 00:46:15 +0100
Message-ID: <259e9595aefe7a28af13eb6dd336ea8f145c2112.1612914070.git.v.shpilevoy@tarantool.org>
In-Reply-To: <cover.1612914070.git.v.shpilevoy@tarantool.org>

Lua's standard library does not provide a binary heap (the usual
implementation of a priority queue). There is an implementation
in Tarantool core in libsalad, but it is written in C.

A heap is a good fit for the upcoming map-reduce feature. The
map-reduce algorithm will need to lock an entire storage against
any bucket moves for a time <= the specified timeout. The number
of map-reduce requests can be big, and they can have different
timeouts.

So there is a pile of timeouts from different requests. It must
be possible to quickly add new ones, delete arbitrary ones, and
remove expired ones.

One way would be a sorted array of the deadlines. Unfortunately,
it is slow: O(log(N) + N) = O(N) to add a new element (O(log(N))
to find the place with a binary search plus O(N) to move all the
following elements), and O(N) to delete an arbitrary one (move
all the following elements one cell to the left).
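
To make the cost concrete, here is a minimal sketch of the
sorted-array approach, assuming the deadlines are plain numbers.
The helper sorted_insert() is hypothetical and is not part of
this patch. The binary search takes O(log(N)) steps, but
table.insert() then has to shift every following element.

```lua
-- Hypothetical helper, not part of the patch: insert a deadline into an
-- array that is kept sorted in ascending order.
local function sorted_insert(deadlines, deadline)
    -- Binary search for the first position whose value is > deadline:
    -- O(log(N)) comparisons.
    local lo, hi = 1, #deadlines + 1
    while lo < hi do
        local mid = math.floor((lo + hi) / 2)
        if deadlines[mid] <= deadline then
            lo = mid + 1
        else
            hi = mid
        end
    end
    -- table.insert() shifts all elements at positions >= lo one cell to
    -- the right: O(N) moves in the worst case.
    table.insert(deadlines, lo, deadline)
    return lo
end

local deadlines = {}
for _, d in ipairs({30, 10, 20, 5}) do
    sorted_insert(deadlines, d)
end
-- Deleting an arbitrary element also shifts the tail: O(N).
table.remove(deadlines, 2)
```

The search is cheap here; it is the shifting on every insert and
delete that dominates.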

Another way would be a sorted tree. But trees such as a red-black
tree or a plain binary search tree require extra steps to keep
them balanced and to reach the smallest element quickly.

The best way is the binary heap. It is perfectly balanced by
design, meaning that all operations on it have complexity at most
O(log(N)). The closest deadline can be found in constant time, as
it is the heap's top.
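
The O(log(N)) bound follows from the array layout: a binary heap
is a complete binary tree stored in an array, so the parent of
index i is floor(i / 2), as in heap_parent_index() of this patch.
The sketch below uses a hypothetical helper, hops_to_root(), to
count how many parent hops separate a given index from the root.

```lua
-- Hypothetical helper for illustration only: count parent hops from
-- index idx up to the root (index 1) in a 1-based array-backed heap.
local function hops_to_root(idx)
    local hops = 0
    while idx > 1 do
        idx = math.floor(idx / 2)
        hops = hops + 1
    end
    return hops
end

-- The last element of a heap with N elements is on the deepest level,
-- so this is the worst case for moving an element up.
local hops_for_1000 = hops_to_root(1000)
local hops_for_1000000 = hops_to_root(1000000)
```

For a thousand elements that is 9 hops, and for a million it is
19, which is floor(log2(N)) in both cases.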

This patch implements it. The heap is intrusive: it stores the
index of each element right inside the element, in the field
'index'. Keeping the index along with each element allows
deleting it from the heap in O(log(N)) without having to look up
its position first.
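
The intrusive bookkeeping can be sketched independently of the
heap ordering. The helper names below (place, swap, remove) are
hypothetical and are not the functions of this patch; only the
'index' field and the value -1 for a removed element are taken
from it. Whenever an element moves to another cell, its 'index'
is updated, so data[obj.index] == obj always holds and no search
is needed to locate an element.

```lua
local data = {}
local count = 0

-- Put obj into cell idx and remember the cell inside the object itself.
local function place(obj, idx)
    data[idx] = obj
    obj.index = idx
end

-- Swap two cells and keep both stored indexes in sync.
local function swap(i, j)
    local x, y = data[i], data[j]
    place(x, j)
    place(y, i)
end

-- Remove obj without searching: its position is obj.index. The last
-- element fills the hole. A real heap would then sift the moved element
-- up or down to restore the ordering, which costs O(log(N)).
local function remove(obj)
    local idx = obj.index
    obj.index = -1
    if idx ~= count then
        place(data[count], idx)
    end
    data[count] = nil
    count = count - 1
end

local a, b, c = {value = 1}, {value = 2}, {value = 3}
for _, obj in ipairs({a, b, c}) do
    count = count + 1
    place(obj, count)
end
swap(1, 3)   -- data is now {c, b, a}
remove(c)    -- a moves into cell 1
```

In the patch itself the same bookkeeping is done inside
heap_update_index_up() and heap_update_index_down(), and
heap_remove() relies on it to find the element's cell.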

Part of #147
---
 test/unit-tap/heap.test.lua | 310 ++++++++++++++++++++++++++++++++++++
 test/unit-tap/suite.ini     |   4 +
 vshard/heap.lua             | 226 ++++++++++++++++++++++++++
 3 files changed, 540 insertions(+)
 create mode 100755 test/unit-tap/heap.test.lua
 create mode 100644 test/unit-tap/suite.ini
 create mode 100644 vshard/heap.lua

diff --git a/test/unit-tap/heap.test.lua b/test/unit-tap/heap.test.lua
new file mode 100755
index 0000000..8c3819f
--- /dev/null
+++ b/test/unit-tap/heap.test.lua
@@ -0,0 +1,310 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local test = tap.test("heap")
+local heap = require('vshard.heap')
+
+--
+-- Max size of the heap to test. The number of iterations in the
+-- test grows as a factorial of this value. At 10 the test already
+-- takes too long.
+--
+local heap_size = 8
+
+--
+-- Comparators and constructor of the objects stored in the heap.
+--
+local function min_heap_cmp(l, r)
+    return l.value < r.value
+end
+
+local function max_heap_cmp(l, r)
+    return l.value > r.value
+end
+
+local function new_object(value)
+    return {value = value}
+end
+
+local function heap_check_indexes(heap)
+    local count = heap:count()
+    local data = heap.data
+    for i = 1, count do
+        assert(data[i].index == i)
+    end
+end
+
+local function reverse(values, i1, i2)
+    while i1 < i2 do
+        values[i1], values[i2] = values[i2], values[i1]
+        i1 = i1 + 1
+        i2 = i2 - 1
+    end
+end
+
+--
+-- Implementation of std::next_permutation() from C++.
+--
+local function next_permutation(values)
+    local count = #values
+    if count <= 1 then
+        return false
+    end
+    local i = count
+    while true do
+        local j = i
+        i = i - 1
+        if values[i] < values[j] then
+            local k = count
+            while values[i] >= values[k] do
+                k = k - 1
+            end
+            values[i], values[k] = values[k], values[i]
+            reverse(values, j, count)
+            return true
+        end
+        if i == 1 then
+            reverse(values, 1, count)
+            return false
+        end
+    end
+end
+
+local function range(count)
+    local res = {}
+    for i = 1, count do
+        res[i] = i
+    end
+    return res
+end
+
+--
+-- Min heap fill and empty.
+--
+local function test_min_heap_basic(test)
+    test:plan(1)
+
+    local h = heap.new(min_heap_cmp)
+    assert(not h:pop())
+    assert(h:count() == 0)
+    local values = {}
+    for i = 1, heap_size do
+        values[i] = new_object(i)
+    end
+    for counti = 1, heap_size do
+        local indexes = range(counti)
+        repeat
+            for i = 1, counti do
+                h:push(values[indexes[i]])
+            end
+            heap_check_indexes(h)
+            assert(h:count() == counti)
+            for i = 1, counti do
+                assert(h:top() == values[i])
+                assert(h:pop() == values[i])
+                heap_check_indexes(h)
+            end
+            assert(not h:pop())
+            assert(h:count() == 0)
+        until not next_permutation(indexes)
+    end
+
+    test:ok(true, "no asserts")
+end
+
+--
+-- Max heap fill and empty.
+--
+local function test_max_heap_basic(test)
+    test:plan(1)
+
+    local h = heap.new(max_heap_cmp)
+    assert(not h:pop())
+    assert(h:count() == 0)
+    local values = {}
+    for i = 1, heap_size do
+        values[i] = new_object(heap_size - i + 1)
+    end
+    for counti = 1, heap_size do
+        local indexes = range(counti)
+        repeat
+            for i = 1, counti do
+                h:push(values[indexes[i]])
+            end
+            heap_check_indexes(h)
+            assert(h:count() == counti)
+            for i = 1, counti do
+                assert(h:top() == values[i])
+                assert(h:pop() == values[i])
+                heap_check_indexes(h)
+            end
+            assert(not h:pop())
+            assert(h:count() == 0)
+        until not next_permutation(indexes)
+    end
+
+    test:ok(true, "no asserts")
+end
+
+--
+-- Min heap update top element.
+--
+local function test_min_heap_update_top(test)
+    test:plan(1)
+
+    local h = heap.new(min_heap_cmp)
+    for counti = 1, heap_size do
+        local indexes = range(counti)
+        repeat
+            local values = {}
+            for i = 1, counti do
+                values[i] = new_object(0)
+                h:push(values[i])
+            end
+            heap_check_indexes(h)
+            for i = 1, counti do
+                h:top().value = indexes[i]
+                h:update_top()
+            end
+            heap_check_indexes(h)
+            assert(h:count() == counti)
+            for i = 1, counti do
+                assert(h:top().value == i)
+                assert(h:pop().value == i)
+                heap_check_indexes(h)
+            end
+            assert(not h:pop())
+            assert(h:count() == 0)
+        until not next_permutation(indexes)
+    end
+
+    test:ok(true, "no asserts")
+end
+
+--
+-- Min heap update all elements in all possible positions.
+--
+local function test_min_heap_update(test)
+    test:plan(1)
+
+    local h = heap.new(min_heap_cmp)
+    for counti = 1, heap_size do
+        for srci = 1, counti do
+            local endv = srci * 10 + 5
+            for newv = 5, endv, 5 do
+                local values = {}
+                for i = 1, counti do
+                    values[i] = new_object(i * 10)
+                    h:push(values[i])
+                end
+                heap_check_indexes(h)
+                local obj = values[srci]
+                obj.value = newv
+                h:update(obj)
+                assert(obj.index >= 1)
+                assert(obj.index <= counti)
+                local prev = -1
+                for i = 1, counti do
+                    obj = h:pop()
+                    assert(obj.index == -1)
+                    assert(obj.value >= prev)
+                    assert(obj.value >= 1)
+                    prev = obj.value
+                    obj.value = -1
+                    heap_check_indexes(h)
+                end
+                assert(not h:pop())
+                assert(h:count() == 0)
+            end
+        end
+    end
+
+    test:ok(true, "no asserts")
+end
+
+--
+-- Max heap delete all elements from all possible positions.
+--
+local function test_max_heap_delete(test)
+    test:plan(1)
+
+    local h = heap.new(max_heap_cmp)
+    local inf = heap_size + 1
+    for counti = 1, heap_size do
+        for srci = 1, counti do
+            local values = {}
+            for i = 1, counti do
+                values[i] = new_object(i)
+                h:push(values[i])
+            end
+            heap_check_indexes(h)
+            local obj = values[srci]
+            obj.value = inf
+            h:remove(obj)
+            assert(obj.index == -1)
+            local prev = inf
+            for i = 2, counti do
+                obj = h:pop()
+                assert(obj.index == -1)
+                assert(obj.value < prev)
+                assert(obj.value >= 1)
+                prev = obj.value
+                obj.value = -1
+                heap_check_indexes(h)
+            end
+            assert(not h:pop())
+            assert(h:count() == 0)
+        end
+    end
+
+    test:ok(true, "no asserts")
+end
+
+local function test_min_heap_remove_top(test)
+    test:plan(1)
+
+    local h = heap.new(min_heap_cmp)
+    for i = 1, heap_size do
+        h:push(new_object(i))
+    end
+    for i = 1, heap_size do
+        assert(h:top().value == i)
+        h:remove_top()
+    end
+    assert(h:count() == 0)
+
+    test:ok(true, "no asserts")
+end
+
+local function test_max_heap_remove_try(test)
+    test:plan(1)
+
+    local h = heap.new(max_heap_cmp)
+    local obj = new_object(1)
+    assert(obj.index == nil)
+    h:remove_try(obj)
+    assert(h:count() == 0)
+
+    h:push(obj)
+    h:push(new_object(2))
+    assert(obj.index == 2)
+    h:remove(obj)
+    assert(obj.index == -1)
+    h:remove_try(obj)
+    assert(obj.index == -1)
+    assert(h:count() == 1)
+
+    test:ok(true, "no asserts")
+end
+
+test:plan(7)
+
+test:test('min_heap_basic', test_min_heap_basic)
+test:test('max_heap_basic', test_max_heap_basic)
+test:test('min_heap_update_top', test_min_heap_update_top)
+test:test('min_heap_update', test_min_heap_update)
+test:test('max_heap_delete', test_max_heap_delete)
+test:test('min_heap_remove_top', test_min_heap_remove_top)
+test:test('max_heap_remove_try', test_max_heap_remove_try)
+
+os.exit(test:check() and 0 or 1)
diff --git a/test/unit-tap/suite.ini b/test/unit-tap/suite.ini
new file mode 100644
index 0000000..f365b69
--- /dev/null
+++ b/test/unit-tap/suite.ini
@@ -0,0 +1,4 @@
+[default]
+core = app
+description = Unit tests TAP
+is_parallel = True
diff --git a/vshard/heap.lua b/vshard/heap.lua
new file mode 100644
index 0000000..78c600a
--- /dev/null
+++ b/vshard/heap.lua
@@ -0,0 +1,226 @@
+local math_floor = math.floor
+
+--
+-- Implementation of the classic binary heap algorithm.
+-- The heap is intrusive - it stores the index of each element inside of the
+-- element. It allows updating and deleting elements at any place in the heap,
+-- not only the top element.
+--
+
+local function heap_parent_index(index)
+    return math_floor(index / 2)
+end
+
+local function heap_left_child_index(index)
+    return index * 2
+end
+
+--
+-- Generate a new heap.
+--
+-- The implementation aims at as few index accesses as possible.
+-- Everything that can be is stored in upvalue variables instead of in table
+-- fields. What can't be an upvalue and is used in a function more than once is
+-- saved on the stack.
+--
+local function heap_new(is_left_above)
+    -- Keeping it as an upvalue avoids doing a 'self.data' lookup in each
+    -- function.
+    local data = {}
+    -- Saves the #data calculation. In Lua it is not just reading a number.
+    local count = 0
+
+    local function heap_update_index_up(idx)
+        if idx == 1 then
+            return false
+        end
+
+        local orig_idx = idx
+        local value = data[idx]
+        local pidx = heap_parent_index(idx)
+        local parent = data[pidx]
+        while is_left_above(value, parent) do
+            data[idx] = parent
+            parent.index = idx
+            idx = pidx
+            if idx == 1 then
+                break
+            end
+            pidx = heap_parent_index(idx)
+            parent = data[pidx]
+        end
+
+        if idx == orig_idx then
+            return false
+        end
+        data[idx] = value
+        value.index = idx
+        return true
+    end
+
+    local function heap_update_index_down(idx)
+        local left_idx = heap_left_child_index(idx)
+        if left_idx > count then
+            return false
+        end
+
+        local orig_idx = idx
+        local left
+        local right
+        local right_idx = left_idx + 1
+        local top
+        local top_idx
+        local value = data[idx]
+        repeat
+            right_idx = left_idx + 1
+            if right_idx > count then
+                top = data[left_idx]
+                if is_left_above(value, top) then
+                    break
+                end
+                top_idx = left_idx
+            else
+                left = data[left_idx]
+                right = data[right_idx]
+                if is_left_above(left, right) then
+                    if is_left_above(value, left) then
+                        break
+                    end
+                    top_idx = left_idx
+                    top = left
+                else
+                    if is_left_above(value, right) then
+                        break
+                    end
+                    top_idx = right_idx
+                    top = right
+                end
+            end
+
+            data[idx] = top
+            top.index = idx
+            idx = top_idx
+            left_idx = heap_left_child_index(idx)
+        until left_idx > count
+
+        if idx == orig_idx then
+            return false
+        end
+        data[idx] = value
+        value.index = idx
+        return true
+    end
+
+    local function heap_update_index(idx)
+        if not heap_update_index_up(idx) then
+            heap_update_index_down(idx)
+        end
+    end
+
+    local function heap_push(self, value)
+        count = count + 1
+        data[count] = value
+        value.index = count
+        heap_update_index_up(count)
+    end
+
+    local function heap_update_top(self)
+        heap_update_index_down(1)
+    end
+
+    local function heap_update(self, value)
+        heap_update_index(value.index)
+    end
+
+    local function heap_remove_top(self)
+        if count == 0 then
+            return
+        end
+        data[1].index = -1
+        if count == 1 then
+            data[1] = nil
+            count = 0
+            return
+        end
+        local value = data[count]
+        data[count] = nil
+        data[1] = value
+        value.index = 1
+        count = count - 1
+        heap_update_index_down(1)
+    end
+
+    local function heap_remove(self, value)
+        local idx = value.index
+        value.index = -1
+        if idx == count then
+            data[count] = nil
+            count = count - 1
+            return
+        end
+        value = data[count]
+        data[idx] = value
+        data[count] = nil
+        value.index = idx
+        count = count - 1
+        heap_update_index(idx)
+    end
+
+    local function heap_remove_try(self, value)
+        local idx = value.index
+        if idx and idx > 0 then
+            heap_remove(self, value)
+        end
+    end
+
+    local function heap_pop(self)
+        if count == 0 then
+            return
+        end
+        -- Some duplication from remove_top, but allows to save a few
+        -- condition checks, index accesses, and a function call.
+        local res = data[1]
+        res.index = -1
+        if count == 1 then
+            data[1] = nil
+            count = 0
+            return res
+        end
+        local value = data[count]
+        data[count] = nil
+        data[1] = value
+        value.index = 1
+        count = count - 1
+        heap_update_index_down(1)
+        return res
+    end
+
+    local function heap_top(self)
+        return data[1]
+    end
+
+    local function heap_count(self)
+        return count
+    end
+
+    return setmetatable({
+        -- Expose the data. For testing.
+        data = data,
+    }, {
+        __index = {
+            push = heap_push,
+            update_top = heap_update_top,
+            remove_top = heap_remove_top,
+            pop = heap_pop,
+            update = heap_update,
+            remove = heap_remove,
+            remove_try = heap_remove_try,
+            top = heap_top,
+            count = heap_count,
+        }
+    })
+end
+
+return {
+    new = heap_new,
+}
-- 
2.24.3 (Apple Git-128)

