[PATCH 4/8] netbox: extend codec with 'decode' methods
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Mon Apr 16 21:39:14 MSK 2018
Netbox has a table 'method_codec' that is used to encode a
request by a method name. But a response is decoded out of codec.
It leads to
1) decoding into Lua tables before decoding into tuples where
needed - it is double decoding and produces a lot of garbage;
2) each method contains hacks like one_tuple(), or single tuple
check.
These things can not be fixed with no real codec instead of
encoder only.
Also global table with decoders is needed for #3107, where
a request could be sent async with no fiber blocking. An async
response when received already does not have a call context - it
has only method name.
Needed for #3107
---
src/box/lua/net_box.lua | 116 +++++++++++++++++++++++++++++-------------------
test/box/net.box.result | 14 ++++++
test/box/sql.result | 2 +
3 files changed, 87 insertions(+), 45 deletions(-)
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 4ed2b375d..3868cdf1c 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -50,7 +50,34 @@ local E_PROC_LUA = box.error.PROC_LUA
-- utility tables
local is_final_state = {closed = 1, error = 1}
-local method_codec = {
+
+local function decode_nil(...) end
+local function decode_nothing(...) return ... end
+local function decode_one_tuple(response)
+ if response[1] then
+ return box.tuple.new(response[1])
+ end
+end
+local function decode_single_tuple(response)
+ if response[2] then
+ return nil, box.error.MORE_THAN_ONE_TUPLE
+ end
+ if response[1] then
+ return box.tuple.new(response[1])
+ end
+end
+local function decode_select(response)
+ setmetatable(response, sequence_mt)
+ for i, v in pairs(response) do
+ response[i] = box.tuple.new(v)
+ end
+ return response
+end
+local function decode_count(response)
+ return response[1]
+end
+
+local method_encoder = {
ping = internal.encode_ping,
call_16 = internal.encode_call_16,
call_17 = internal.encode_call,
@@ -61,6 +88,10 @@ local method_codec = {
update = internal.encode_update,
upsert = internal.encode_upsert,
select = internal.encode_select,
+ get = internal.encode_select,
+ min = internal.encode_select,
+ max = internal.encode_select,
+ count = internal.encode_call,
-- inject raw data into connection, used by console and tests
inject = function(buf, id, schema_version, bytes)
local ptr = buf:reserve(#bytes)
@@ -69,6 +100,24 @@ local method_codec = {
end
}
+local method_decoder = {
+ ping = decode_nil,
+ call_16 = decode_select,
+ call_17 = decode_nothing,
+ eval = decode_nothing,
+ insert = decode_one_tuple,
+ replace = decode_one_tuple,
+ delete = decode_one_tuple,
+ update = decode_one_tuple,
+ upsert = decode_nil,
+ select = decode_select,
+ get = decode_single_tuple,
+ min = decode_single_tuple,
+ max = decode_single_tuple,
+ count = decode_count,
+ inject = decode_nothing,
+}
+
local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
--
@@ -278,7 +327,7 @@ local function create_transport(host, port, user, password, callback,
worker_fiber:wakeup()
end
local id = next_request_id
- method_codec[method](send_buf, id, schema_version, ...)
+ method_encoder[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
local request = table_new(0, 6) -- reserve space for 6 keys
request.client = fiber_self()
@@ -336,7 +385,10 @@ local function create_transport(host, port, user, password, callback,
-- Decode xrow.body[DATA] to Lua objects
body, body_end_check = decode(body_rpos)
assert(body_end == body_end_check, "invalid xrow length")
- request.response = body[IPROTO_DATA_KEY]
+ if body and body[IPROTO_DATA_KEY] then
+ request.response, request.errno =
+ method_decoder[request.method](body[IPROTO_DATA_KEY])
+ end
wakeup_client(request.client)
end
@@ -417,7 +469,7 @@ local function create_transport(host, port, user, password, callback,
log.warn("Netbox text protocol support is deprecated since 1.10, "..
"please use require('console').connect() instead")
local setup_delimiter = 'require("console").delimiter("$EOF$")\n'
- method_codec.inject(send_buf, nil, nil, setup_delimiter)
+ method_encoder.inject(send_buf, nil, nil, setup_delimiter)
local err, response = send_and_recv_console()
if err then
return error_sm(err, response)
@@ -835,18 +887,8 @@ function remote_methods:_request(method, opts, ...)
end
err, res = perform_request(timeout, buffer, method,
self.schema_version, ...)
- if not err and buffer ~= nil then
- return res -- the length of xrow.body
- elseif not err then
- setmetatable(res, sequence_mt)
- local postproc = method ~= 'eval' and method ~= 'call_17'
- if postproc then
- local tnew = box.tuple.new
- for i, v in pairs(res) do
- res[i] = tnew(v)
- end
- end
- return res -- decoded xrow.body[DATA]
+ if not err then
+ return res
elseif err == E_WRONG_SCHEMA_VERSION then
err = nil
end
@@ -1056,25 +1098,17 @@ function console_methods:eval(line, timeout)
return res[1] or res
end
-local function one_tuple(tab)
- if type(tab) ~= 'table' then
- return tab
- elseif tab[1] ~= nil then
- return tab[1]
- end
-end
-
space_metatable = function(remote)
local methods = {}
function methods:insert(tuple, opts)
check_space_arg(self, 'insert')
- return one_tuple(remote:_request('insert', opts, self.id, tuple))
+ return remote:_request('insert', opts, self.id, tuple)
end
function methods:replace(tuple, opts)
check_space_arg(self, 'replace')
- return one_tuple(remote:_request('replace', opts, self.id, tuple))
+ return remote:_request('replace', opts, self.id, tuple)
end
function methods:select(key, opts)
@@ -1094,8 +1128,7 @@ space_metatable = function(remote)
function methods:upsert(key, oplist, opts)
check_space_arg(self, 'upsert')
- remote:_request('upsert', opts, self.id, key, oplist)
- return
+ return remote:_request('upsert', opts, self.id, key, oplist)
end
function methods:get(key, opts)
@@ -1133,10 +1166,8 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:get() doesn't support `buffer` argument")
end
- local res = remote:_request('select', opts, self.space.id, self.id,
- box.index.EQ, 0, 2, key)
- if res[2] ~= nil then box.error(box.error.MORE_THAN_ONE_TUPLE) end
- if res[1] ~= nil then return res[1] end
+ return remote:_request('get', opts, self.space.id, self.id,
+ box.index.EQ, 0, 2, key)
end
function methods:min(key, opts)
@@ -1144,9 +1175,8 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:min() doesn't support `buffer` argument")
end
- local res = remote:_request('select', opts, self.space.id, self.id,
- box.index.GE, 0, 1, key)
- return one_tuple(res)
+ return remote:_request('get', opts, self.space.id, self.id,
+ box.index.GE, 0, 1, key)
end
function methods:max(key, opts)
@@ -1154,9 +1184,8 @@ index_metatable = function(remote)
if opts and opts.buffer then
error("index:max() doesn't support `buffer` argument")
end
- local res = remote:_request('select', opts, self.space.id, self.id,
- box.index.LE, 0, 1, key)
- return one_tuple(res)
+ return remote:_request('get', opts, self.space.id, self.id,
+ box.index.LE, 0, 1, key)
end
function methods:count(key, opts)
@@ -1166,21 +1195,18 @@ index_metatable = function(remote)
end
local code = string.format('box.space.%s.index.%s:count',
self.space.name, self.name)
- return remote:_request('call_16', opts, code, { key })[1][1]
+ return remote:_request('count', opts, code, { key })
end
function methods:delete(key, opts)
check_index_arg(self, 'delete')
- local res = remote:_request('delete', opts, self.space.id, self.id,
- key)
- return one_tuple(res)
+ return remote:_request('delete', opts, self.space.id, self.id, key)
end
function methods:update(key, oplist, opts)
check_index_arg(self, 'update')
- local res = remote:_request('update', opts, self.space.id, self.id,
- key, oplist)
- return one_tuple(res)
+ return remote:_request('update', opts, self.space.id, self.id, key,
+ oplist)
end
return { __index = methods, __metatable = false }
diff --git a/test/box/net.box.result b/test/box/net.box.result
index cf7b27f0b..6a3713fc0 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -416,9 +416,11 @@ cn.space.net_box_test_space:select({234}, { iterator = 'LT' })
...
cn.space.net_box_test_space:update({1}, { { '+', 2, 2 } })
---
+- null
...
cn.space.net_box_test_space:delete{1}
---
+- null
...
cn.space.net_box_test_space:delete{2}
---
@@ -426,6 +428,7 @@ cn.space.net_box_test_space:delete{2}
...
cn.space.net_box_test_space:delete{2}
---
+- null
...
-- test one-based indexing in splice operation (see update.test.lua)
cn.space.net_box_test_space:replace({10, 'abcde'})
@@ -754,12 +757,15 @@ remote_space:upsert({3}, {}, { timeout = 1e-9 })
...
remote_space:upsert({4}, {})
---
+- null
...
remote_space:upsert({5}, {}, { timeout = 1.00 })
---
+- null
...
remote_space:upsert({3}, {})
---
+- null
...
remote_space:update({3}, {}, { timeout = 1e-9 })
---
@@ -981,12 +987,15 @@ _ = remote_pk:delete({5})
...
remote_space:get(0)
---
+- null
...
remote_space:get(1)
---
+- null
...
remote_space:get(2)
---
+- null
...
remote_space = nil
---
@@ -1318,6 +1327,7 @@ c.space.test:select{}
...
c.space.test:upsert({1, 2, 'nothing'}, {{'+', 2, 1}}) -- common update
---
+- null
...
c.space.test:select{}
---
@@ -1325,6 +1335,7 @@ c.space.test:select{}
...
c.space.test:upsert({2, 4, 'something'}, {{'+', 2, 1}}) -- insert
---
+- null
...
c.space.test:select{}
---
@@ -1333,6 +1344,7 @@ c.space.test:select{}
...
c.space.test:upsert({2, 4, 'nothing'}, {{'+', 3, 100500}}) -- wrong operation
---
+- null
...
c.space.test:select{}
---
@@ -1481,6 +1493,7 @@ result
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
+- 7
...
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
---
@@ -1492,6 +1505,7 @@ result
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
+- 7
...
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
---
diff --git a/test/box/sql.result b/test/box/sql.result
index 11a698850..95f8da7dd 100644
--- a/test/box/sql.result
+++ b/test/box/sql.result
@@ -105,6 +105,7 @@ space:select{1}
-- xxx: update comes through, returns 0 rows affected
space:update(1, {{'=', 2, 'I am a new tuple'}})
---
+- null
...
-- nothing is selected, since nothing was there
space:select{1}
@@ -208,6 +209,7 @@ space:delete(0)
...
space:delete(4294967295)
---
+- null
...
box.space.test:drop()
---
--
2.15.1 (Apple Git-101)
More information about the Tarantool-patches
mailing list