[patches] [V2 AVRO 3/3] Allow to preserve extra fields in AST and fingerprint

AKhatskevich avkhatskevich at tarantool.org
Thu Feb 22 14:20:34 MSK 2018


From: "AKhatskevich avkhatskevich at tarantool.org" <avkhatskevich at gmail.com>

Add two options (tables of attr names) to `avro.create`:
 - preserve_in_ast: attributes which should not be deleted from the AST
 - preserve_in_fingerprint: attributes which should be used when
   calculating the fingerprint (in addition to the fields mentioned in
   the Avro 1.8.2 specification)

This feature may be useful when building frameworks that work on top
of Avro.

Closes #31
---
 CMakeLists.txt              |   2 +-
 avro_schema/fingerprint.lua |  29 ++++++----
 avro_schema/frontend.lua    |  56 ++++++++++++-------
 avro_schema/init.lua        |  57 +++++++++++++++++---
 avro_schema/utils.lua       |  12 +++++
 test/api_tests.lua          | 127 +++++++++++++++++++++++++++++++++++++++++++-
 6 files changed, 246 insertions(+), 37 deletions(-)
 create mode 100644 avro_schema/utils.lua

diff --git a/CMakeLists.txt b/CMakeLists.txt
index b7a80da..5633fab 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -85,7 +85,7 @@ add_custom_target(postprocess_lua ALL DEPENDS
 # Install module
 install(FILES avro_schema/init.lua avro_schema/compiler.lua
               avro_schema/frontend.lua avro_schema/runtime.lua
-              avro_schema/fingerprint.lua
+              avro_schema/fingerprint.lua avro_schema/utils.lua
         DESTINATION ${TARANTOOL_INSTALL_LUADIR}/avro_schema)
 
 install(FILES ${CMAKE_BINARY_DIR}/il.lua
diff --git a/avro_schema/fingerprint.lua b/avro_schema/fingerprint.lua
index caa6d22..115a371 100644
--- a/avro_schema/fingerprint.lua
+++ b/avro_schema/fingerprint.lua
@@ -20,23 +20,31 @@ local function is_primitive_type(xtype)
     return false
 end
 
-local function avro_json_array(data)
+local function avro_json_array(data, extra_fields)
     local res = {}
     for _, item in ipairs(data) do
-        table.insert(res,avro_json(item))
+        table.insert(res,avro_json(item, extra_fields))
     end
     return string.format("[%s]", table.concat(res, ","))
 end
 
-local function avro_json_object(data)
+local function avro_json_object(data, extra_fields)
     local res = {}
     local necessary_order = {
         "name", "type", "fields", "symbols", "items", "values", "size"
     }
+    --
+    -- There are a cases in which it is necessary to extend a schema.
+    -- The source below provides method to add those attrs in sustainable way.
+    --
+    for _, val in ipairs(extra_fields) do
+        table.insert(necessary_order, val)
+    end
+
     for _, name in ipairs(necessary_order) do
         local item = data[name]
         if item ~= nil then
-            local inner = avro_json(item)
+            local inner = avro_json(item, extra_fields)
             inner = string.format([[%s:%s]], json.encode(name), inner)
             table.insert(res, inner)
         end
@@ -46,7 +54,10 @@ end
 
 -- Takes normalized avro schema and produces normalized schema representation
 -- encoded in json format.
-avro_json = function (data)
+avro_json = function (data, extra_fields)
+    extra_fields = extra_fields or {}
+    -- should be sorted for consistency
+    table.sort(extra_fields)
     local xtype = type(data)
     if is_primitive_type(xtype) then
         return json.encode(data)
@@ -56,17 +67,17 @@ avro_json = function (data)
     end
     -- array
     if #data > 0 then
-        return avro_json_array(data)
+        return avro_json_array(data, extra_fields)
     end
     -- object (dict)
-    return avro_json_object(data)
+    return avro_json_object(data, extra_fields)
 end
 
-local function get_fingerprint(schema, algo, size)
+local function get_fingerprint(schema, algo, size, options)
     if digest[algo] == nil or type(digest[algo]) ~= "function" then
         raise_error("The hash function %s is not supported", algo)
     end
-    local fp = digest[algo](avro_json(schema))
+    local fp = digest[algo](avro_json(schema, options.preserve_in_fingerprint))
     return fp:sub(1, size)
 end
 
diff --git a/avro_schema/frontend.lua b/avro_schema/frontend.lua
index 646b448..33a9c8c 100644
--- a/avro_schema/frontend.lua
+++ b/avro_schema/frontend.lua
@@ -67,7 +67,7 @@ local floor = math.floor
 local clear = require('table.clear')
 local next, type = next, type
 
-function deepcopy(orig)
+local function deepcopy(orig)
     local orig_type = type(orig)
     local copy
     if orig_type == 'table' then
@@ -131,6 +131,7 @@ local function type_tag(t)
     return (type(t) == 'string' and t) or t.name or t.type
 end
 
+local copy_schema
 local copy_schema_error
 local copy_schema_location_info
 
@@ -200,13 +201,22 @@ local dcache = setmetatable({}, { __mode = 'k' })
 
 local copy_field_default
 
+local function copy_fields(from, to, fields)
+    for _,field in ipairs(fields) do
+        if from[field] ~= nil then
+            to[field] = deepcopy(from[field])
+        end
+    end
+end
 -- create a private copy and sanitize recursively;
 -- [ns]       current ns (or nil)
 -- [scope]    a dictionary of named types (ocasionally used for unnamed too)
 -- [open_rec] a set consisting of the current record + parent records;
 --            it is used to reject records containing themselves
-copy_schema = function(schema, ns, scope, open_rec)
-    local res, ptr -- we depend on these being locals #5 and #6
+-- [options]  options table, contains:
+--             - preserve_in_ast: names of attrs which should not be deleted
+copy_schema = function(schema, ns, scope, open_rec, options)
+    local res, ptr -- we depend on these being locals #6 and #7
     if type(schema) == 'table' then
         if scope[schema] then
             -- this check is necessary for unnamed complex types (union, array map)
@@ -219,7 +229,7 @@ copy_schema = function(schema, ns, scope, open_rec)
             res = {}
             for branchno, xbranch in ipairs(schema) do
                 ptr = branchno
-                local branch = copy_schema(xbranch, ns, scope)
+                local branch = copy_schema(xbranch, ns, scope, nil, options)
                 local bxtype, bxname
                 if type(branch) == 'table' and not branch.type then
                     copy_schema_error('Union may not immediately contain other unions')
@@ -248,13 +258,21 @@ copy_schema = function(schema, ns, scope, open_rec)
             nullable, xtype = extract_nullable(xtype)
 
             if primitive_type[xtype] then
+                -- Preserve fields which are asked to be in AST.
+                res = {}
+                copy_fields(schema, res, options.preserve_in_ast)
                 -- primitive type normalization
-                if nullable == nil then
+                if nullable == nil and not next(res) then
                     return xtype
                 end
-                return {type = xtype, nullable = nullable}
+                res.type = xtype
+                res.nullable = nullable
+                return res
             elseif xtype == 'record' then
-                res = { type = 'record' }
+                -- Preserve fields which are asked to be in AST.
+                res = {}
+                copy_fields(schema, res, options.preserve_in_ast)
+                res.type = 'record'
                 res.nullable = nullable
                 local name, ns = checkname(schema, ns, scope)
                 scope[name] = res
@@ -298,19 +316,20 @@ copy_schema = function(schema, ns, scope, open_rec)
                     if not xtype then
                         copy_schema_error('Record field must have a "type"')
                     end
-                    field.type = copy_schema(xtype, ns, scope, open_rec)
+                    field.type = copy_schema(xtype, ns, scope, open_rec, 
+                                             options)
                     if open_rec[field.type] then
                         local path, n = {}
                         for i = 1, 1000000 do
-                            local _, res = debug.getlocal(i, 5)
+                            local _, res = debug.getlocal(i, 6)
                             if res == field.type then
                                 n = i
                                 break
                             end
                         end
                         for i = n, 1, -1 do
-                            local _, res = debug.getlocal(i, 5)
-                            local _, ptr = debug.getlocal(i, 6)
+                            local _, res = debug.getlocal(i, 6)
+                            local _, ptr = debug.getlocal(i, 7)
                             insert(path, res.fields[ptr].name)
                         end
                         error(format('Record %s contains itself via %s',
@@ -389,7 +408,7 @@ copy_schema = function(schema, ns, scope, open_rec)
                 if not xitems then
                     copy_schema_error('Array type must have "items"')
                 end
-                res.items = copy_schema(xitems, ns, scope)
+                res.items = copy_schema(xitems, ns, scope, nil, options)
                 scope[schema] = nil
                 return res
             elseif xtype == 'map' then
@@ -399,7 +418,7 @@ copy_schema = function(schema, ns, scope, open_rec)
                 if not xvalues then
                     copy_schema_error('Map type must have "values"')
                 end
-                res.values = copy_schema(xvalues, ns, scope)
+                res.values = copy_schema(xvalues, ns, scope, nil, options)
                 scope[schema] = nil
                 return res
             elseif xtype == 'fixed' then
@@ -478,13 +497,14 @@ copy_schema_location_info = function()
     local top, bottom = find_frames(copy_schema)
     local res = {}
     for i = bottom, top, -1 do
-        local _, node = debug.getlocal(i, 5)
-        local _, ptr  = debug.getlocal(i, 6)
+        -- 6 and 7 are res and ptr vars from copy func
+        local _, node = debug.getlocal(i, 6)
+        local _, ptr  = debug.getlocal(i, 7)
         if type(node) == 'table' then
             if node.type == nil then -- union
                 insert(res, '<union>')
                 if i <= top + 1 then
-                    local _, next_node = debug.getlocal(i - 1, 6)
+                    local _, next_node = debug.getlocal(i - 1, 7)
                     if i == top or (i == top + 1 and
                                     not (next_node and next_node.name)) then
                         insert(res, format('<branch-%d>', ptr))
@@ -525,8 +545,8 @@ copy_schema_error = function(fmt, ...)
 end
 
 -- validate schema definition (creates a copy)
-local function create_schema(schema)
-    return copy_schema(schema, nil, {})
+local function create_schema(schema, options)
+    return copy_schema(schema, nil, {}, nil, options)
 end
 
 -- get a mapping from a (string) type tag -> union branch id
diff --git a/avro_schema/init.lua b/avro_schema/init.lua
index 621030d..c3989de 100644
--- a/avro_schema/init.lua
+++ b/avro_schema/init.lua
@@ -5,6 +5,7 @@ local il          = require('avro_schema.il')
 local backend_lua = require('avro_schema.backend')
 local rt          = require('avro_schema.runtime')
 local fingerprint = require('avro_schema.fingerprint')
+local utils       = require('avro_schema.utils')
 
 local format, find, sub = string.format, string.find, string.sub
 local insert, remove, concat = table.insert, table.remove, table.concat
@@ -22,6 +23,7 @@ local rt_universal_decode = rt.universal_decode
 local install_lua_backend = backend_lua.install
 
 -- We give away a handle but we never expose schema data.
+-- {schema=schema, options=options}
 local schema_by_handle = setmetatable( {}, { __mode = 'k' } )
 
 local function get_schema(handle)
@@ -29,7 +31,7 @@ local function get_schema(handle)
     if not schema then
         error(format('Not a schema: %s', handle), 0)
     end
-    return schema
+    return schema.schema
 end
 
 local function is_schema(schema_handle)
@@ -62,7 +64,7 @@ local function get_ir(from_schema, to_schema, inverse)
 end
 
 local function schema_to_string(handle)
-    local schema = schema_by_handle[handle]
+    local schema = get_schema(handle)
     return format('Schema (%s)',
                   handle[1] or (type(schema) ~= 'table' and schema) or
                   schema.name or schema.type or 'union')
@@ -119,16 +121,53 @@ augment_defaults = function(schema, visited)
     end
 end
 
+local function create_options_validate(options)
+    options = options or {}
+    options = table.deepcopy(options)
+    if type(options) ~= 'table' then
+        return false, "Options should be a table"
+    end
+    if type(options.preserve_in_ast) ~= 'table' then
+        options.preserve_in_ast = {}
+    end
+    for _, f_ast in ipairs(options.preserve_in_ast) do
+        if type(f_ast) ~= 'string' then
+            return false, "preserve fields should be of string type"
+        end
+    end
+    if type(options.preserve_in_fingerprint) ~= 'table' then
+        options.preserve_in_fingerprint = {}
+    end
+    -- preserve_in_fingerprint should not contain fields which are not
+    -- presented in preserve_in_ast
+    for _, f_f in ipairs(options.preserve_in_fingerprint) do
+        if type(f_f) ~= 'string' then
+            return false, "preserve fields should be of string type"
+        end
+        if not utils.table_contains(options.preserve_in_ast, f_f) then
+            return false, "fingerprint should contain only fields from AST"
+        end
+    end
+    return true, options
+end
+
 local function create(raw_schema, options)
-    local ok, schema = pcall(f_create_schema, raw_schema)
+    local ok
+    ok, options = create_options_validate(options)
+    if ok == false then
+        return false, options
+    end
+    local schema
+    ok, schema = pcall(f_create_schema, raw_schema, options)
     if not ok then
         return false, schema
     end
-    if type(options) == 'table' and options.defaults == 'auto' then
+    if options.defaults == 'auto' then
         augment_defaults(schema, {})
     end
     local schema_handle = setmetatable({}, schema_handle_mt)
-    schema_by_handle[schema_handle] = schema
+    schema_by_handle[schema_handle] = {schema = schema,
+                                       options = options}
     return true, schema_handle
 end
 
@@ -511,10 +550,12 @@ end
 local function export(schema_h)
     return export_helper(get_schema(schema_h), {})
 end
-local function get_fingerprint(schema_h, algo, size)
-    if algo == nil then algo = "sha256" end
+local function get_fingerprint(schema_h, hash, size)
+    if hash == nil then hash = "sha256" end
     if size == nil then size = 8 end
-    return fingerprint.get_fingerprint(get_schema(schema_h), algo, size)
+    local schema = schema_by_handle[schema_h]
+    return fingerprint.get_fingerprint(schema.schema, hash,
+                                       size, schema.options)
 end
 local function to_json(schema_h)
     return fingerprint.avro_json(get_schema(schema_h))
diff --git a/avro_schema/utils.lua b/avro_schema/utils.lua
new file mode 100644
index 0000000..da10c25
--- /dev/null
+++ b/avro_schema/utils.lua
@@ -0,0 +1,12 @@
+local function table_contains(t, xval)
+    for k, val in ipairs(t) do
+        if type(k) == "number" and val == xval then
+            return true
+        end
+    end
+    return false
+end
+
+return {
+    table_contains = table_contains
+}
\ No newline at end of file
diff --git a/test/api_tests.lua b/test/api_tests.lua
index 404768b..806c372 100644
--- a/test/api_tests.lua
+++ b/test/api_tests.lua
@@ -5,7 +5,7 @@ local msgpack = require('msgpack')
 
 local test = tap.test('api-tests')
 
-test:plan(54)
+test:plan(64)
 
 test:is_deeply({schema.create()}, {false, 'Unknown Avro type: nil'},
                'error unknown type')
@@ -295,5 +295,130 @@ for i, testcase in ipairs(fingerprint_testcases) do
             "Fingerprint testcase ".. i)
 end
 
+local schema_preserve_fields_testcases = {
+    {
+        name = "1",
+        schema = {
+            type="int",
+            extra_field="extra_field"
+        },
+        options = {},
+        ast = "int"
+    },
+    {
+        name = "2",
+        schema = {
+            type="int",
+            extra_field="extra_field"
+        },
+        options = {preserve_in_ast={"extra_field"}},
+        ast = {
+            type="int",
+            extra_field="extra_field"
+        }
+    },
+    {
+        name = "3-complex",
+        schema = {
+            type="int",
+            extra_field={extra_field={"extra_field"}}
+        },
+        options = {preserve_in_ast={"extra_field"}},
+        ast = {
+            type="int",
+            extra_field={extra_field={"extra_field"}}
+        }
+    }
+}
+
+for _, testcase in ipairs(schema_preserve_fields_testcases) do
+    res = {schema.create(testcase.schema, testcase.options)}
+    test:is_deeply(schema.export(res[2]), testcase.ast,
+                   'schema extra fields ' .. testcase.name)
+end
+
+test:is_deeply(
+        {schema.create("int", {
+                                preserve_in_ast={},
+                                preserve_in_fingerprint={"extra_field"},
+                             })},
+        {false, "fingerprint should contain only fields from AST"},
+        'preserve_in_fingerprint contains more fields than AST')
+
+        local fingerprint
+        res = {schema.create(
+        {
+            type = "record",
+            name = "test",
+            extra_field = "extra_field",
+            fields = {
+                { 
+                    name = "bar", 
+                    type = "null", 
+                    default = msgpack.NULL, 
+                    extra_field = "extra"
+                },
+                { 
+                    name = "foo",
+                    type = {"null", "int"},
+                    default = msgpack.NULL 
+                },
+            }
+        }, nil)}
+        fingerprint = schema.fingerprint(res[2], "sha256", 32)
+test:is(string.lower(string.tohex(fingerprint)),
+        "a64098ee437e9020923c6005db88f37a234ed60daae23b26e33d8ae1bf643356",
+        "Fingerprint extra fields 1")
+
+res = {schema.create(
+    {
+        type = "record",
+        name = "test",
+        extra_field = "extra_field",
+        fields = {
+            { 
+                name = "bar", type = "null", 
+                default = msgpack.NULL, extra_field = "extra" 
+            },
+            { 
+                name = "foo", 
+                type = {"null", "int"}, 
+                default = msgpack.NULL
+            },
+        }
+    },
+    {
+        preserve_in_ast={"extra_field"},
+        preserve_in_fingerprint={"extra_field"}
+    })
+}
+fingerprint = schema.fingerprint(res[2], "sha256", 32)
+test:is(string.lower(string.tohex(fingerprint)),
+    "70bd295335daafff0a4512cadc39a4298cd81c460defec530c7372bdd1ec6f44",
+    "Fingerprint extra fields 2")
+
+res = {schema.create(
+    {
+        type = "int",
+        extra_field = "extra_field",
+    }, 
+    {
+        preserve_in_ast={"extra_field"}
+    })
+}
+    fingerprint = schema.fingerprint(res[2], "sha256", 32)
+    test:is_deeply(schema.export(res[2]), 
+    {type = "int", extra_field = "extra_field"},
+    "Prevent primitive type collapse by extra field")
+
+-- avro_json is used for fingerprint
+fingerprint = require("avro_schema.fingerprint")
+test:is(fingerprint.avro_json({field1="1"}), "{}", "avro_json 1")
+test:is(fingerprint.avro_json({field1="1"}, {"field1"}),
+        '{"field1":"1"}', "avro_json 2")
+test:is(fingerprint.avro_json({field2="1", field1="1"}, {"field2", "field1"}),
+        '{"field1":"1","field2":"1"}', "avro_json 3 order")
+
+
 test:check()
 os.exit(test.planned == test.total and test.failed == 0 and 0 or -1)
-- 
2.14.1




More information about the Tarantool-patches mailing list