[PATCH 1/2] Unify tarantoolctl cat/play xlog filtering code

Alexander Turenko alexander.turenko at tarantool.org
Wed Mar 20 21:41:13 MSK 2019


Exposed this unified code (filter_xlog() function) and wrote a unit
test.

Allow to run app-tap/tarantoolctl.test.lua w/o test-run.

Needed for #3827.
---
 extra/dist/tarantoolctl.in         | 124 +++++++++++++--------
 test/app-tap/tarantoolctl.test.lua | 172 ++++++++++++++++++++++++++++-
 2 files changed, 244 insertions(+), 52 deletions(-)

diff --git a/extra/dist/tarantoolctl.in b/extra/dist/tarantoolctl.in
index 47fcf895f..014acee6c 100755
--- a/extra/dist/tarantoolctl.in
+++ b/extra/dist/tarantoolctl.in
@@ -828,35 +828,59 @@ local function eval()
     return 0
 end
 
-local function cat()
-    local options = keyword_arguments
-    local from, to, spaces = options.from, options.to, options.space
-    local show_system, cat_format = options['show-system'], options.format
-    local replicas = options.replica
+-- Call a callback @a cb for each record that matches all of the
+-- following conditions:
+--
+-- 1. opts.from <= record.HEADER.lsn < opts.to.
+-- 2. If @a opts.space and @a opts['show-system'] are not set,
+--    record.BODY.space_id should be nil or >= 512.
+-- 3. If @a opts.space is set, record.BODY.space_id should not be
+--    nil and should be in @a opts.space list.
+-- 4. If @a opts.replica is set, record.HEADER.replica_id should
+--    not be nil and should be in @a opts.replica list.
+--
+-- Once a record with LSN >= @a opts.to is found the loop stops
+-- (even if a file contains next records with smaller LSN from
+-- another replicas). Note however that this function is called
+-- once for each xlog / snap file, so even when it stops on LSN
+-- >= @a opts.to on a current file a next file will be processed.
+local function filter_xlog(gen, param, state, opts, cb)
+    local spaces = opts.spaces
+    local from, to, spaces = opts.from, opts.to, opts.space
+    local show_system, replicas = opts['show-system'], opts.replica
+
+    for lsn, record in gen, param, state do
+        local sid = record.BODY and record.BODY.space_id
+        local rid = record.HEADER.replica_id
+        if lsn >= to then
+            -- stop, as we've finished reading tuple with lsn == to
+            -- and the next lsn's will be bigger
+            break
+        elseif (lsn < from) or
+           (not spaces and sid and sid < 512 and not show_system) or
+           (spaces and (sid == nil or not find_in_list(sid, spaces))) or
+           (replicas and not find_in_list(rid, replicas)) then
+            -- pass this tuple
+        else
+            cb(record)
+        end
+    end
+end
 
-    local format_cb   = cat_formats[cat_format]
-    local is_printed  = false
+local function cat()
+    local opts = keyword_arguments
+    local cat_format = opts.format
+    local format_cb = cat_formats[cat_format]
+    local is_printed = false
     for id, file in ipairs(positional_arguments) do
         log.error("Processing file '%s'", file)
-        for lsn, record in xlog.pairs(file) do
-            local sid = record.BODY and record.BODY.space_id
-            local rid = record.HEADER.replica_id
-            if lsn >= to then
-                -- stop, as we've finished reading tuple with lsn == to
-                -- and the next lsn's will be bigger
-                break
-            elseif (lsn < from) or
-               (not spaces and sid and sid < 512 and not show_system) or
-               (spaces and (sid == nil or not find_in_list(sid, spaces))) or
-               (replicas and not find_in_list(rid, replicas)) then
-                -- pass this tuple
-            else
-                is_printed = true
-                format_cb(record)
-                io.stdout:flush()
-            end
-        end
-        if options.format == 'yaml' and is_printed then
+        local gen, param, state = xlog.pairs(file)
+        filter_xlog(gen, param, state, opts, function(record)
+            is_printed = true
+            format_cb(record)
+            io.stdout:flush()
+        end)
+        if opts.format == 'yaml' and is_printed then
             is_printed = false
             print('...\n')
         end
@@ -864,11 +888,8 @@ local function cat()
 end
 
 local function play()
-    local options = keyword_arguments
-    local from, to, spaces = options.from, options.to, options.space
-    local show_system = options['show-system']
+    local opts = keyword_arguments
     local uri = table.remove(positional_arguments, 1)
-    local replicas = options.replica
 
     if uri == nil then
         error("Empty URI is provided")
@@ -879,19 +900,10 @@ local function play()
     end
     for id, file in ipairs(positional_arguments) do
         log.info(("Processing file '%s'"):format(file))
-        for lsn, record in xlog.pairs(file) do
+        local gen, param, state = xlog.pairs(file)
+        filter_xlog(gen, param, state, opts, function(record)
             local sid = record.BODY and record.BODY.space_id
-            local rid = record.HEADER.replica_id
-            if lsn >= to then
-                -- stop, as we've finished reading tuple with lsn == to
-                -- and the next lsn's will be bigger
-                break
-            elseif (lsn < from) or sid == nil or
-               (not spaces and sid < 512 and not show_system) or
-               (spaces and not find_in_list(sid, spaces)) or
-               (replicas and not find_in_list(rid, replicas)) then
-                -- pass this tuple
-            else
+            if sid ~= nil then
                 local args, so = {}, remote.space[sid]
                 if so == nil then
                     error(("No space #%s, stopping"):format(sid))
@@ -902,7 +914,7 @@ local function play()
                 table.insert(args, record.BODY.operations)
                 so[record.HEADER.type:lower()](unpack(args))
             end
-        end
+        end)
     end
     remote:close()
 end
@@ -1289,8 +1301,9 @@ usage = function(command, verbose)
     usage_commands(commands, verbose)
     os.exit(1)
 end
+
 -- parse parameters and put the result into positional/keyword_arguments
-do
+local function populate_arguments()
     local function keyword_arguments_populate(ka)
         ka                = ka                or {}
         ka.from           = ka.from           or 0
@@ -1336,11 +1349,24 @@ do
     keyword_arguments = keyword_arguments_populate(parameters)
 end
 
-local cmd_pair = commands[command_name]
-if #arg < 2 then
-    log.error("Not enough arguments for '%s' command\n", command_name)
-    usage(command_name)
+local function main()
+    populate_arguments()
+    local cmd_pair = commands[command_name]
+    if #arg < 2 then
+        log.error("Not enough arguments for '%s' command\n", command_name)
+        usage(command_name)
+    end
+    cmd_pair.process(cmd_pair.func)
+end
+
+if rawget(_G, 'TARANTOOLCTL_UNIT_TEST') then
+    return {
+        internal = {
+            filter_xlog = filter_xlog,
+        }
+    }
 end
-cmd_pair.process(cmd_pair.func)
+
+main()
 
 -- vim: syntax=lua
diff --git a/test/app-tap/tarantoolctl.test.lua b/test/app-tap/tarantoolctl.test.lua
index db046e03f..4910b94f3 100755
--- a/test/app-tap/tarantoolctl.test.lua
+++ b/test/app-tap/tarantoolctl.test.lua
@@ -7,7 +7,11 @@ local uuid     = require('uuid')
 local yaml     = require('yaml')
 local errno    = require('errno')
 local fiber    = require('fiber')
-local test_run = require('test_run').new()
+local ok, test_run = pcall(require, 'test_run')
+test_run = ok and test_run.new() or nil
+
+local BUILDDIR = os.getenv('BUILDDIR') or '.'
+local TARANTOOLCTL_PATH = ('%s/extra/dist/tarantoolctl'):format(BUILDDIR)
 
 local function recursive_rmdir(path)
     path = fio.abspath(path)
@@ -156,8 +160,19 @@ local function check_ok(test, dir, cmd, args, e_res, e_stdout, e_stderr)
     end
 end
 
+local function merge(...)
+    local res = {}
+    for i = 1, select('#', ...) do
+        local t = select(i, ...)
+        for k, v in pairs(t) do
+            res[k] = v
+        end
+    end
+    return res
+end
+
 local test = tap.test('tarantoolctl')
-test:plan(7)
+test:plan(8)
 
 -- basic start/stop test
 -- must be stopped afterwards
@@ -402,7 +417,9 @@ do
 end
 
 -- check play
-do
+if test_run == nil then
+    test:skip('skip \'tarantoolctl play\' test (test-run is required)')
+else
     local dir = fio.tempdir()
 
     local filler_code = [[
@@ -468,4 +485,153 @@ do
     end
 end
 
+test:test('filter_xlog', function(test)
+    local xlog_data = {
+        -- [1] =
+        {
+            HEADER = {lsn = 130, type = 'INSERT', timestamp = 1551987542.8702},
+            BODY = {space_id = 515, tuple = {1}},
+        },
+        -- [2] =
+        {
+            HEADER = {lsn = 131, type = 'INSERT', timestamp = 1551987542.8702},
+            BODY = {space_id = 515, tuple = {2}},
+        },
+        -- [3] =
+        {
+            HEADER = {lsn = 132, type = 'INSERT', timestamp = 1551987542.8702},
+            BODY = {space_id = 515, tuple = {3}},
+        },
+        -- [4] =
+        {
+            HEADER = {lsn = 133, type = 'INSERT', timestamp = 1551987542.8702},
+            BODY = {space_id = 516, tuple = {'a'}},
+        },
+        -- [5] =
+        {
+            HEADER = {lsn = 134, type = 'INSERT', timestamp = 1551987542.8702},
+            BODY = {space_id = 517, tuple = {'a'}},
+        },
+        -- [6] =
+        {
+            HEADER = {lsn = 135, type = 'INSERT', timestamp = 1551987542.8702},
+            BODY = {space_id = 518, tuple = {'a'}},
+        },
+        -- [7] =
+        {
+            HEADER = {lsn = 136, type = 'INSERT', timestamp = 1551987542.8702,
+                      replica_id = 1},
+            BODY = {space_id = 515, tuple = {4}},
+        },
+        -- [8] =
+        {
+            HEADER = {lsn = 137, type = 'INSERT', timestamp = 1551987542.8702,
+                      replica_id = 1},
+            BODY = {space_id = 515, tuple = {5}},
+        },
+        -- [9] =
+        {
+            HEADER = {lsn = 100, type = 'INSERT', timestamp = 1551987542.8702,
+                      replica_id = 2},
+            BODY = {space_id = 515, tuple = {6}},
+        },
+        -- [10] =
+        {
+            HEADER = {lsn = 110, type = 'INSERT', timestamp = 1551987542.8702,
+                      replica_id = 3},
+            BODY = {space_id = 515, tuple = {7}},
+        },
+        -- [11] =
+        {
+            HEADER = {lsn = 138, type = 'INSERT', timestamp = 1551987542.8702,
+                      replica_id = 1},
+            BODY = {space_id = 515, tuple = {8}},
+        },
+    }
+
+    local default_opts = {
+        from = 0,
+        to = -1ULL,
+        ['show-system'] = false,
+    }
+
+    local x = xlog_data -- alias
+    local cases = {
+        {
+            'w/o args',
+            opts = default_opts,
+            exp_result = x,
+        },
+        {
+            'from and to',
+            opts = merge(default_opts, {from = 131, to = 132}),
+            exp_result = {x[2]},
+        },
+        {
+            'space id',
+            opts = merge(default_opts, {space = {516}}),
+            exp_result = {x[4]},
+        },
+        {
+            'space ids',
+            opts = merge(default_opts, {space = {516, 517}}),
+            exp_result = {x[4], x[5]},
+        },
+        {
+            'replica id',
+            opts = merge(default_opts, {replica = {1}}),
+            exp_result = {x[7], x[8], x[11]},
+        },
+        {
+            'replica ids',
+            opts = merge(default_opts, {replica = {1, 2}}),
+            exp_result = {x[7], x[8], x[9], x[11]},
+        },
+        {
+            'to w/o replica id',
+            opts = merge(default_opts, {to = 120}),
+            exp_result = {},
+        },
+        {
+            'to and replica id',
+            opts = merge(default_opts, {to = 137, replica = {1}}),
+            exp_result = {x[7]},
+        },
+        {
+            'to and replica ids',
+            opts = merge(default_opts, {to = 137, replica = {1, 2}}),
+            exp_result = {x[7]},
+        },
+    }
+    test:plan(#cases)
+
+    rawset(_G, 'TARANTOOLCTL_UNIT_TEST', true)
+    local tarantoolctl = dofile(TARANTOOLCTL_PATH)
+
+    -- Like xlog.pairs().
+    local function gen(param, lsn)
+        local row = param.data[param.idx]
+        if row == nil then
+            return
+        end
+        param.idx = param.idx + 1
+        return row.HEADER.lsn, row
+    end
+
+    local function xlog_data_pairs(data)
+        return gen, {data = data, idx = 1}, 0
+    end
+
+    for _, case in ipairs(cases) do
+        local gen, param, state = xlog_data_pairs(xlog_data)
+        local res = {}
+        tarantoolctl.internal.filter_xlog(gen, param, state, case.opts,
+            function(record)
+                table.insert(res, record)
+            end
+        )
+        test:is_deeply(res, case.exp_result, case[1])
+    end
+end)
+
 os.exit(test:check() == true and 0 or -1)
-- 
2.20.1




More information about the Tarantool-patches mailing list