From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH 7/8] netbox: remove schema_version from requests Date: Mon, 16 Apr 2018 21:39:17 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com List-ID: The schema version was used in netbox to keep the local box-like schema up to date. The box-like schema makes it possible to access spaces and indexes via a connection object. It was updated each time a response was received from a server with a schema version not equal to the local value. But there is no reason for a schema version to be sent in a request. Sending it sometimes leads to an ER_WRONG_SCHEMA_VERSION error, but on this error netbox just resends the same request again. The same behaviour can be achieved by simply not sending any schema version to the server. Remove schema_version from requests, and just track schema version changes in responses. Part of #3351 Part of #3333 Follow up #3107 --- src/box/lua/net_box.c | 105 +++++++++++++++++++++++----------------------- src/box/lua/net_box.lua | 90 ++++++++++++++++----------------------- test/box/net.box.result | 6 +-- test/box/net.box.test.lua | 6 +-- 4 files changed, 95 insertions(+), 112 deletions(-) diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index db2d2dbb4..04fe70b03 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -53,7 +53,6 @@ netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t r_type) { struct ibuf *ibuf = (struct ibuf *) lua_topointer(L, 1); uint64_t sync = luaL_touint64(L, 2); - uint64_t schema_version = luaL_touint64(L, 3); mpstream_init(stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, luamp_error, L); @@ -67,14 +66,11 @@ netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t r_type) mpstream_advance(stream, fixheader_size); /* encode header */ - luamp_encode_map(cfg, stream, 3); + luamp_encode_map(cfg, stream, 2); luamp_encode_uint(cfg, stream, IPROTO_SYNC); 
luamp_encode_uint(cfg, stream, sync); - luamp_encode_uint(cfg, stream, IPROTO_SCHEMA_VERSION); - luamp_encode_uint(cfg, stream, schema_version); - luamp_encode_uint(cfg, stream, IPROTO_REQUEST_TYPE); luamp_encode_uint(cfg, stream, r_type); @@ -111,9 +107,8 @@ netbox_encode_request(struct mpstream *stream, size_t initial_size) static int netbox_encode_ping(lua_State *L) { - if (lua_gettop(L) < 3) - return luaL_error(L, "Usage: netbox.encode_ping(ibuf, sync, " - "schema_version)"); + if (lua_gettop(L) < 2) + return luaL_error(L, "Usage: netbox.encode_ping(ibuf, sync)"); struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_PING); @@ -124,19 +119,20 @@ netbox_encode_ping(lua_State *L) static int netbox_encode_auth(lua_State *L) { - if (lua_gettop(L) < 6) + if (lua_gettop(L) < 5) { return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, " - "schema_version, user, password, greeting)"); + "user, password, greeting)"); + } struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_AUTH); size_t user_len; - const char *user = lua_tolstring(L, 4, &user_len); + const char *user = lua_tolstring(L, 3, &user_len); size_t password_len; - const char *password = lua_tolstring(L, 5, &password_len); + const char *password = lua_tolstring(L, 4, &password_len); size_t salt_len; - const char *salt = lua_tolstring(L, 6, &salt_len); + const char *salt = lua_tolstring(L, 5, &salt_len); if (salt_len < SCRAMBLE_SIZE) return luaL_error(L, "Invalid salt"); @@ -160,9 +156,10 @@ netbox_encode_auth(lua_State *L) static int netbox_encode_call_impl(lua_State *L, enum iproto_type type) { - if (lua_gettop(L) < 5) + if (lua_gettop(L) < 4) { return luaL_error(L, "Usage: netbox.encode_call(ibuf, sync, " - "schema_version, function_name, args)"); + "function_name, args)"); + } struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, type); @@ -171,13 +168,13 @@ netbox_encode_call_impl(lua_State *L, enum iproto_type type) /* encode proc 
name */ size_t name_len; - const char *name = lua_tolstring(L, 4, &name_len); + const char *name = lua_tolstring(L, 3, &name_len); luamp_encode_uint(cfg, &stream, IPROTO_FUNCTION_NAME); luamp_encode_str(cfg, &stream, name, name_len); /* encode args */ luamp_encode_uint(cfg, &stream, IPROTO_TUPLE); - luamp_encode_tuple(L, cfg, &stream, 5); + luamp_encode_tuple(L, cfg, &stream, 4); netbox_encode_request(&stream, svp); return 0; @@ -198,9 +195,10 @@ netbox_encode_call(lua_State *L) static int netbox_encode_eval(lua_State *L) { - if (lua_gettop(L) < 5) + if (lua_gettop(L) < 4) { return luaL_error(L, "Usage: netbox.encode_eval(ibuf, sync, " - "schema_version, expr, args)"); + "expr, args)"); + } struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_EVAL); @@ -209,13 +207,13 @@ netbox_encode_eval(lua_State *L) /* encode expr */ size_t expr_len; - const char *expr = lua_tolstring(L, 4, &expr_len); + const char *expr = lua_tolstring(L, 3, &expr_len); luamp_encode_uint(cfg, &stream, IPROTO_EXPR); luamp_encode_str(cfg, &stream, expr, expr_len); /* encode args */ luamp_encode_uint(cfg, &stream, IPROTO_TUPLE); - luamp_encode_tuple(L, cfg, &stream, 5); + luamp_encode_tuple(L, cfg, &stream, 4); netbox_encode_request(&stream, svp); return 0; @@ -224,21 +222,22 @@ netbox_encode_eval(lua_State *L) static int netbox_encode_select(lua_State *L) { - if (lua_gettop(L) < 9) + if (lua_gettop(L) < 8) { return luaL_error(L, "Usage netbox.encode_select(ibuf, sync, " - "schema_version, space_id, index_id, iterator, " - "offset, limit, key)"); + "space_id, index_id, iterator, offset, " + "limit, key)"); + } struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_SELECT); luamp_encode_map(cfg, &stream, 6); - uint32_t space_id = lua_tonumber(L, 4); - uint32_t index_id = lua_tonumber(L, 5); - int iterator = lua_tointeger(L, 6); - uint32_t offset = lua_tonumber(L, 7); - uint32_t limit = lua_tonumber(L, 8); + uint32_t space_id = lua_tonumber(L, 3); + 
uint32_t index_id = lua_tonumber(L, 4); + int iterator = lua_tointeger(L, 5); + uint32_t offset = lua_tonumber(L, 6); + uint32_t limit = lua_tonumber(L, 7); /* encode space_id */ luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID); @@ -262,7 +261,7 @@ netbox_encode_select(lua_State *L) /* encode key */ luamp_encode_uint(cfg, &stream, IPROTO_KEY); - luamp_convert_key(L, cfg, &stream, 9); + luamp_convert_key(L, cfg, &stream, 8); netbox_encode_request(&stream, svp); return 0; @@ -271,24 +270,23 @@ netbox_encode_select(lua_State *L) static inline int netbox_encode_insert_or_replace(lua_State *L, uint32_t reqtype) { - if (lua_gettop(L) < 5) + if (lua_gettop(L) < 4) { return luaL_error(L, "Usage: netbox.encode_insert(ibuf, sync, " - "schema_version, space_id, tuple)"); - lua_settop(L, 5); - + "space_id, tuple)"); + } struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, reqtype); luamp_encode_map(cfg, &stream, 2); /* encode space_id */ - uint32_t space_id = lua_tonumber(L, 4); + uint32_t space_id = lua_tonumber(L, 3); luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID); luamp_encode_uint(cfg, &stream, space_id); /* encode args */ luamp_encode_uint(cfg, &stream, IPROTO_TUPLE); - luamp_encode_tuple(L, cfg, &stream, 5); + luamp_encode_tuple(L, cfg, &stream, 4); netbox_encode_request(&stream, svp); return 0; @@ -309,9 +307,10 @@ netbox_encode_replace(lua_State *L) static int netbox_encode_delete(lua_State *L) { - if (lua_gettop(L) < 6) + if (lua_gettop(L) < 5) { return luaL_error(L, "Usage: netbox.encode_delete(ibuf, sync, " - "schema_version, space_id, index_id, key)"); + "space_id, index_id, key)"); + } struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_DELETE); @@ -319,18 +318,18 @@ netbox_encode_delete(lua_State *L) luamp_encode_map(cfg, &stream, 3); /* encode space_id */ - uint32_t space_id = lua_tonumber(L, 4); + uint32_t space_id = lua_tonumber(L, 3); luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID); luamp_encode_uint(cfg, 
&stream, space_id); /* encode space_id */ - uint32_t index_id = lua_tonumber(L, 5); + uint32_t index_id = lua_tonumber(L, 4); luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID); luamp_encode_uint(cfg, &stream, index_id); /* encode key */ luamp_encode_uint(cfg, &stream, IPROTO_KEY); - luamp_convert_key(L, cfg, &stream, 6); + luamp_convert_key(L, cfg, &stream, 5); netbox_encode_request(&stream, svp); return 0; @@ -339,9 +338,10 @@ netbox_encode_delete(lua_State *L) static int netbox_encode_update(lua_State *L) { - if (lua_gettop(L) < 7) + if (lua_gettop(L) < 6) { return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, " - "schema_version, space_id, index_id, key, ops)"); + "space_id, index_id, key, ops)"); + } struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_UPDATE); @@ -349,12 +349,12 @@ netbox_encode_update(lua_State *L) luamp_encode_map(cfg, &stream, 5); /* encode space_id */ - uint32_t space_id = lua_tonumber(L, 4); + uint32_t space_id = lua_tonumber(L, 3); luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID); luamp_encode_uint(cfg, &stream, space_id); /* encode index_id */ - uint32_t index_id = lua_tonumber(L, 5); + uint32_t index_id = lua_tonumber(L, 4); luamp_encode_uint(cfg, &stream, IPROTO_INDEX_ID); luamp_encode_uint(cfg, &stream, index_id); @@ -365,12 +365,12 @@ netbox_encode_update(lua_State *L) /* encode in reverse order for speedup - see luamp_encode() code */ /* encode ops */ luamp_encode_uint(cfg, &stream, IPROTO_TUPLE); - luamp_encode_tuple(L, cfg, &stream, 7); + luamp_encode_tuple(L, cfg, &stream, 6); lua_pop(L, 1); /* ops */ /* encode key */ luamp_encode_uint(cfg, &stream, IPROTO_KEY); - luamp_convert_key(L, cfg, &stream, 6); + luamp_convert_key(L, cfg, &stream, 5); netbox_encode_request(&stream, svp); return 0; @@ -379,9 +379,10 @@ netbox_encode_update(lua_State *L) static int netbox_encode_upsert(lua_State *L) { - if (lua_gettop(L) != 6) + if (lua_gettop(L) != 5) { return luaL_error(L, "Usage: 
netbox.encode_upsert(ibuf, sync, " - "schema_version, space_id, tuple, ops)"); + "space_id, tuple, ops)"); + } struct mpstream stream; size_t svp = netbox_prepare_request(L, &stream, IPROTO_UPSERT); @@ -389,7 +390,7 @@ netbox_encode_upsert(lua_State *L) luamp_encode_map(cfg, &stream, 4); /* encode space_id */ - uint32_t space_id = lua_tonumber(L, 4); + uint32_t space_id = lua_tonumber(L, 3); luamp_encode_uint(cfg, &stream, IPROTO_SPACE_ID); luamp_encode_uint(cfg, &stream, space_id); @@ -400,12 +401,12 @@ netbox_encode_upsert(lua_State *L) /* encode in reverse order for speedup - see luamp_encode() code */ /* encode ops */ luamp_encode_uint(cfg, &stream, IPROTO_OPS); - luamp_encode_tuple(L, cfg, &stream, 6); + luamp_encode_tuple(L, cfg, &stream, 5); lua_pop(L, 1); /* ops */ /* encode tuple */ luamp_encode_uint(cfg, &stream, IPROTO_TUPLE); - luamp_encode_tuple(L, cfg, &stream, 5); + luamp_encode_tuple(L, cfg, &stream, 4); netbox_encode_request(&stream, svp); return 0; diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 96f528963..a2b7b39d2 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -45,7 +45,6 @@ local IPROTO_GREETING_SIZE = 128 local E_UNKNOWN = box.error.UNKNOWN local E_NO_CONNECTION = box.error.NO_CONNECTION local E_TIMEOUT = box.error.TIMEOUT -local E_WRONG_SCHEMA_VERSION = box.error.WRONG_SCHEMA_VERSION local E_PROC_LUA = box.error.PROC_LUA -- utility tables @@ -93,7 +92,7 @@ local method_encoder = { max = internal.encode_select, count = internal.encode_call, -- inject raw data into connection, used by console and tests - inject = function(buf, id, schema_version, bytes) + inject = function(buf, id, bytes) local ptr = buf:reserve(#bytes) ffi.copy(ptr, bytes, #bytes) buf.wpos = ptr + #bytes @@ -158,7 +157,7 @@ end -- * implements protocols; concurrent perform_request()-s benefit from -- multiplexing support in the protocol; -- * schema-aware (optional) - snoops responses and initiates --- schema reload when a request 
fails due to schema version mismatch; +-- schema reload when a response has a new schema version; -- * delivers transport events via the callback. -- -- Transport state machine: @@ -415,7 +414,7 @@ local function create_transport(host, port, user, password, callback, -- @retval nil, error Error occured. -- @retval not nil Future object. -- - local function perform_async_request(buffer, method, schema_version, ...) + local function perform_async_request(buffer, method, ...) if state ~= 'active' then return nil, box.error.new({code = last_errno or E_NO_CONNECTION, reason = last_error}) @@ -426,11 +425,10 @@ local function create_transport(host, port, user, password, callback, worker_fiber:wakeup() end local id = next_request_id - method_encoder[method](send_buf, id, schema_version, ...) + method_encoder[method](send_buf, id, ...) next_request_id = next_id(id) - local request = setmetatable(table_new(0, 7), request_mt) + local request = setmetatable(table_new(0, 6), request_mt) request.method = method - request.schema_version = schema_version request.buffer = buffer request.id = id requests[id] = request @@ -440,9 +438,9 @@ local function create_transport(host, port, user, password, callback, -- -- Send a request and wait for response. -- - local function perform_request(timeout, buffer, method, schema_version, ...) + local function perform_request(timeout, buffer, method, ...) local request, err = - perform_async_request(buffer, method, schema_version, ...) + perform_async_request(buffer, method, ...) if not request then return last_errno or E_NO_CONNECTION, last_error end @@ -578,7 +576,7 @@ local function create_transport(host, port, user, password, callback, log.warn("Netbox text protocol support is deprecated since 1.10, ".. 
"please use require('console').connect() instead") local setup_delimiter = 'require("console").delimiter("$EOF$")\n' - method_encoder.inject(send_buf, nil, nil, setup_delimiter) + method_encoder.inject(send_buf, nil, setup_delimiter) local err, response = send_and_recv_console() if err then return error_sm(err, response) @@ -619,7 +617,7 @@ local function create_transport(host, port, user, password, callback, set_state('fetch_schema') return iproto_schema_sm() end - encode_auth(send_buf, new_request_id(), nil, user, password, salt) + encode_auth(send_buf, new_request_id(), user, password, salt) local err, hdr, body_rpos, body_end = send_and_recv_iproto() if err then return error_sm(err, hdr) @@ -642,11 +640,9 @@ local function create_transport(host, port, user, password, callback, local select2_id = new_request_id() local response = {} -- fetch everything from space _vspace, 2 = ITER_ALL - encode_select(send_buf, select1_id, nil, VSPACE_ID, 0, 2, 0, - 0xFFFFFFFF, nil) + encode_select(send_buf, select1_id, VSPACE_ID, 0, 2, 0, 0xFFFFFFFF, nil) -- fetch everything from space _vindex, 2 = ITER_ALL - encode_select(send_buf, select2_id, nil, VINDEX_ID, 0, 2, 0, - 0xFFFFFFFF, nil) + encode_select(send_buf, select2_id, VINDEX_ID, 0, 2, 0, 0xFFFFFFFF, nil) schema_version = nil -- any schema_version will do provided that -- it is consistent across responses repeat @@ -692,8 +688,7 @@ local function create_transport(host, port, user, password, callback, -- Sic: self.schema_version will be updated only after reload. local body body, body_end = decode(body_rpos) - set_state('fetch_schema', - E_WRONG_SCHEMA_VERSION, body[IPROTO_ERROR_KEY]) + set_state('fetch_schema') return iproto_schema_sm(schema_version) end return iproto_sm(schema_version) @@ -977,57 +972,44 @@ function remote_methods:_request(method, opts, ...) local transport = self._transport local buffer = opts and opts.buffer if opts and opts.is_async then - return transport.perform_async_request(buffer, method, 0, ...) 
+ return transport.perform_async_request(buffer, method, ...) end - local this_fiber = fiber_self() - local perform_request = transport.perform_request - local wait_state = transport.wait_state - local deadline = nil + local deadline if opts and opts.timeout then -- conn.space:request(, { timeout = timeout }) deadline = fiber_clock() + opts.timeout else -- conn:timeout(timeout).space:request() -- @deprecated since 1.7.4 - deadline = self._deadlines[this_fiber] + deadline = self._deadlines[fiber_self()] end - local err, res - repeat - local timeout = deadline and max(0, deadline - fiber_clock()) - if self.state ~= 'active' then - wait_state('active', timeout) - timeout = deadline and max(0, deadline - fiber_clock()) - end - err, res = perform_request(timeout, buffer, method, - self.schema_version, ...) - if not err then - return res - elseif err == E_WRONG_SCHEMA_VERSION then - err = nil - end - until err - box.error({code = err, reason = res}) + local timeout = deadline and max(0, deadline - fiber_clock()) + if self.state ~= 'active' then + transport.wait_state('active', timeout) + timeout = deadline and max(0, deadline - fiber_clock()) + end + local err, res = transport.perform_request(timeout, buffer, method, ...) + if err then + box.error({code = err, reason = res}) + end + -- Try to wait until a schema is reloaded if needed. + -- Regardless of reloading result, the main response is + -- returned, since it does not depend on any schema things. 
+ if self.state == 'fetch_schema' then + timeout = deadline and max(0, deadline - fiber_clock()) + transport.wait_state('active', timeout) + end + return res end function remote_methods:ping(opts) check_remote_arg(self, 'ping') - local timeout = opts and opts.timeout - if timeout == nil then - -- conn:timeout(timeout):ping() - -- @deprecated since 1.7.4 - local deadline = self._deadlines[fiber_self()] - timeout = deadline and max(0, deadline - fiber_clock()) - or (opts and opts.timeout) - end - local err = self._transport.perform_request(timeout, nil, 'ping', - self.schema_version) - return not err or err == E_WRONG_SCHEMA_VERSION + return (pcall(self._request, self, 'ping', opts)) end function remote_methods:reload_schema() check_remote_arg(self, 'reload_schema') - self:_request('select', nil, VSPACE_ID, 0, box.index.GE, 0, 0xFFFFFFFF, - nil) + self:ping() end -- @deprecated since 1.7.4 @@ -1200,10 +1182,10 @@ function console_methods:eval(line, timeout) end if self.protocol == 'Binary' then local loader = 'return require("console").eval(...)' - err, res = pr(timeout, nil, 'eval', nil, loader, {line}) + err, res = pr(timeout, nil, 'eval', loader, {line}) else assert(self.protocol == 'Lua console') - err, res = pr(timeout, nil, 'inject', nil, line..'$EOF$\n') + err, res = pr(timeout, nil, 'inject', line..'$EOF$\n') end if err then box.error({code = err, reason = res}) diff --git a/test/box/net.box.result b/test/box/net.box.result index aaa421ec6..3c494696b 100644 --- a/test/box/net.box.result +++ b/test/box/net.box.result @@ -28,7 +28,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) return cn:_request('select', opts, space_id, index_id, iterator, offset, limit, key) end -function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end +function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', '\x80') end test_run:cmd("setopt delimiter ''"); --- ... 
@@ -2377,7 +2377,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') +_ = c._transport.perform_request(nil, nil, 'inject', '\x80') --- ... c.state @@ -2940,7 +2940,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01}) future = c:call('long_function', {1, 2, 3}, {is_async = true}) --- ... -_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') +_ = c._transport.perform_request(nil, nil, 'inject', '\x80') --- ... while not c:is_connected() do fiber.sleep(0.01) end diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua index 82c538fbe..9a826dc6d 100644 --- a/test/box/net.box.test.lua +++ b/test/box/net.box.test.lua @@ -11,7 +11,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) return cn:_request('select', opts, space_id, index_id, iterator, offset, limit, key) end -function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end +function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', '\x80') end test_run:cmd("setopt delimiter ''"); LISTEN = require('uri').parse(box.cfg.listen) @@ -965,7 +965,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') +_ = c._transport.perform_request(nil, nil, 'inject', '\x80') c.state while not c:is_connected() do fiber.sleep(0.01) end c:ping() @@ -1169,7 +1169,7 @@ future:is_ready() -- c = net:connect(box.cfg.listen, {reconnect_after = 0.01}) future = c:call('long_function', {1, 2, 3}, {is_async = true}) -_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') +_ = c._transport.perform_request(nil, nil, 'inject', '\x80') while not c:is_connected() do fiber.sleep(0.01) end future:wait_result(100) future:result() -- 2.15.1 (Apple Git-101)