[Tarantool-patches] [PATCH 9/9] util: introduce binary heap data structure

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Feb 10 02:46:15 MSK 2021


Lua does not have a built-in standard library for binary heaps
(also called priority queues). There is an implementation in
Tarantool core in libsalad, but it is in C.

A heap is a perfect storage for the upcoming map-reduce feature.
In the map-reduce algorithm it will be necessary to be able to
lock an entire storage against any bucket moves for a time not
longer than the specified timeout. The number of map-reduce
requests can be big, and they can have different timeouts.

So there is a pile of timeouts from different requests. It is
necessary to be able to quickly add new ones, be able to delete
random ones, and remove expired ones.

One way would be a sorted array of the deadlines. Unfortunately,
it is super slow: O(N + log(N)) to add a new element (O(log(N)) to
find its place plus O(N) to move all the next elements), and O(N)
to delete a random one (move all the next elements one cell
left/right).

Another way would be a sorted tree. But trees like RB or a dumb
binary tree require extra steps to keep them balanced and to have
access to the smallest element ASAP.

The best way is the binary heap. It is perfectly balanced by
design, meaning that all operations there have complexity at most
O(log(N)). It is possible to find the closest deadline in
constant time as it is the heap's top.

This patch implements it. The heap is intrusive. It means it
stores the index of each element right inside of the element as a
field 'index'. Having an index along with each element allows
deleting it from the heap in O(log(N)) without the need to look
its place up first.

Part of #147
---
 test/unit-tap/heap.test.lua | 310 ++++++++++++++++++++++++++++++++++++
 test/unit-tap/suite.ini     |   4 +
 vshard/heap.lua             | 226 ++++++++++++++++++++++++++
 3 files changed, 540 insertions(+)
 create mode 100755 test/unit-tap/heap.test.lua
 create mode 100644 test/unit-tap/suite.ini
 create mode 100644 vshard/heap.lua

diff --git a/test/unit-tap/heap.test.lua b/test/unit-tap/heap.test.lua
new file mode 100755
index 0000000..8c3819f
--- /dev/null
+++ b/test/unit-tap/heap.test.lua
@@ -0,0 +1,310 @@
+#!/usr/bin/env tarantool
+
+local tap = require('tap')
+local test = tap.test("heap")
+local heap = require('vshard.heap')
+
+--
+-- Max size of the heap to test. Number of iterations in the test
+-- grows as a factorial of this value. At 10 the test becomes
+-- too long already.
+--
+local heap_size = 8
+
+--
+-- Min-heap order for the objects in the intrusive heap: smaller value is above.
+--
+local function min_heap_cmp(l, r)
+    return l.value < r.value
+end
+
+local function max_heap_cmp(l, r) -- Max-heap order: bigger value is above.
+    return l.value > r.value
+end
+
+local function new_object(value) -- Heap element; 'index' is set on push.
+    return {value = value}
+end
+
+local function heap_check_indexes(heap) -- Each element must know its own slot.
+    local count = heap:count()
+    local data = heap.data -- Raw array exposed by the heap for testing.
+    for i = 1, count do
+        assert(data[i].index == i)
+    end
+end
+
+local function reverse(values, i1, i2) -- Reverse values[i1..i2] in place.
+    while i1 < i2 do
+        values[i1], values[i2] = values[i2], values[i1]
+        i1 = i1 + 1
+        i2 = i2 - 1
+    end
+end
+
+-- Implementation of std::next_permutation() from C++: reorder the array in
+-- place into the next lexicographic permutation and return true. Return false
+-- when there is no next one; the array is then reversed back to the first one.
+local function next_permutation(values)
+    local count = #values
+    if count <= 1 then -- Nothing to permute, the array is left untouched.
+        return false
+    end
+    local i = count
+    while true do
+        local j = i
+        i = i - 1
+        if values[i] < values[j] then -- Rightmost adjacent ascending pair.
+            local k = count
+            while values[i] >= values[k] do -- Rightmost element above values[i].
+                k = k - 1
+            end
+            values[i], values[k] = values[k], values[i]
+            reverse(values, j, count) -- Tail was descending, make it ascending.
+            return true
+        end
+        if i == 1 then -- Whole array is descending, it was the last permutation.
+            reverse(values, 1, count)
+            return false
+        end
+    end
+end
+
+local function range(count) -- Build the array {1, 2, ..., count}.
+    local res = {}
+    for i = 1, count do
+        res[i] = i
+    end
+    return res
+end
+
+--
+-- Min heap fill and empty: push in every possible order, pop in ascending order.
+--
+local function test_min_heap_basic(test)
+    test:plan(1)
+
+    local h = heap.new(min_heap_cmp)
+    assert(not h:pop()) -- Pop from an empty heap returns nothing.
+    assert(h:count() == 0)
+    local values = {}
+    for i = 1, heap_size do
+        values[i] = new_object(i) -- values[] is sorted by value ascending.
+    end
+    for counti = 1, heap_size do
+        local indexes = range(counti)
+        repeat
+            for i = 1, counti do
+                h:push(values[indexes[i]]) -- Push order is the permutation.
+            end
+            heap_check_indexes(h)
+            assert(h:count() == counti)
+            for i = 1, counti do
+                assert(h:top() == values[i])
+                assert(h:pop() == values[i])
+                heap_check_indexes(h)
+            end
+            assert(not h:pop())
+            assert(h:count() == 0)
+        until not next_permutation(indexes) -- Go through all the push orders.
+    end
+
+    test:ok(true, "no asserts")
+end
+
+--
+-- Max heap fill and empty: push in every possible order, pop in descending order.
+--
+local function test_max_heap_basic(test)
+    test:plan(1)
+
+    local h = heap.new(max_heap_cmp)
+    assert(not h:pop()) -- Pop from an empty heap returns nothing.
+    assert(h:count() == 0)
+    local values = {}
+    for i = 1, heap_size do
+        values[i] = new_object(heap_size - i + 1) -- Sorted by value descending.
+    end
+    for counti = 1, heap_size do
+        local indexes = range(counti)
+        repeat
+            for i = 1, counti do
+                h:push(values[indexes[i]]) -- Push order is the permutation.
+            end
+            heap_check_indexes(h)
+            assert(h:count() == counti)
+            for i = 1, counti do
+                assert(h:top() == values[i])
+                assert(h:pop() == values[i])
+                heap_check_indexes(h)
+            end
+            assert(not h:pop())
+            assert(h:count() == 0)
+        until not next_permutation(indexes) -- Go through all the push orders.
+    end
+
+    test:ok(true, "no asserts")
+end
+
+--
+-- Min heap update top element: change the top's value and call update_top().
+--
+local function test_min_heap_update_top(test)
+    test:plan(1)
+
+    local h = heap.new(min_heap_cmp)
+    for counti = 1, heap_size do
+        local indexes = range(counti)
+        repeat
+            local values = {}
+            for i = 1, counti do
+                values[i] = new_object(0) -- All start equal, below any new value.
+                h:push(values[i])
+            end
+            heap_check_indexes(h)
+            for i = 1, counti do
+                h:top().value = indexes[i] -- New values are >= 1, so top sinks.
+                h:update_top()
+            end
+            heap_check_indexes(h)
+            assert(h:count() == counti)
+            for i = 1, counti do
+                assert(h:top().value == i)
+                assert(h:pop().value == i)
+                heap_check_indexes(h)
+            end
+            assert(not h:pop())
+            assert(h:count() == 0)
+        until not next_permutation(indexes) -- Go through all assignment orders.
+    end
+
+    test:ok(true, "no asserts")
+end
+
+--
+-- Min heap update all elements in all possible positions.
+--
+local function test_min_heap_update(test)
+    test:plan(1)
+
+    local h = heap.new(min_heap_cmp)
+    for counti = 1, heap_size do
+        for srci = 1, counti do
+            local endv = counti * 10 + 5 -- Above the max, to cover sinking too.
+            for newv = 5, endv, 5 do
+                local values = {}
+                for i = 1, counti do
+                    values[i] = new_object(i * 10) -- Ascending: data[i] = i * 10.
+                    h:push(values[i])
+                end
+                heap_check_indexes(h)
+                local obj = values[srci]
+                obj.value = newv -- May equal, undercut or exceed any neighbour.
+                h:update(obj)
+                assert(obj.index >= 1)
+                assert(obj.index <= counti)
+                local prev = -1
+                for i = 1, counti do
+                    obj = h:pop()
+                    assert(obj.index == -1) -- Popped objects are marked as out.
+                    assert(obj.value >= prev) -- Pop order must be non-decreasing.
+                    assert(obj.value >= 1)
+                    prev = obj.value
+                    obj.value = -1
+                    heap_check_indexes(h)
+                end
+                assert(not h:pop())
+                assert(h:count() == 0)
+            end
+        end
+    end
+
+    test:ok(true, "no asserts")
+end
+
+--
+-- Max heap delete all elements from all possible positions.
+--
+local function test_max_heap_delete(test)
+    test:plan(1)
+
+    local h = heap.new(max_heap_cmp)
+    local inf = heap_size + 1 -- Bigger than any value pushed below.
+    for counti = 1, heap_size do
+        for srci = 1, counti do
+            local values = {}
+            for i = 1, counti do
+                values[i] = new_object(i)
+                h:push(values[i])
+            end
+            heap_check_indexes(h)
+            local obj = values[srci]
+            obj.value = inf -- If it stayed in the heap, the checks below fail.
+            h:remove(obj)
+            assert(obj.index == -1)
+            local prev = inf
+            for i = 2, counti do -- One element is removed, counti - 1 are left.
+                obj = h:pop()
+                assert(obj.index == -1)
+                assert(obj.value < prev) -- Pop order must be strictly descending.
+                assert(obj.value >= 1)
+                prev = obj.value
+                obj.value = -1
+                heap_check_indexes(h)
+            end
+            assert(not h:pop())
+            assert(h:count() == 0)
+        end
+    end
+
+    test:ok(true, "no asserts")
+end
+
+local function test_min_heap_remove_top(test) -- remove_top() drops the minimum.
+    test:plan(1)
+
+    local h = heap.new(min_heap_cmp)
+    for i = 1, heap_size do
+        h:push(new_object(i))
+    end
+    for i = 1, heap_size do
+        assert(h:top().value == i) -- Tops must come in ascending order.
+        h:remove_top()
+    end
+    assert(h:count() == 0)
+
+    test:ok(true, "no asserts")
+end
+
+local function test_max_heap_remove_try(test) -- remove_try() of absent objects.
+    test:plan(1)
+
+    local h = heap.new(max_heap_cmp)
+    local obj = new_object(1)
+    assert(obj.index == nil) -- Never pushed, so there is no index at all.
+    h:remove_try(obj)
+    assert(h:count() == 0)
+
+    h:push(obj)
+    h:push(new_object(2))
+    assert(obj.index == 2) -- The bigger value took the top of the max heap.
+    h:remove(obj)
+    assert(obj.index == -1)
+    h:remove_try(obj) -- Already removed (index == -1), must change nothing.
+    assert(obj.index == -1)
+    assert(h:count() == 1)
+
+    test:ok(true, "no asserts")
+end
+
+test:plan(7) -- One planned subtest per test:test() call below.
+
+test:test('min_heap_basic', test_min_heap_basic)
+test:test('max_heap_basic', test_max_heap_basic)
+test:test('min_heap_update_top', test_min_heap_update_top)
+test:test('min heap update', test_min_heap_update)
+test:test('max heap delete', test_max_heap_delete)
+test:test('min heap remove top', test_min_heap_remove_top)
+test:test('max heap remove try', test_max_heap_remove_try)
+
+os.exit(test:check() and 0 or 1) -- Exit code 0 only when test:check() is truthy.
diff --git a/test/unit-tap/suite.ini b/test/unit-tap/suite.ini
new file mode 100644
index 0000000..f365b69
--- /dev/null
+++ b/test/unit-tap/suite.ini
@@ -0,0 +1,4 @@
+[default]
+core = app
+description = Unit tests TAP
+is_parallel = True
diff --git a/vshard/heap.lua b/vshard/heap.lua
new file mode 100644
index 0000000..78c600a
--- /dev/null
+++ b/vshard/heap.lua
@@ -0,0 +1,226 @@
+local math_floor = math.floor
+
+--
+-- Implementation of a typical algorithm of the binary heap.
+-- The heap is intrusive - it stores index of each element inside of it. It
+-- allows to update and delete elements in any place in the heap, not only top
+-- elements.
+--
+
+local function heap_parent_index(index) -- Slots are 1-based, the root is 1.
+    return math_floor(index / 2)
+end
+
+local function heap_left_child_index(index) -- The right child is this + 1.
+    return index * 2
+end
+
+-- Generate a new heap. is_left_above(l, r) defines the order: it returns true
+-- when l must be closer to the top than r. Elements get an 'index' field.
+--
+-- The implementation is aimed at as few index accesses as possible.
+-- Everything that could be is stored as upvalue variables instead of as indexes
+-- in a table. What couldn't be an upvalue and is used in a function more than
+-- once is saved on the stack.
+--
+local function heap_new(is_left_above)
+    -- Having it as an upvalue allows not to do 'self.data' lookup in each
+    -- function.
+    local data = {}
+    -- Saves #data calculation. In Lua it is not just reading a number.
+    local count = 0
+
+    local function heap_update_index_up(idx) -- Returns true if the value moved.
+        if idx == 1 then -- The root has no parent to compare with.
+            return false
+        end
+
+        local orig_idx = idx
+        local value = data[idx]
+        local pidx = heap_parent_index(idx)
+        local parent = data[pidx]
+        while is_left_above(value, parent) do
+            data[idx] = parent -- Pull the parent down into the freed slot.
+            parent.index = idx
+            idx = pidx
+            if idx == 1 then
+                break
+            end
+            pidx = heap_parent_index(idx)
+            parent = data[pidx]
+        end
+
+        if idx == orig_idx then
+            return false
+        end
+        data[idx] = value -- The moved value is written once, at its final slot.
+        value.index = idx
+        return true
+    end
+
+    local function heap_update_index_down(idx) -- Returns true if the value moved.
+        local left_idx = heap_left_child_index(idx)
+        if left_idx > count then -- No children, nowhere to sink.
+            return false
+        end
+
+        local orig_idx = idx
+        local left
+        local right
+        local right_idx = left_idx + 1 -- NOTE(review): reassigned in the loop.
+        local top
+        local top_idx
+        local value = data[idx]
+        repeat
+            right_idx = left_idx + 1
+            if right_idx > count then -- Only the left child exists.
+                top = data[left_idx]
+                if is_left_above(value, top) then
+                    break
+                end
+                top_idx = left_idx
+            else
+                left = data[left_idx]
+                right = data[right_idx]
+                if is_left_above(left, right) then -- Choose the child to lift.
+                    if is_left_above(value, left) then
+                        break
+                    end
+                    top_idx = left_idx
+                    top = left
+                else
+                    if is_left_above(value, right) then
+                        break
+                    end
+                    top_idx = right_idx
+                    top = right
+                end
+            end
+
+            data[idx] = top -- Lift the chosen child into the freed slot.
+            top.index = idx
+            idx = top_idx
+            left_idx = heap_left_child_index(idx)
+        until left_idx > count
+
+        if idx == orig_idx then
+            return false
+        end
+        data[idx] = value -- The moved value is written once, at its final slot.
+        value.index = idx
+        return true
+    end
+
+    local function heap_update_index(idx) -- Restore the order around slot idx.
+        if not heap_update_index_up(idx) then -- Sink only if it did not rise.
+            heap_update_index_down(idx)
+        end
+    end
+
+    local function heap_push(self, value) -- Append to the end, then lift it up.
+        count = count + 1
+        data[count] = value
+        value.index = count
+        heap_update_index_up(count)
+    end
+
+    local function heap_update_top(self) -- Call after the top's key was changed.
+        heap_update_index_down(1)
+    end
+
+    local function heap_update(self, value) -- Call after value's key was changed.
+        heap_update_index(value.index)
+    end
+
+    local function heap_remove_top(self) -- Like pop, but returns nothing.
+        if count == 0 then
+            return
+        end
+        data[1].index = -1 -- Index -1 marks an object as not in the heap.
+        if count == 1 then
+            data[1] = nil
+            count = 0
+            return
+        end
+        local value = data[count] -- The last element replaces the top.
+        data[count] = nil
+        data[1] = value
+        value.index = 1
+        count = count - 1
+        heap_update_index_down(1)
+    end
+
+    local function heap_remove(self, value) -- No check that value is in the heap.
+        local idx = value.index
+        value.index = -1
+        if idx == count then -- The last slot needs no reordering afterwards.
+            data[count] = nil
+            count = count - 1
+            return
+        end
+        value = data[count] -- The last element fills the hole.
+        data[idx] = value
+        data[count] = nil
+        value.index = idx
+        count = count - 1
+        heap_update_index(idx) -- The filler may need to go up or down.
+    end
+
+    local function heap_remove_try(self, value) -- Remove only if it has a slot.
+        local idx = value.index
+        if idx and idx > 0 then -- Skips both index == nil and index == -1.
+            heap_remove(self, value)
+        end
+    end
+
+    local function heap_pop(self) -- Remove and return the top, nil when empty.
+        if count == 0 then
+            return
+        end
+        -- Some duplication from remove_top, but allows to save a few
+        -- condition checks, index accesses, and a function call.
+        local res = data[1]
+        res.index = -1
+        if count == 1 then
+            data[1] = nil
+            count = 0
+            return res
+        end
+        local value = data[count]
+        data[count] = nil
+        data[1] = value
+        value.index = 1
+        count = count - 1
+        heap_update_index_down(1)
+        return res
+    end
+
+    local function heap_top(self) -- Top element without removal, nil when empty.
+        return data[1]
+    end
+
+    local function heap_count(self) -- Number of elements currently stored.
+        return count
+    end
+
+    return setmetatable({
+        -- Expose the data. For testing.
+        data = data,
+    }, {
+        __index = {
+            push = heap_push,
+            update_top = heap_update_top,
+            remove_top = heap_remove_top,
+            pop = heap_pop,
+            update = heap_update,
+            remove = heap_remove,
+            remove_try = heap_remove_try,
+            top = heap_top,
+            count = heap_count,
+        }
+    })
+end
+
+return {
+    new = heap_new, -- new(is_left_above) returns a heap with its own storage.
+}
-- 
2.24.3 (Apple Git-128)



More information about the Tarantool-patches mailing list