[patches] [V2 AVRO 3/3] Allow to preserve extra fields in AST and fingerprint
AKhatskevich
avkhatskevich at tarantool.org
Thu Feb 22 14:20:34 MSK 2018
From: "AKhatskevich avkhatskevich at tarantool.org" <avkhatskevich at gmail.com>
Add two options (tables of attr names) to `avro.create`:
- preserve_in_ast: attributes which should not be deleted from AST
- preserve_in_fingerprint: attributes which should be used in calculating
fingerprint (in addition to fields mentioned in Avro 1.8.2
specification)
This feature may be useful when creating frameworks which work
over AVRO.
Closes #31
---
CMakeLists.txt | 2 +-
avro_schema/fingerprint.lua | 29 ++++++----
avro_schema/frontend.lua | 56 ++++++++++++-------
avro_schema/init.lua | 57 +++++++++++++++++---
avro_schema/utils.lua | 12 +++++
test/api_tests.lua | 127 +++++++++++++++++++++++++++++++++++++++++++-
6 files changed, 246 insertions(+), 37 deletions(-)
create mode 100644 avro_schema/utils.lua
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b7a80da..5633fab 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -85,7 +85,7 @@ add_custom_target(postprocess_lua ALL DEPENDS
# Install module
install(FILES avro_schema/init.lua avro_schema/compiler.lua
avro_schema/frontend.lua avro_schema/runtime.lua
- avro_schema/fingerprint.lua
+ avro_schema/fingerprint.lua avro_schema/utils.lua
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/avro_schema)
install(FILES ${CMAKE_BINARY_DIR}/il.lua
diff --git a/avro_schema/fingerprint.lua b/avro_schema/fingerprint.lua
index caa6d22..115a371 100644
--- a/avro_schema/fingerprint.lua
+++ b/avro_schema/fingerprint.lua
@@ -20,23 +20,31 @@ local function is_primitive_type(xtype)
return false
end
-local function avro_json_array(data)
+local function avro_json_array(data, extra_fields)
local res = {}
for _, item in ipairs(data) do
- table.insert(res,avro_json(item))
+ table.insert(res,avro_json(item, extra_fields))
end
return string.format("[%s]", table.concat(res, ","))
end
-local function avro_json_object(data)
+local function avro_json_object(data, extra_fields)
local res = {}
local necessary_order = {
"name", "type", "fields", "symbols", "items", "values", "size"
}
+ --
+ -- There are cases in which it is necessary to extend a schema.
+ -- The code below provides a way to add those attrs in a consistent manner.
+ --
+ for _, val in ipairs(extra_fields) do
+ table.insert(necessary_order, val)
+ end
+
for _, name in ipairs(necessary_order) do
local item = data[name]
if item ~= nil then
- local inner = avro_json(item)
+ local inner = avro_json(item, extra_fields)
inner = string.format([[%s:%s]], json.encode(name), inner)
table.insert(res, inner)
end
@@ -46,7 +54,10 @@ end
-- Takes normalized avro schema and produces normalized schema representation
-- encoded in json format.
-avro_json = function (data)
+avro_json = function (data, extra_fields)
+ extra_fields = extra_fields or {}
+ -- should be sorted for consistency
+ table.sort(extra_fields)
local xtype = type(data)
if is_primitive_type(xtype) then
return json.encode(data)
@@ -56,17 +67,17 @@ avro_json = function (data)
end
-- array
if #data > 0 then
- return avro_json_array(data)
+ return avro_json_array(data, extra_fields)
end
-- object (dict)
- return avro_json_object(data)
+ return avro_json_object(data, extra_fields)
end
-local function get_fingerprint(schema, algo, size)
+local function get_fingerprint(schema, algo, size, options)
if digest[algo] == nil or type(digest[algo]) ~= "function" then
raise_error("The hash function %s is not supported", algo)
end
- local fp = digest[algo](avro_json(schema))
+ local fp = digest[algo](avro_json(schema, options.preserve_in_fingerprint))
return fp:sub(1, size)
end
diff --git a/avro_schema/frontend.lua b/avro_schema/frontend.lua
index 646b448..33a9c8c 100644
--- a/avro_schema/frontend.lua
+++ b/avro_schema/frontend.lua
@@ -67,7 +67,7 @@ local floor = math.floor
local clear = require('table.clear')
local next, type = next, type
-function deepcopy(orig)
+local function deepcopy(orig)
local orig_type = type(orig)
local copy
if orig_type == 'table' then
@@ -131,6 +131,7 @@ local function type_tag(t)
return (type(t) == 'string' and t) or t.name or t.type
end
+local copy_schema
local copy_schema_error
local copy_schema_location_info
@@ -200,13 +201,22 @@ local dcache = setmetatable({}, { __mode = 'k' })
local copy_field_default
+local function copy_fields(from, to, fields)
+ for _,field in ipairs(fields) do
+ if from[field] ~= nil then
+ to[field] = deepcopy(from[field])
+ end
+ end
+end
-- create a private copy and sanitize recursively;
-- [ns] current ns (or nil)
-- [scope] a dictionary of named types (ocasionally used for unnamed too)
-- [open_rec] a set consisting of the current record + parent records;
-- it is used to reject records containing themselves
-copy_schema = function(schema, ns, scope, open_rec)
- local res, ptr -- we depend on these being locals #5 and #6
+-- [options] options table, contains:
+-- - preserve_in_ast: names of attrs which should not be deleted
+copy_schema = function(schema, ns, scope, open_rec, options)
+ local res, ptr -- we depend on these being locals #6 and #7
if type(schema) == 'table' then
if scope[schema] then
-- this check is necessary for unnamed complex types (union, array map)
@@ -219,7 +229,7 @@ copy_schema = function(schema, ns, scope, open_rec)
res = {}
for branchno, xbranch in ipairs(schema) do
ptr = branchno
- local branch = copy_schema(xbranch, ns, scope)
+ local branch = copy_schema(xbranch, ns, scope, nil, options)
local bxtype, bxname
if type(branch) == 'table' and not branch.type then
copy_schema_error('Union may not immediately contain other unions')
@@ -248,13 +258,21 @@ copy_schema = function(schema, ns, scope, open_rec)
nullable, xtype = extract_nullable(xtype)
if primitive_type[xtype] then
+ -- Preserve fields which are asked to be in AST.
+ res = {}
+ copy_fields(schema, res, options.preserve_in_ast)
-- primitive type normalization
- if nullable == nil then
+ if nullable == nil and not next(res) then
return xtype
end
- return {type = xtype, nullable = nullable}
+ res.type = xtype
+ res.nullable = nullable
+ return res
elseif xtype == 'record' then
- res = { type = 'record' }
+ -- Preserve fields which are asked to be in AST.
+ res = {}
+ copy_fields(schema, res, options.preserve_in_ast)
+ res.type = 'record'
res.nullable = nullable
local name, ns = checkname(schema, ns, scope)
scope[name] = res
@@ -298,19 +316,20 @@ copy_schema = function(schema, ns, scope, open_rec)
if not xtype then
copy_schema_error('Record field must have a "type"')
end
- field.type = copy_schema(xtype, ns, scope, open_rec)
+ field.type = copy_schema(xtype, ns, scope, open_rec,
+ options)
if open_rec[field.type] then
local path, n = {}
for i = 1, 1000000 do
- local _, res = debug.getlocal(i, 5)
+ local _, res = debug.getlocal(i, 6)
if res == field.type then
n = i
break
end
end
for i = n, 1, -1 do
- local _, res = debug.getlocal(i, 5)
- local _, ptr = debug.getlocal(i, 6)
+ local _, res = debug.getlocal(i, 6)
+ local _, ptr = debug.getlocal(i, 7)
insert(path, res.fields[ptr].name)
end
error(format('Record %s contains itself via %s',
@@ -389,7 +408,7 @@ copy_schema = function(schema, ns, scope, open_rec)
if not xitems then
copy_schema_error('Array type must have "items"')
end
- res.items = copy_schema(xitems, ns, scope)
+ res.items = copy_schema(xitems, ns, scope, nil, options)
scope[schema] = nil
return res
elseif xtype == 'map' then
@@ -399,7 +418,7 @@ copy_schema = function(schema, ns, scope, open_rec)
if not xvalues then
copy_schema_error('Map type must have "values"')
end
- res.values = copy_schema(xvalues, ns, scope)
+ res.values = copy_schema(xvalues, ns, scope, nil, options)
scope[schema] = nil
return res
elseif xtype == 'fixed' then
@@ -478,13 +497,14 @@ copy_schema_location_info = function()
local top, bottom = find_frames(copy_schema)
local res = {}
for i = bottom, top, -1 do
- local _, node = debug.getlocal(i, 5)
- local _, ptr = debug.getlocal(i, 6)
+ -- 6 and 7 are res and ptr vars from copy func
+ local _, node = debug.getlocal(i, 6)
+ local _, ptr = debug.getlocal(i, 7)
if type(node) == 'table' then
if node.type == nil then -- union
insert(res, '<union>')
if i <= top + 1 then
- local _, next_node = debug.getlocal(i - 1, 6)
+ local _, next_node = debug.getlocal(i - 1, 7)
if i == top or (i == top + 1 and
not (next_node and next_node.name)) then
insert(res, format('<branch-%d>', ptr))
@@ -525,8 +545,8 @@ copy_schema_error = function(fmt, ...)
end
-- validate schema definition (creates a copy)
-local function create_schema(schema)
- return copy_schema(schema, nil, {})
+local function create_schema(schema, options)
+ return copy_schema(schema, nil, {}, nil, options)
end
-- get a mapping from a (string) type tag -> union branch id
diff --git a/avro_schema/init.lua b/avro_schema/init.lua
index 621030d..c3989de 100644
--- a/avro_schema/init.lua
+++ b/avro_schema/init.lua
@@ -5,6 +5,7 @@ local il = require('avro_schema.il')
local backend_lua = require('avro_schema.backend')
local rt = require('avro_schema.runtime')
local fingerprint = require('avro_schema.fingerprint')
+local utils = require('avro_schema.utils')
local format, find, sub = string.format, string.find, string.sub
local insert, remove, concat = table.insert, table.remove, table.concat
@@ -22,6 +23,7 @@ local rt_universal_decode = rt.universal_decode
local install_lua_backend = backend_lua.install
-- We give away a handle but we never expose schema data.
+-- {schema=schema, options=options}
local schema_by_handle = setmetatable( {}, { __mode = 'k' } )
local function get_schema(handle)
@@ -29,7 +31,7 @@ local function get_schema(handle)
if not schema then
error(format('Not a schema: %s', handle), 0)
end
- return schema
+ return schema.schema
end
local function is_schema(schema_handle)
@@ -62,7 +64,7 @@ local function get_ir(from_schema, to_schema, inverse)
end
local function schema_to_string(handle)
- local schema = schema_by_handle[handle]
+ local schema = get_schema(handle)
return format('Schema (%s)',
handle[1] or (type(schema) ~= 'table' and schema) or
schema.name or schema.type or 'union')
@@ -119,16 +121,53 @@ augment_defaults = function(schema, visited)
end
end
+local function create_options_validate(options)
+ options = options or {}
+ options = table.deepcopy(options)
+ if type(options) ~= 'table' then
+ return false, "Options should be a table"
+ end
+ if type(options.preserve_in_ast) ~= 'table' then
+ options.preserve_in_ast = {}
+ end
+ for _, f_ast in ipairs(options.preserve_in_ast) do
+ if type(f_ast) ~= 'string' then
+ return false, "preserve fields should be of string type"
+ end
+ end
+ if type(options.preserve_in_fingerprint) ~= 'table' then
+ options.preserve_in_fingerprint = {}
+ end
+ -- preserve_in_fingerprint should not contain fields which are not
+ -- present in preserve_in_ast
+ for _, f_f in ipairs(options.preserve_in_fingerprint) do
+ if type(f_f) ~= 'string' then
+ return false, "preserve fields should be of string type"
+ end
+ if not utils.table_contains(options.preserve_in_ast, f_f) then
+ return false, "fingerprint should contain only fields from AST"
+ end
+ end
+ return true, options
+end
+
local function create(raw_schema, options)
- local ok, schema = pcall(f_create_schema, raw_schema)
+ local ok
+ ok, options = create_options_validate(options)
+ if ok == false then
+ return false, options
+ end
+ local schema
+ ok, schema = pcall(f_create_schema, raw_schema, options)
if not ok then
return false, schema
end
- if type(options) == 'table' and options.defaults == 'auto' then
+ if options.defaults == 'auto' then
augment_defaults(schema, {})
end
local schema_handle = setmetatable({}, schema_handle_mt)
- schema_by_handle[schema_handle] = schema
+ schema_by_handle[schema_handle] = {schema = schema,
+ options = options}
return true, schema_handle
end
@@ -511,10 +550,12 @@ end
local function export(schema_h)
return export_helper(get_schema(schema_h), {})
end
-local function get_fingerprint(schema_h, algo, size)
- if algo == nil then algo = "sha256" end
+local function get_fingerprint(schema_h, hash, size)
+ if hash == nil then hash = "sha256" end
if size == nil then size = 8 end
- return fingerprint.get_fingerprint(get_schema(schema_h), algo, size)
+ local schema = schema_by_handle[schema_h]
+ return fingerprint.get_fingerprint(schema.schema, hash,
+ size, schema.options)
end
local function to_json(schema_h)
return fingerprint.avro_json(get_schema(schema_h))
diff --git a/avro_schema/utils.lua b/avro_schema/utils.lua
new file mode 100644
index 0000000..da10c25
--- /dev/null
+++ b/avro_schema/utils.lua
@@ -0,0 +1,12 @@
+local function table_contains(t, xval)
+ for k, val in ipairs(t) do
+ if type(k) == "number" and val == xval then
+ return true
+ end
+ end
+ return false
+end
+
+return {
+ table_contains = table_contains
+}
\ No newline at end of file
diff --git a/test/api_tests.lua b/test/api_tests.lua
index 404768b..806c372 100644
--- a/test/api_tests.lua
+++ b/test/api_tests.lua
@@ -5,7 +5,7 @@ local msgpack = require('msgpack')
local test = tap.test('api-tests')
-test:plan(54)
+test:plan(64)
test:is_deeply({schema.create()}, {false, 'Unknown Avro type: nil'},
'error unknown type')
@@ -295,5 +295,130 @@ for i, testcase in ipairs(fingerprint_testcases) do
"Fingerprint testcase ".. i)
end
+local schema_preserve_fields_testcases = {
+ {
+ name = "1",
+ schema = {
+ type="int",
+ extra_field="extra_field"
+ },
+ options = {},
+ ast = "int"
+ },
+ {
+ name = "2",
+ schema = {
+ type="int",
+ extra_field="extra_field"
+ },
+ options = {preserve_in_ast={"extra_field"}},
+ ast = {
+ type="int",
+ extra_field="extra_field"
+ }
+ },
+ {
+ name = "3-complex",
+ schema = {
+ type="int",
+ extra_field={extra_field={"extra_field"}}
+ },
+ options = {preserve_in_ast={"extra_field"}},
+ ast = {
+ type="int",
+ extra_field={extra_field={"extra_field"}}
+ }
+ }
+}
+
+for _, testcase in ipairs(schema_preserve_fields_testcases) do
+ res = {schema.create(testcase.schema, testcase.options)}
+ test:is_deeply(schema.export(res[2]), testcase.ast,
+ 'schema extra fields ' .. testcase.name)
+end
+
+test:is_deeply(
+ {schema.create("int", {
+ preserve_in_ast={},
+ preserve_in_fingerprint={"extra_field"},
+ })},
+ {false, "fingerprint should contain only fields from AST"},
+ 'preserve_in_fingerprint contains more fields than AST')
+
+ local fingerprint
+ res = {schema.create(
+ {
+ type = "record",
+ name = "test",
+ extra_field = "extra_field",
+ fields = {
+ {
+ name = "bar",
+ type = "null",
+ default = msgpack.NULL,
+ extra_field = "extra"
+ },
+ {
+ name = "foo",
+ type = {"null", "int"},
+ default = msgpack.NULL
+ },
+ }
+ }, nil)}
+ fingerprint = schema.fingerprint(res[2], "sha256", 32)
+test:is(string.lower(string.tohex(fingerprint)),
+ "a64098ee437e9020923c6005db88f37a234ed60daae23b26e33d8ae1bf643356",
+ "Fingerprint extra fields 1")
+
+res = {schema.create(
+ {
+ type = "record",
+ name = "test",
+ extra_field = "extra_field",
+ fields = {
+ {
+ name = "bar", type = "null",
+ default = msgpack.NULL, extra_field = "extra"
+ },
+ {
+ name = "foo",
+ type = {"null", "int"},
+ default = msgpack.NULL
+ },
+ }
+ },
+ {
+ preserve_in_ast={"extra_field"},
+ preserve_in_fingerprint={"extra_field"}
+ })
+}
+fingerprint = schema.fingerprint(res[2], "sha256", 32)
+test:is(string.lower(string.tohex(fingerprint)),
+ "70bd295335daafff0a4512cadc39a4298cd81c460defec530c7372bdd1ec6f44",
+ "Fingerprint extra fields 2")
+
+res = {schema.create(
+ {
+ type = "int",
+ extra_field = "extra_field",
+ },
+ {
+ preserve_in_ast={"extra_field"}
+ })
+}
+ fingerprint = schema.fingerprint(res[2], "sha256", 32)
+ test:is_deeply(schema.export(res[2]),
+ {type = "int", extra_field = "extra_field"},
+ "Prevent primitive type collapse by extra field")
+
+-- avro_json is used for fingerprint
+fingerprint = require("avro_schema.fingerprint")
+test:is(fingerprint.avro_json({field1="1"}), "{}", "avro_json 1")
+test:is(fingerprint.avro_json({field1="1"}, {"field1"}),
+ '{"field1":"1"}', "avro_json 2")
+test:is(fingerprint.avro_json({field2="1", field1="1"}, {"field2", "field1"}),
+ '{"field1":"1","field2":"1"}', "avro_json 3 order")
+
+
test:check()
os.exit(test.planned == test.total and test.failed == 0 and 0 or -1)
--
2.14.1
More information about the Tarantool-patches
mailing list