* [PATCH 1/2] Unify tarantoolctl cat/play xlog filtering code
2019-03-20 18:41 [PATCH 0/2] Fix tarantoolctl cat/play premature stop Alexander Turenko
@ 2019-03-20 18:41 ` Alexander Turenko
2019-03-20 18:41 ` [PATCH 2/2] Fix tarantoolctl cat/play premature stop with --to Alexander Turenko
2019-03-26 18:11 ` [PATCH 0/2] Fix tarantoolctl cat/play premature stop Vladimir Davydov
2 siblings, 0 replies; 4+ messages in thread
From: Alexander Turenko @ 2019-03-20 18:41 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Expose the unified code (the filter_xlog() function) and add a unit
test for it.
Allow running app-tap/tarantoolctl.test.lua without test-run.
Needed for #3827.
---
extra/dist/tarantoolctl.in | 124 +++++++++++++--------
test/app-tap/tarantoolctl.test.lua | 172 ++++++++++++++++++++++++++++-
2 files changed, 244 insertions(+), 52 deletions(-)
diff --git a/extra/dist/tarantoolctl.in b/extra/dist/tarantoolctl.in
index 47fcf895f..014acee6c 100755
--- a/extra/dist/tarantoolctl.in
+++ b/extra/dist/tarantoolctl.in
@@ -828,35 +828,59 @@ local function eval()
return 0
end
-local function cat()
- local options = keyword_arguments
- local from, to, spaces = options.from, options.to, options.space
- local show_system, cat_format = options['show-system'], options.format
- local replicas = options.replica
+-- Call a callback @a cb for each record that matches all of the
+-- following conditions:
+--
+-- 1. opts.from <= record.HEADER.lsn < opts.to.
+-- 2. If @a opts.space and @a opts['show-system'] are not set,
+-- record.BODY.space_id should be nil or >= 512.
+-- 3. If @a opts.space is set, record.BODY.space_id should not be
+-- nil and should be in @a opts.space list.
+-- 4. If @a opts.replica is set, record.HEADER.replica_id should
+-- not be nil and should be in @a opts.replica list.
+--
+-- Once a record with LSN >= @a opts.to is found the loop stops
+-- (even if a file contains next records with smaller LSN from
+-- another replicas). Note however that this function is called
+-- once for each xlog / snap file, so even when it stops on LSN
+-- >= @a opts.to on a current file a next file will be processed.
+local function filter_xlog(gen, param, state, opts, cb)
+ local spaces = opts.spaces
+ local from, to, spaces = opts.from, opts.to, opts.space
+ local show_system, replicas = opts['show-system'], opts.replica
+
+ for lsn, record in gen, param, state do
+ local sid = record.BODY and record.BODY.space_id
+ local rid = record.HEADER.replica_id
+ if lsn >= to then
+ -- stop, as we've finished reading tuple with lsn == to
+ -- and the next lsn's will be bigger
+ break
+ elseif (lsn < from) or
+ (not spaces and sid and sid < 512 and not show_system) or
+ (spaces and (sid == nil or not find_in_list(sid, spaces))) or
+ (replicas and not find_in_list(rid, replicas)) then
+ -- pass this tuple
+ else
+ cb(record)
+ end
+ end
+end
- local format_cb = cat_formats[cat_format]
- local is_printed = false
+local function cat()
+ local opts = keyword_arguments
+ local cat_format = opts.format
+ local format_cb = cat_formats[cat_format]
+ local is_printed = false
for id, file in ipairs(positional_arguments) do
log.error("Processing file '%s'", file)
- for lsn, record in xlog.pairs(file) do
- local sid = record.BODY and record.BODY.space_id
- local rid = record.HEADER.replica_id
- if lsn >= to then
- -- stop, as we've finished reading tuple with lsn == to
- -- and the next lsn's will be bigger
- break
- elseif (lsn < from) or
- (not spaces and sid and sid < 512 and not show_system) or
- (spaces and (sid == nil or not find_in_list(sid, spaces))) or
- (replicas and not find_in_list(rid, replicas)) then
- -- pass this tuple
- else
- is_printed = true
- format_cb(record)
- io.stdout:flush()
- end
- end
- if options.format == 'yaml' and is_printed then
+ local gen, param, state = xlog.pairs(file)
+ filter_xlog(gen, param, state, opts, function(record)
+ is_printed = true
+ format_cb(record)
+ io.stdout:flush()
+ end)
+ if opts.format == 'yaml' and is_printed then
is_printed = false
print('...\n')
end
@@ -864,11 +888,8 @@ local function cat()
end
local function play()
- local options = keyword_arguments
- local from, to, spaces = options.from, options.to, options.space
- local show_system = options['show-system']
+ local opts = keyword_arguments
local uri = table.remove(positional_arguments, 1)
- local replicas = options.replica
if uri == nil then
error("Empty URI is provided")
@@ -879,19 +900,10 @@ local function play()
end
for id, file in ipairs(positional_arguments) do
log.info(("Processing file '%s'"):format(file))
- for lsn, record in xlog.pairs(file) do
+ local gen, param, state = xlog.pairs(file)
+ filter_xlog(gen, param, state, opts, function(record)
local sid = record.BODY and record.BODY.space_id
- local rid = record.HEADER.replica_id
- if lsn >= to then
- -- stop, as we've finished reading tuple with lsn == to
- -- and the next lsn's will be bigger
- break
- elseif (lsn < from) or sid == nil or
- (not spaces and sid < 512 and not show_system) or
- (spaces and not find_in_list(sid, spaces)) or
- (replicas and not find_in_list(rid, replicas)) then
- -- pass this tuple
- else
+ if sid ~= nil then
local args, so = {}, remote.space[sid]
if so == nil then
error(("No space #%s, stopping"):format(sid))
@@ -902,7 +914,7 @@ local function play()
table.insert(args, record.BODY.operations)
so[record.HEADER.type:lower()](unpack(args))
end
- end
+ end)
end
remote:close()
end
@@ -1289,8 +1301,9 @@ usage = function(command, verbose)
usage_commands(commands, verbose)
os.exit(1)
end
+
-- parse parameters and put the result into positional/keyword_arguments
-do
+local function populate_arguments()
local function keyword_arguments_populate(ka)
ka = ka or {}
ka.from = ka.from or 0
@@ -1336,11 +1349,24 @@ do
keyword_arguments = keyword_arguments_populate(parameters)
end
-local cmd_pair = commands[command_name]
-if #arg < 2 then
- log.error("Not enough arguments for '%s' command\n", command_name)
- usage(command_name)
+local function main()
+ populate_arguments()
+ local cmd_pair = commands[command_name]
+ if #arg < 2 then
+ log.error("Not enough arguments for '%s' command\n", command_name)
+ usage(command_name)
+ end
+ cmd_pair.process(cmd_pair.func)
+end
+
+if rawget(_G, 'TARANTOOLCTL_UNIT_TEST') then
+ return {
+ internal = {
+ filter_xlog = filter_xlog,
+ }
+ }
end
-cmd_pair.process(cmd_pair.func)
+
+main()
-- vim: syntax=lua
diff --git a/test/app-tap/tarantoolctl.test.lua b/test/app-tap/tarantoolctl.test.lua
index db046e03f..4910b94f3 100755
--- a/test/app-tap/tarantoolctl.test.lua
+++ b/test/app-tap/tarantoolctl.test.lua
@@ -7,7 +7,11 @@ local uuid = require('uuid')
local yaml = require('yaml')
local errno = require('errno')
local fiber = require('fiber')
-local test_run = require('test_run').new()
+local ok, test_run = pcall(require, 'test_run')
+test_run = ok and test_run.new() or nil
+
+local BUILDDIR = os.getenv('BUILDDIR') or '.'
+local TARANTOOLCTL_PATH = ('%s/extra/dist/tarantoolctl'):format(BUILDDIR)
local function recursive_rmdir(path)
path = fio.abspath(path)
@@ -156,8 +160,19 @@ local function check_ok(test, dir, cmd, args, e_res, e_stdout, e_stderr)
end
end
+local function merge(...)
+ local res = {}
+ for i = 1, select('#', ...) do
+ local t = select(i, ...)
+ for k, v in pairs(t) do
+ res[k] = v
+ end
+ end
+ return res
+end
+
local test = tap.test('tarantoolctl')
-test:plan(7)
+test:plan(8)
-- basic start/stop test
-- must be stopped afterwards
@@ -402,7 +417,9 @@ do
end
-- check play
-do
+if test_run == nil then
+ test:skip('skip \'tarantoolctl play\' test (test-run is required)')
+else
local dir = fio.tempdir()
local filler_code = [[
@@ -468,4 +485,153 @@ do
end
end
+test:test('filter_xlog', function(test)
+ local xlog_data = {
+ -- [1] =
+ {
+ HEADER = {lsn = 130, type = 'INSERT', timestamp = 1551987542.8702},
+ BODY = {space_id = 515, tuple = {1}},
+ },
+ -- [2] =
+ {
+ HEADER = {lsn = 131, type = 'INSERT', timestamp = 1551987542.8702},
+ BODY = {space_id = 515, tuple = {2}},
+ },
+ -- [3] =
+ {
+ HEADER = {lsn = 132, type = 'INSERT', timestamp = 1551987542.8702},
+ BODY = {space_id = 515, tuple = {3}},
+ },
+ -- [4] =
+ {
+ HEADER = {lsn = 133, type = 'INSERT', timestamp = 1551987542.8702},
+ BODY = {space_id = 516, tuple = {'a'}},
+ },
+ -- [5] =
+ {
+ HEADER = {lsn = 134, type = 'INSERT', timestamp = 1551987542.8702},
+ BODY = {space_id = 517, tuple = {'a'}},
+ },
+ -- [6] =
+ {
+ HEADER = {lsn = 135, type = 'INSERT', timestamp = 1551987542.8702},
+ BODY = {space_id = 518, tuple = {'a'}},
+ },
+ -- [7] =
+ {
+ HEADER = {lsn = 136, type = 'INSERT', timestamp = 1551987542.8702,
+ replica_id = 1},
+ BODY = {space_id = 515, tuple = {4}},
+ },
+ -- [8] =
+ {
+ HEADER = {lsn = 137, type = 'INSERT', timestamp = 1551987542.8702,
+ replica_id = 1},
+ BODY = {space_id = 515, tuple = {5}},
+ },
+ -- [9] =
+ {
+ HEADER = {lsn = 100, type = 'INSERT', timestamp = 1551987542.8702,
+ replica_id = 2},
+ BODY = {space_id = 515, tuple = {6}},
+ },
+ -- [10] =
+ {
+ HEADER = {lsn = 110, type = 'INSERT', timestamp = 1551987542.8702,
+ replica_id = 3},
+ BODY = {space_id = 515, tuple = {7}},
+ },
+ -- [11] =
+ {
+ HEADER = {lsn = 138, type = 'INSERT', timestamp = 1551987542.8702,
+ replica_id = 1},
+ BODY = {space_id = 515, tuple = {8}},
+ },
+ }
+
+ local default_opts = {
+ from = 0,
+ to = -1ULL,
+ ['show-system'] = false,
+ }
+
+ local x = xlog_data -- alias
+ local cases = {
+ {
+ 'w/o args',
+ opts = default_opts,
+ exp_result = x,
+ },
+ {
+ 'from and to',
+ opts = merge(default_opts, {from = 131, to = 132}),
+ exp_result = {x[2]},
+ },
+ {
+ 'space id',
+ opts = merge(default_opts, {space = {516}}),
+ exp_result = {x[4]},
+ },
+ {
+ 'space ids',
+ opts = merge(default_opts, {space = {516, 517}}),
+ exp_result = {x[4], x[5]},
+ },
+ {
+ 'replica id',
+ opts = merge(default_opts, {replica = {1}}),
+ exp_result = {x[7], x[8], x[11]},
+ },
+ {
+ 'replica ids',
+ opts = merge(default_opts, {replica = {1, 2}}),
+ exp_result = {x[7], x[8], x[9], x[11]},
+ },
+ {
+ 'to w/o replica id',
+ opts = merge(default_opts, {to = 120}),
+ exp_result = {},
+ },
+ {
+ 'to and replica id',
+ opts = merge(default_opts, {to = 137, replica = {1}}),
+ exp_result = {x[7]},
+ },
+ {
+ 'to and replica ids',
+ opts = merge(default_opts, {to = 137, replica = {1, 2}}),
+ exp_result = {x[7]},
+ },
+ }
+ test:plan(#cases)
+
+ rawset(_G, 'TARANTOOLCTL_UNIT_TEST', true)
+ local tarantoolctl = dofile(TARANTOOLCTL_PATH)
+
+ -- Like xlog.pairs().
+ local function gen(param, lsn)
+ local row = param.data[param.idx]
+ if row == nil then
+ return
+ end
+ param.idx = param.idx + 1
+ return row.HEADER.lsn, row
+ end
+
+ local function xlog_data_pairs(data)
+ return gen, {data = data, idx = 1}, 0
+ end
+
+ for _, case in ipairs(cases) do
+ local gen, param, state = xlog_data_pairs(xlog_data)
+ local res = {}
+ tarantoolctl.internal.filter_xlog(gen, param, state, case.opts,
+ function(record)
+ table.insert(res, record)
+ end
+ )
+ test:is_deeply(res, case.exp_result, case[1])
+ end
+end)
+
os.exit(test:check() == true and 0 or -1)
--
2.20.1
^ permalink raw reply [flat|nested] 4+ messages in thread
* [PATCH 2/2] Fix tarantoolctl cat/play premature stop with --to
2019-03-20 18:41 [PATCH 0/2] Fix tarantoolctl cat/play premature stop Alexander Turenko
2019-03-20 18:41 ` [PATCH 1/2] Unify tarantoolctl cat/play xlog filtering code Alexander Turenko
@ 2019-03-20 18:41 ` Alexander Turenko
2019-03-26 18:11 ` [PATCH 0/2] Fix tarantoolctl cat/play premature stop Vladimir Davydov
2 siblings, 0 replies; 4+ messages in thread
From: Alexander Turenko @ 2019-03-20 18:41 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: Alexander Turenko, tarantool-patches
Stop the file processing loop only when it is guaranteed that we will not
find a record that matches the user-provided filters later in this file. If
--replica R is provided exactly once and we meet a record from R with an
LSN equal to or above the --to value, we stop the loop. Otherwise (no
--replica, or several --replica arguments) the file will be read until the
end even if --to is provided.
Fixes #3827.
---
extra/dist/tarantoolctl.in | 14 +++++++-------
test/app-tap/tarantoolctl.test.lua | 4 ++--
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/extra/dist/tarantoolctl.in b/extra/dist/tarantoolctl.in
index 014acee6c..0796f4605 100755
--- a/extra/dist/tarantoolctl.in
+++ b/extra/dist/tarantoolctl.in
@@ -839,11 +839,11 @@ end
-- 4. If @a opts.replica is set, record.HEADER.replica_id should
-- not be nil and should be in @a opts.replica list.
--
--- Once a record with LSN >= @a opts.to is found the loop stops
--- (even if a file contains next records with smaller LSN from
--- another replicas). Note however that this function is called
--- once for each xlog / snap file, so even when it stops on LSN
--- >= @a opts.to on a current file a next file will be processed.
+-- If @a opts.replica is set and is a singleton list and a record
+-- **from this replica** with LSN >= @a opts.to is found the loop
+-- stops. Note however that this function is called once for each
+-- xlog / snap file, so even when it stops on LSN >= @a opts.to on
+-- a current file a next file will be processed.
local function filter_xlog(gen, param, state, opts, cb)
local spaces = opts.spaces
local from, to, spaces = opts.from, opts.to, opts.space
@@ -852,11 +852,11 @@ local function filter_xlog(gen, param, state, opts, cb)
for lsn, record in gen, param, state do
local sid = record.BODY and record.BODY.space_id
local rid = record.HEADER.replica_id
- if lsn >= to then
+ if replicas and #replicas == 1 and replicas[1] == rid and lsn >= to then
-- stop, as we've finished reading tuple with lsn == to
-- and the next lsn's will be bigger
break
- elseif (lsn < from) or
+ elseif (lsn < from) or (lsn >= to) or
(not spaces and sid and sid < 512 and not show_system) or
(spaces and (sid == nil or not find_in_list(sid, spaces))) or
(replicas and not find_in_list(rid, replicas)) then
diff --git a/test/app-tap/tarantoolctl.test.lua b/test/app-tap/tarantoolctl.test.lua
index 4910b94f3..a914db5c5 100755
--- a/test/app-tap/tarantoolctl.test.lua
+++ b/test/app-tap/tarantoolctl.test.lua
@@ -590,7 +590,7 @@ test:test('filter_xlog', function(test)
{
'to w/o replica id',
opts = merge(default_opts, {to = 120}),
- exp_result = {},
+ exp_result = {x[9], x[10]},
},
{
'to and replica id',
@@ -600,7 +600,7 @@ test:test('filter_xlog', function(test)
{
'to and replica ids',
opts = merge(default_opts, {to = 137, replica = {1, 2}}),
- exp_result = {x[7]},
+ exp_result = {x[7], x[9]},
},
}
test:plan(#cases)
--
2.20.1
^ permalink raw reply [flat|nested] 4+ messages in thread