[tarantool/dump] Add dump and restore filters

Vladimir Davydov vdavydov.dev at gmail.com
Tue Jan 30 19:37:56 MSK 2018


This patch adds an optional argument to the dump and restore functions
that can be used for passing options. Currently, there's only one option
available, 'filter', which is supposed to be a function that takes a
space and a tuple and returns a tuple. This function is called for each
dumped/restored tuple. It can be used to override what is written to
the dump file or restored. If it returns nil, the tuple is skipped.

The 'filter' option can be used to convert memtx spaces to vinyl as
shown below:

    dump.restore('dump', {
        filter = function(space, tuple)
            if space.id == box.schema.SPACE_ID then
                return tuple:update{{'=', 4, 'vinyl'}}
            else
                return tuple
            end
        end
    })

Closes #1
---
Branch: gh-1-add-tuple-filter

 dump/init.lua      | 74 +++++++++++++++++++++++++++++++++++++++++---------
 test/dump.test.lua | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 139 insertions(+), 14 deletions(-)

diff --git a/dump/init.lua b/dump/init.lua
index 1a4511165cf6..889ee17f9f30 100644
--- a/dump/init.lua
+++ b/dump/init.lua
@@ -170,7 +170,7 @@ end
 
 -- Create a dump  stream object for path
 -- Creates the path if it doesn't exist.
-local function dump_stream_new(dump_stream, path)
+local function dump_stream_new(dump_stream, path, opts)
     local dump_stream_vtab = {
         begin_dump_space = begin_dump_space;
         end_dump_space = end_dump_space;
@@ -182,6 +182,7 @@ local function dump_stream_new(dump_stream, path)
     end
     local dump_object = {
         path = path;
+        opts = opts;
         files = {};
         spaces = 0;
         rows = 0;
@@ -204,7 +205,7 @@ local dump_stream =
 -- Create a restore stream object for a path
 -- Scans the path, finds all dump files and prepares
 -- them for restore.
-local function restore_stream_new(restore_stream, path)
+local function restore_stream_new(restore_stream, path, opts)
     croak("Reading contents of %s", path)
     local stat = fio.stat(path)
     if not stat then
@@ -227,7 +228,13 @@ local function restore_stream_new(restore_stream, path)
     -- as checkpoint recovery
     table.sort(files)
     croak("Found %d files", #files)
-    local restore_object = { path = path; files = files; spaces = 0; rows = 0; }
+    local restore_object = {
+        path = path;
+        opts = opts;
+        files = files;
+        spaces = 0;
+        rows = 0;
+    }
     return restore_object
 end
 
@@ -241,7 +248,7 @@ local function space_stream_next_tuple(stream)
         return nil
     end
     stream.buf.rpos = rpos
-    return tuple
+    return box.tuple.new(tuple)
 end
 
 -- Read in more data from the dump file to the stream buffer.
@@ -302,8 +309,11 @@ local function space_stream_restore(stream)
                 box.commit()
                 box.begin()
             end
-            stream.space:replace(tuple)
-            stream.rows = stream.rows + 1
+            tuple = stream.opts.filter(stream.space, tuple)
+            if tuple ~= nil then
+                stream.space:replace(tuple)
+                stream.rows = stream.rows + 1
+            end
         end
     end
     if TXN_ROWS > 1 then
@@ -314,7 +324,7 @@ local function space_stream_restore(stream)
 end
 
 -- Create a new stream to restore a single space
-local function space_stream_new(dir, space_id)
+local function space_stream_new(dir, space_id, opts)
     local space = box.space[space_id]
     if space == nil then
         return nil, string.format("The dump directory is missing metadata for space %d",
@@ -329,6 +339,7 @@ local function space_stream_new(dir, space_id)
     local space_stream = {
         space = space;
         path = path;
+        opts = opts;
         fh = fh;
         rows = 0;
         buf = buffer.ibuf(BUFSIZ);
@@ -349,6 +360,55 @@ local restore_stream = {
 
 -- {{{ Database-wide dump and restore
 
+--
+-- Options accepted by dump() and restore(), mapped to their
+-- default values. A user-supplied value must have the same
+-- type as the default.
+--
+-- filter(space, tuple) is called for each dumped/restored tuple
+-- and returns the tuple to write (possibly modified) or nil to
+-- skip it. The default passes every tuple through unchanged.
+--
+local common_opts_template = {
+    filter = function(space, tuple) return tuple end,
+}
+
+local dump_opts_template = common_opts_template
+local restore_opts_template = common_opts_template
+
+--
+-- Validate user-supplied options against a template.
+-- Returns a new table with the user's options plus defaults for
+-- anything missing; the caller's table is not modified. On error,
+-- returns nil and an error message.
+--
+local function check_opts(opts, template)
+    if opts == nil then
+        opts = {}
+    elseif type(opts) ~= 'table' then
+        return nil, string.format("Invalid options: expected table, got %s",
+            type(opts))
+    end
+    local result = {}
+    for k, v in pairs(opts) do
+        local default = template[k]
+        if default == nil then
+            return nil, string.format("Invalid option '%s'", tostring(k))
+        end
+        if type(default) ~= type(v) then
+            return nil, string.format("Invalid value for option '%s': expected %s, got %s",
+                tostring(k), type(default), type(v))
+        end
+        result[k] = v
+    end
+    for k, v in pairs(template) do
+        if result[k] == nil then
+            result[k] = v
+        end
+    end
+    return result
+end
+
 --
 -- Dump data from a single space into a stream.
 -- Apply filter.
@@ -378,6 +438,10 @@ local function dump_space(stream, space, filter)
             if filter and filter(v) then
                 goto continue
             end
+            v = stream.opts.filter(space, v)
+            if v == nil then
+                goto continue
+            end
             local status, msg = stream:dump_tuple(space_id, v)
             if not status then
                 stream:end_dump_space(space_id)
@@ -407,12 +471,19 @@ end
 -- system functions, users, roles and grants.
 -- Then dump all other spaces.
 --
-local function dump(path)
+-- opts is an optional table of options, validated with check_opts()
+-- against dump_opts_template; currently only 'filter' is supported.
+--
+local function dump(path, opts)
+    local opts, msg = check_opts(opts, dump_opts_template)
+    if not opts then
+        return nil, msg
+    end
     local status, msg = box_is_configured()
     if not status then
         return nil, msg
     end
-    local stream, msg = dump_stream:new(path)
+    local stream, msg = dump_stream:new(path, opts)
     if not stream then
         return nil, msg
     end
@@ -448,12 +519,19 @@ end
 --
 -- Restore all spaces from the backup stored at the given path.
 --
-local function restore(path)
+-- opts is an optional table of options, validated with check_opts()
+-- against restore_opts_template; currently only 'filter' is supported.
+--
+local function restore(path, opts)
+    local opts, msg = check_opts(opts, restore_opts_template)
+    if not opts then
+        return nil, msg
+    end
     local status, msg = box_is_configured()
     if not status then
         return nil, msg
     end
-    local stream, msg = restore_stream:new(path)
+    local stream, msg = restore_stream:new(path, opts)
     if not stream then
         return nil, msg
     end
@@ -465,7 +543,7 @@ local function restore(path)
         --  so all user defined spaces should be created by the time
         --  they are  restored
         --
-        local space_stream, msg = space_stream_new(stream.path, space_id)
+        local space_stream, msg = space_stream_new(stream.path, space_id, opts)
         if not space_stream then
             return nil, msg
         end
diff --git a/test/dump.test.lua b/test/dump.test.lua
index d0566bc1a23b..68473e6aaf01 100755
--- a/test/dump.test.lua
+++ b/test/dump.test.lua
@@ -24,9 +24,23 @@ local function rmpath(path)
 end
 
 function basic(test)
-    test:plan(2)
+    test:plan(10)
     test:is(type(dump.dump), "function", "Dump function is present")
     test:is(type(dump.restore), "function", "Restore function is present")
+    local status, msg = dump.dump('dir', {invalid = true})
+    test:is(status, nil, "Unknown dump option - status")
+    test:is(msg, "Invalid option 'invalid'", "Unknown dump option - message")
+    status, msg = dump.restore('dir', {invalid = true})
+    test:is(status, nil, "Unknown restore option - status")
+    test:is(msg, "Invalid option 'invalid'", "Unknown restore option - message")
+    status, msg = dump.dump('dir', {filter = true})
+    test:is(status, nil, "Invalid value of dump option - status")
+    test:is(msg, "Invalid value for option 'filter': expected function, got boolean",
+        "Invalid value of dump option - message")
+    status, msg = dump.restore('dir', {filter = true})
+    test:is(status, nil, "Invalid value of restore option - status")
+    test:is(msg, "Invalid value for option 'filter': expected function, got boolean",
+        "Invalid value of restore option - message")
 end
 
 function box_is_configured(test)
@@ -99,6 +113,7 @@ local function dump_after_dump(test)
     test:is(not status, false, "First dump is successful")
     local status, msg = dump.dump(dir)
     test:is(not status, true, "Second dump fails")
+    box.space.test:drop()
     rmpath(dir)
 end
 
@@ -143,7 +158,65 @@ local function dump_hash_index(test)
     rmpath(dir)
 end
 
-test:plan(7)
+-- Filter shared by the filter tests. For the space with the given
+-- id it keeps 'ignore' tuples as is, sets the second field of
+-- 'update' tuples to 'updated' and drops 'filter' tuples; tuples
+-- of any other space are passed through unchanged.
+local function make_test_filter(space_id)
+    return function(space, tuple)
+        if space.id ~= space_id then return tuple end
+        if tuple[2] == 'ignore' then return tuple end
+        if tuple[2] == 'filter' then return nil end
+        if tuple[2] == 'update' then
+            return tuple:update{{'=', 2, 'updated'}}
+        end
+    end
+end
+
+-- Create space 'test' with a primary key and one tuple per
+-- filter action recognized by make_test_filter().
+local function create_test_space()
+    local space = box.schema.space.create('test')
+    space:create_index('pk')
+    space:insert{1, 'ignore'}
+    space:insert{2, 'update'}
+    space:insert{3, 'filter'}
+    return space
+end
+
+local function dump_filter(test)
+    test:plan(3)
+    local space = create_test_space()
+    local space_id = space.id
+    local dir = fio.tempdir()
+    dump.dump(dir, {filter = make_test_filter(space_id)})
+    space:drop()
+    dump.restore(dir)
+    space = box.space.test
+    test:is(space:get(1)[2], 'ignore', "Dump filter can ignore a tuple")
+    test:is(space:get(2)[2], 'updated', "Dump filter can update a tuple")
+    test:is(space:get(3), nil, "Dump filter can filter out a tuple")
+    space:drop()
+    rmpath(dir)
+end
+
+local function restore_filter(test)
+    test:plan(3)
+    local space = create_test_space()
+    local space_id = space.id
+    local dir = fio.tempdir()
+    dump.dump(dir)
+    space:drop()
+    dump.restore(dir, {filter = make_test_filter(space_id)})
+    space = box.space.test
+    test:is(space:get(1)[2], 'ignore', "Restore filter can ignore a tuple")
+    test:is(space:get(2)[2], 'updated', "Restore filter can update a tuple")
+    test:is(space:get(3), nil, "Restore filter can filter out a tuple")
+    space:drop()
+    rmpath(dir)
+end
+
+test:plan(9)
 
 test:test('Basics', basic)
 test:test('Using the rock without calling box.cfg{}', box_is_configured)
@@ -155,5 +228,7 @@ test:test('Dump into a non-writable directory', dump_access_denied)
 test:test('Dump into a non-empty directory', dump_after_dump)
 test:test('Restore of a non-existent path', restore_no_such_path)
 test:test('Dump and restore of a space with HASH primary key', dump_hash_index)
+test:test('Dump filter', dump_filter)
+test:test('Restore filter', restore_filter)
 
 os.exit(test:check() == true and 0 or -1)
-- 
2.11.0




More information about the Tarantool-patches mailing list