From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Alexander Turenko Subject: [PATCH 1/2] Unify tarantoolctl cat/play xlog filtering code Date: Wed, 20 Mar 2019 21:41:13 +0300 Message-Id: <33ed6316b37f7f97e4b8536415f39588cb94087a.1553104456.git.alexander.turenko@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit To: Vladimir Davydov Cc: Alexander Turenko , tarantool-patches@freelists.org List-ID: Exposed this unified code (filter_xlog() function) and wrote a unit test. Allow to run app-tap/tarantoolctl.test.lua w/o test-run. Needed for #3827. --- extra/dist/tarantoolctl.in | 124 +++++++++++++-------- test/app-tap/tarantoolctl.test.lua | 172 ++++++++++++++++++++++++++++- 2 files changed, 244 insertions(+), 52 deletions(-) diff --git a/extra/dist/tarantoolctl.in b/extra/dist/tarantoolctl.in index 47fcf895f..014acee6c 100755 --- a/extra/dist/tarantoolctl.in +++ b/extra/dist/tarantoolctl.in @@ -828,35 +828,59 @@ local function eval() return 0 end -local function cat() - local options = keyword_arguments - local from, to, spaces = options.from, options.to, options.space - local show_system, cat_format = options['show-system'], options.format - local replicas = options.replica +-- Call a callback @a cb for each record that matches all of the +-- following conditions: +-- +-- 1. opts.from <= record.HEADER.lsn < opts.to. +-- 2. If @a opts.space and @a opts['show-system'] are not set, +-- record.BODY.space_id should be nil or >= 512. +-- 3. If @a opts.space is set, record.BODY.space_id should not be +-- nil and should be in @a opts.space list. +-- 4. If @a opts.replica is set, record.HEADER.replica_id should +-- not be nil and should be in @a opts.replica list. +-- +-- Once a record with LSN >= @a opts.to is found the loop stops +-- (even if a file contains next records with smaller LSN from +-- another replicas). Note however that this function is called +-- once for each xlog / snap file, so even when it stops on LSN +-- >= @a opts.to on a current file a next file will be processed. +local function filter_xlog(gen, param, state, opts, cb) + local spaces = opts.spaces + local from, to, spaces = opts.from, opts.to, opts.space + local show_system, replicas = opts['show-system'], opts.replica + + for lsn, record in gen, param, state do + local sid = record.BODY and record.BODY.space_id + local rid = record.HEADER.replica_id + if lsn >= to then + -- stop, as we've finished reading tuple with lsn == to + -- and the next lsn's will be bigger + break + elseif (lsn < from) or + (not spaces and sid and sid < 512 and not show_system) or + (spaces and (sid == nil or not find_in_list(sid, spaces))) or + (replicas and not find_in_list(rid, replicas)) then + -- pass this tuple + else + cb(record) + end + end +end - local format_cb = cat_formats[cat_format] - local is_printed = false +local function cat() + local opts = keyword_arguments + local cat_format = opts.format + local format_cb = cat_formats[cat_format] + local is_printed = false for id, file in ipairs(positional_arguments) do log.error("Processing file '%s'", file) - for lsn, record in xlog.pairs(file) do - local sid = record.BODY and record.BODY.space_id - local rid = record.HEADER.replica_id - if lsn >= to then - -- stop, as we've finished reading tuple with lsn == to - -- and the next lsn's will be bigger - break - elseif (lsn < from) or - (not spaces and sid and sid < 512 and not show_system) or - (spaces and (sid == nil or not find_in_list(sid, spaces))) or - (replicas and not find_in_list(rid, replicas)) then - -- pass this tuple - else - is_printed = true - format_cb(record) - io.stdout:flush() - end - end - if options.format == 'yaml' and is_printed then + local gen, param, state = xlog.pairs(file) + filter_xlog(gen, param, state, opts, function(record) + is_printed = true + format_cb(record) + io.stdout:flush() + end) + if opts.format == 'yaml' and is_printed then is_printed = false print('...\n') end @@ -864,11 +888,8 @@ local function cat() end local function play() - local options = keyword_arguments - local from, to, spaces = options.from, options.to, options.space - local show_system = options['show-system'] + local opts = keyword_arguments local uri = table.remove(positional_arguments, 1) - local replicas = options.replica if uri == nil then error("Empty URI is provided") @@ -879,19 +900,10 @@ local function play() end for id, file in ipairs(positional_arguments) do log.info(("Processing file '%s'"):format(file)) - for lsn, record in xlog.pairs(file) do + local gen, param, state = xlog.pairs(file) + filter_xlog(gen, param, state, opts, function(record) local sid = record.BODY and record.BODY.space_id - local rid = record.HEADER.replica_id - if lsn >= to then - -- stop, as we've finished reading tuple with lsn == to - -- and the next lsn's will be bigger - break - elseif (lsn < from) or sid == nil or - (not spaces and sid < 512 and not show_system) or - (spaces and not find_in_list(sid, spaces)) or - (replicas and not find_in_list(rid, replicas)) then - -- pass this tuple - else + if sid ~= nil then local args, so = {}, remote.space[sid] if so == nil then error(("No space #%s, stopping"):format(sid)) @@ -902,7 +914,7 @@ local function play() table.insert(args, record.BODY.operations) so[record.HEADER.type:lower()](unpack(args)) end - end + end) end remote:close() end @@ -1289,8 +1301,9 @@ usage = function(command, verbose) usage_commands(commands, verbose) os.exit(1) end + -- parse parameters and put the result into positional/keyword_arguments -do +local function populate_arguments() local function keyword_arguments_populate(ka) ka = ka or {} ka.from = ka.from or 0 @@ -1336,11 +1349,24 @@ do keyword_arguments = keyword_arguments_populate(parameters) end -local cmd_pair = commands[command_name] -if #arg < 2 then - log.error("Not enough arguments for '%s' command\n", command_name) - usage(command_name) +local function main() + populate_arguments() + local cmd_pair = commands[command_name] + if #arg < 2 then + log.error("Not enough arguments for '%s' command\n", command_name) + usage(command_name) + end + cmd_pair.process(cmd_pair.func) +end + +if rawget(_G, 'TARANTOOLCTL_UNIT_TEST') then + return { + internal = { + filter_xlog = filter_xlog, + } + } end -cmd_pair.process(cmd_pair.func) + +main() -- vim: syntax=lua diff --git a/test/app-tap/tarantoolctl.test.lua b/test/app-tap/tarantoolctl.test.lua index db046e03f..4910b94f3 100755 --- a/test/app-tap/tarantoolctl.test.lua +++ b/test/app-tap/tarantoolctl.test.lua @@ -7,7 +7,11 @@ local uuid = require('uuid') local yaml = require('yaml') local errno = require('errno') local fiber = require('fiber') -local test_run = require('test_run').new() +local ok, test_run = pcall(require, 'test_run') +test_run = ok and test_run.new() or nil + +local BUILDDIR = os.getenv('BUILDDIR') or '.' +local TARANTOOLCTL_PATH = ('%s/extra/dist/tarantoolctl'):format(BUILDDIR) local function recursive_rmdir(path) path = fio.abspath(path) @@ -156,8 +160,19 @@ local function check_ok(test, dir, cmd, args, e_res, e_stdout, e_stderr) end end +local function merge(...) + local res = {} + for i = 1, select('#', ...) do + local t = select(i, ...) + for k, v in pairs(t) do + res[k] = v + end + end + return res +end + local test = tap.test('tarantoolctl') -test:plan(7) +test:plan(8) -- basic start/stop test -- must be stopped afterwards @@ -402,7 +417,9 @@ do end -- check play -do +if test_run == nil then + test:skip('skip \'tarantoolctl play\' test (test-run is required)') +else local dir = fio.tempdir() local filler_code = [[ @@ -468,4 +485,153 @@ do end end +test:test('filter_xlog', function(test) + local xlog_data = { + -- [1] = + { + HEADER = {lsn = 130, type = 'INSERT', timestamp = 1551987542.8702}, + BODY = {space_id = 515, tuple = {1}}, + }, + -- [2] = + { + HEADER = {lsn = 131, type = 'INSERT', timestamp = 1551987542.8702}, + BODY = {space_id = 515, tuple = {2}}, + }, + -- [3] = + { + HEADER = {lsn = 132, type = 'INSERT', timestamp = 1551987542.8702}, + BODY = {space_id = 515, tuple = {3}}, + }, + -- [4] = + { + HEADER = {lsn = 133, type = 'INSERT', timestamp = 1551987542.8702}, + BODY = {space_id = 516, tuple = {'a'}}, + }, + -- [5] = + { + HEADER = {lsn = 134, type = 'INSERT', timestamp = 1551987542.8702}, + BODY = {space_id = 517, tuple = {'a'}}, + }, + -- [6] = + { + HEADER = {lsn = 135, type = 'INSERT', timestamp = 1551987542.8702}, + BODY = {space_id = 518, tuple = {'a'}}, + }, + -- [7] = + { + HEADER = {lsn = 136, type = 'INSERT', timestamp = 1551987542.8702, + replica_id = 1}, + BODY = {space_id = 515, tuple = {4}}, + }, + -- [8] = + { + HEADER = {lsn = 137, type = 'INSERT', timestamp = 1551987542.8702, + replica_id = 1}, + BODY = {space_id = 515, tuple = {5}}, + }, + -- [9] = + { + HEADER = {lsn = 100, type = 'INSERT', timestamp = 1551987542.8702, + replica_id = 2}, + BODY = {space_id = 515, tuple = {6}}, + }, + -- [10] = + { + HEADER = {lsn = 110, type = 'INSERT', timestamp = 1551987542.8702, + replica_id = 3}, + BODY = {space_id = 515, tuple = {7}}, + }, + -- [11] = + { + HEADER = {lsn = 138, type = 'INSERT', timestamp = 1551987542.8702, + replica_id = 1}, + BODY = {space_id = 515, tuple = {8}}, + }, + } + + local default_opts = { + from = 0, + to = -1ULL, + ['show-system'] = false, + } + + local x = xlog_data -- alias + local cases = { + { + 'w/o args', + opts = default_opts, + exp_result = x, + }, + { + 'from and to', + opts = merge(default_opts, {from = 131, to = 132}), + exp_result = {x[2]}, + }, + { + 'space id', + opts = merge(default_opts, {space = {516}}), + exp_result = {x[4]}, + }, + { + 'space ids', + opts = merge(default_opts, {space = {516, 517}}), + exp_result = {x[4], x[5]}, + }, + { + 'replica id', + opts = merge(default_opts, {replica = {1}}), + exp_result = {x[7], x[8], x[11]}, + }, + { + 'replica ids', + opts = merge(default_opts, {replica = {1, 2}}), + exp_result = {x[7], x[8], x[9], x[11]}, + }, + { + 'to w/o replica id', + opts = merge(default_opts, {to = 120}), + exp_result = {}, + }, + { + 'to and replica id', + opts = merge(default_opts, {to = 137, replica = {1}}), + exp_result = {x[7]}, + }, + { + 'to and replica ids', + opts = merge(default_opts, {to = 137, replica = {1, 2}}), + exp_result = {x[7]}, + }, + } + test:plan(#cases) + + rawset(_G, 'TARANTOOLCTL_UNIT_TEST', true) + local tarantoolctl = dofile(TARANTOOLCTL_PATH) + + -- Like xlog.pairs(). + local function gen(param, lsn) + local row = param.data[param.idx] + if row == nil then + return + end + param.idx = param.idx + 1 + return row.HEADER.lsn, row + end + + local function xlog_data_pairs(data) + return gen, {data = data, idx = 1}, 0 + end + + for _, case in ipairs(cases) do + local gen, param, state = xlog_data_pairs(xlog_data) + local res = {} + tarantoolctl.internal.filter_xlog(gen, param, state, case.opts, + function(record) + table.insert(res, record) + end + ) + test:is_deeply(res, case.exp_result, case[1]) + end +end) + os.exit(test:check() == true and 0 or -1) -- 2.20.1