From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id 00E456EC57; Fri, 23 Jul 2021 14:16:33 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 00E456EC57 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1627038994; bh=bcj/YnmCQ+NEpoU1GWYRIhf4zLGdICX9dtSiYqGhVik=; h=To:Date:In-Reply-To:References:Subject:List-Id:List-Unsubscribe: List-Archive:List-Post:List-Help:List-Subscribe:From:Reply-To: From; b=DoYuA63DH7rufFBQiu4VRkNuG34iRbMZ1EFvOc3uNlzHzd2uQSXHcTI6gtICpzJ6p 6NOH/juLTz96cT4VhLQXMDuo1HsIucgypKYZF1nX2yBWD01mjKUcOFB2yhhOkvNOf6 kw6tpm55xIeE+v9O5ocWJ1F9QAUKjZIoevpBUTT8= Received: from smtpng1.i.mail.ru (smtpng1.i.mail.ru [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id C612E6E224 for ; Fri, 23 Jul 2021 14:08:00 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org C612E6E224 Received: by smtpng1.m.smailru.net with esmtpa (envelope-from ) id 1m6t27-0004dl-G7; Fri, 23 Jul 2021 14:08:00 +0300 To: tarantool-patches@dev.tarantool.org Date: Fri, 23 Jul 2021 14:07:28 +0300 Message-Id: <21b94b5bea3b00bde3e866300992fbff78a76b5c.1627024646.git.vdavydov@tarantool.org> X-Mailer: git-send-email 2.25.1 In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-174C08C4: 5188C02AEC42908C481ED7ADC579193296BBA28369E3F2D2713F3D5F7D406D31BCF678C7329BA986 X-7564579A: 646B95376F6C166E X-77F55803: 4F1203BC0FB41BD941C43E597735A9C3FDAB68B812060C77E621B90589399EB5182A05F538085040ABB346F28BC02EC54D3DE5A482DF9F6E6D3D46D50546C888F7E4805128DD2ACC X-7FA49CB5: FF5795518A3D127A4AD6D5ED66289B5278DA827A17800CE70CB15FA6C489297DEA1F7E6F0F101C67BD4B6F7A4D31EC0BCC500DACC3FED6E28638F802B75D45FF8AA50765F7900637835928C62272F24E8638F802B75D45FF36EB9D2243A4F8B5A6FCA7DBDB1FC311F39EFFDF887939037866D6147AF826D8E2AAACB7C54CE3B0BCBBF135195368A9117882F4460429724CE54428C33FAD305F5C1EE8F4F765FCAA867293B0326636D2E47CDBA5A96583BD4B6F7A4D31EC0BC014FD901B82EE079FA2833FD35BB23D27C277FBC8AE2E8BAA867293B0326636D2E47CDBA5A96583BA9C0B312567BB231DD303D21008E29813377AFFFEAFD269A417C69337E82CC2E827F84554CEF50127C277FBC8AE2E8BA83251EDC214901ED5E8D9A59859A8B6045A9A90E9EED90B089D37D7C0E48F6C5571747095F342E88FB05168BE4CE3AF X-C1DE0DAB: 8BD88D57C5CADBC8B2710865C386751094C72BDDC9A8ED5CA3B1A56EE2B804F6B226C914C9968946695E9D90444CEC264DCC8C77FBA9901322D2CEDE4E95CF1BDBE8DEE28BC9005C095FFBCAB1CFE8AABCA57AF85F7723F20D16461F25966F64E400A5B394447C81589120F7DAE46353205367B2BCC23E5B060ACF648AB0CB71BDAD6C7F3747799A X-C8649E89: 4E36BF7865823D7055A7F0CF078B5EC49A30900B95165D34AC6E62257D6CD1C97795E12EA9DE68583FD4F5C93EB434ABE079A693973FDA11153697672FB632181D7E09C32AA3244C5410565CB91D02B58DFBB946C8BEA0CAF94338140B71B8EE729B2BEF169E0186 X-D57D3AED: 3ZO7eAau8CL7WIMRKs4sN3D3tLDjz0dLbV79QFUyzQ2Ujvy7cMT6pYYqY16iZVKkSc3dCLJ7zSJH7+u4VD18S7Vl4ZUrpaVfd2+vE6kuoey4m4VkSEu530nj6fImhcD4MUrOEAnl0W826KZ9Q+tr5ycPtXkTV4k65bRjmOUUP8cvGozZ33TWg5HZplvhhXbhDGzqmQDTd6OAevLeAnq3Ra9uf7zvY2zzsIhlcp/Y7m53TZgf2aB4JOg4gkr2biojbL9S8ysBdXiEX0g4jkpDtcN5a9yHKbmI X-Mailru-Sender: 689FA8AB762F7393C37E3C1AEC41BA5DEFF92E05DA2A92C7CEA858C4DB46D907274CEFED1673C562683ABF942079399BFB559BB5D741EB966A65DFF43FF7BE03240331F90058701C67EA787935ED9F1B X-Mras: Ok Subject: [Tarantool-patches] [PATCH 18/20] net.box: rewrite iproto handlers in C X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , From: Vladimir Davydov via Tarantool-patches Reply-To: Vladimir Davydov Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" Those are performance-critical paths. Moving them to C should speed up overall net.box performance. --- src/box/lua/net_box.c | 373 +++++++++++++++++++++++++++------------- src/box/lua/net_box.lua | 141 +++------------ 2 files changed, 277 insertions(+), 237 deletions(-) diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index 85a45c54b979..fde2a8772890 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -41,7 +41,7 @@ #include "box/tuple.h" #include "box/execute.h" #include "box/error.h" -#include "box/mp_error.h" +#include "box/schema_def.h" #include "lua/msgpack.h" #include @@ -53,6 +53,7 @@ #include "lua/fiber.h" #include "mpstream/mpstream.h" #include "misc.h" /* lbox_check_tuple_format() */ +#include "version.h" #define cfg luaL_msgpack_default @@ -137,6 +138,13 @@ struct netbox_request { static const char netbox_registry_typename[] = "net.box.registry"; static const char netbox_request_typename[] = "net.box.request"; +/* Passed to mpstream_init() to set a boolean flag on error. */ +static void +mpstream_error_handler(void *error_ctx) +{ + *(bool *)error_ctx = true; +} + static void netbox_request_create(struct netbox_request *request) { @@ -391,30 +399,22 @@ netbox_encode_ping(lua_State *L, int idx, struct ibuf *ibuf, uint64_t sync) netbox_end_encode(&stream, svp); } +/* + * Encodes an authorization request and writes it to the provided buffer. + * Returns -1 on memory allocation error. + */ static int -netbox_encode_auth(lua_State *L) +netbox_encode_auth(struct ibuf *ibuf, uint64_t sync, + const char *user, size_t user_len, + const char *password, size_t password_len, + const char *salt) { - if (lua_gettop(L) < 5) { - return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, " - "user, password, greeting)"); - } - struct ibuf *ibuf = (struct ibuf *) lua_topointer(L, 1); - uint64_t sync = luaL_touint64(L, 2); - + bool is_error = false; struct mpstream stream; mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, - luamp_error, L); + mpstream_error_handler, &is_error); size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH); - size_t user_len; - const char *user = lua_tolstring(L, 3, &user_len); - size_t password_len; - const char *password = lua_tolstring(L, 4, &password_len); - size_t salt_len; - const char *salt = lua_tolstring(L, 5, &salt_len); - if (salt_len < SCRAMBLE_SIZE) - return luaL_error(L, "Invalid salt"); - /* Adapted from xrow_encode_auth() */ mpstream_encode_map(&stream, password != NULL ? 2 : 1); mpstream_encode_uint(&stream, IPROTO_USER_NAME); @@ -429,7 +429,30 @@ netbox_encode_auth(lua_State *L) } netbox_end_encode(&stream, svp); - return 0; + return is_error ? -1 : 0; +} + +/* + * Encodes a SELECT(*) request and writes it to the provided buffer. + * Returns -1 on memory allocation error. + */ +static int +netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id) +{ + bool is_error = false; + struct mpstream stream; + mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb, + mpstream_error_handler, &is_error); + size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT); + mpstream_encode_map(&stream, 3); + mpstream_encode_uint(&stream, IPROTO_SPACE_ID); + mpstream_encode_uint(&stream, space_id); + mpstream_encode_uint(&stream, IPROTO_LIMIT); + mpstream_encode_uint(&stream, UINT32_MAX); + mpstream_encode_uint(&stream, IPROTO_KEY); + mpstream_encode_array(&stream, 0); + netbox_end_encode(&stream, svp); + return is_error ? -1 : 0; } static void @@ -798,18 +821,14 @@ handle_error: /* * Sends and receives data over an iproto connection. - * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout. - * On success returns header (table), body_rpos (char *), body_end (char *). - * On error returns nil, error. + * Returns 0 and a decoded response header on success. + * On error returns -1. */ -static int -netbox_send_and_recv_iproto(lua_State *L) +int +netbox_send_and_recv_iproto(int fd, struct ibuf *send_buf, + struct ibuf *recv_buf, double timeout, + struct xrow_header *hdr) { - int fd = lua_tointeger(L, 1); - struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2); - struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3); - double timeout = (!lua_isnoneornil(L, 4) ? - lua_tonumber(L, 4) : TIMEOUT_INFINITY); while (true) { size_t required; size_t data_len = ibuf_used(recv_buf); @@ -817,21 +836,16 @@ netbox_send_and_recv_iproto(lua_State *L) if (data_len < fixheader_size) { required = fixheader_size; } else { - /* PWN! insufficient input validation */ const char *bufpos = recv_buf->rpos; const char *rpos = bufpos; size_t len = mp_decode_uint(&rpos); required = (rpos - bufpos) + len; if (data_len >= required) { const char *body_end = rpos + len; - const char *body_rpos = rpos; - luamp_decode(L, cfg, &body_rpos); - *(const char **)luaL_pushcdata( - L, CTID_CONST_CHAR_PTR) = body_rpos; - *(const char **)luaL_pushcdata( - L, CTID_CONST_CHAR_PTR) = body_end; recv_buf->rpos = (char *)body_end; - return 3; + return xrow_header_decode( + hdr, &rpos, body_end, + /*end_is_exact=*/true); } } size_t unused; @@ -841,8 +855,7 @@ netbox_send_and_recv_iproto(lua_State *L) /*boundary=*/NULL, /*boundary_len=*/0, timeout, &unused) != 0) { - luaL_testcancel(L); - return luaT_push_nil_and_error(L); + return -1; } timeout = deadline - fiber_clock(); } @@ -1377,57 +1390,6 @@ netbox_decode_method(struct lua_State *L, enum netbox_method method, method_decoder[method](L, data, data_end, format); } -/* - * Decodes an error from raw data. On success returns the decoded error object - * with ref counter incremented. On failure returns NULL. - */ -static struct error * -netbox_decode_error(const char **data, uint32_t errcode) -{ - struct error *error = NULL; - assert(mp_typeof(**data) == MP_MAP); - uint32_t map_size = mp_decode_map(data); - for (uint32_t i = 0; i < map_size; ++i) { - uint32_t key = mp_decode_uint(data); - if (key == IPROTO_ERROR) { - if (error != NULL) - error_unref(error); - error = error_unpack_unsafe(data); - if (error == NULL) - return NULL; - error_ref(error); - /* - * IPROTO_ERROR comprises error encoded with - * IPROTO_ERROR_24, so we may ignore content - * of that key. - */ - break; - } else if (key == IPROTO_ERROR_24) { - if (error != NULL) - error_unref(error); - const char *reason = ""; - uint32_t reason_len = 0; - if (mp_typeof(**data) == MP_STR) - reason = mp_decode_str(data, &reason_len); - box_error_raise(errcode, "%.*s", reason_len, reason); - error = box_error_last(); - error_ref(error); - continue; - } - mp_next(data); /* skip value */ - } - if (error == NULL) { - /* - * Error body is missing in the response. - * Set the error code without a 'reason' message - */ - box_error_raise(errcode, ""); - error = box_error_last(); - error_ref(error); - } - return error; -} - static inline struct netbox_registry * luaT_check_netbox_registry(struct lua_State *L, int idx) { @@ -1704,37 +1666,35 @@ netbox_new_request(struct lua_State *L) } /* - * Given a request registry, request id (sync), status, and a pointer to a - * response body, decodes the response and either completes the request or - * invokes the on-push trigger, depending on the status. + * Given a request registry and a response header, decodes the response and + * either completes the request or invokes the on-push trigger, depending on + * the status. + * + * Lua stack is used for temporarily storing the response table before getting + * a reference to it and executing the on-push trigger. */ -static int -netbox_dispatch_response_iproto(struct lua_State *L) +static void +netbox_dispatch_response_iproto(struct lua_State *L, + struct netbox_registry *registry, + struct xrow_header *hdr) { - struct netbox_registry *registry = luaT_check_netbox_registry(L, 1); - uint64_t sync = luaL_touint64(L, 2); - enum iproto_type status = lua_tointeger(L, 3); - uint32_t ctypeid; - const char *data = *(const char **)luaL_checkcdata(L, 4, &ctypeid); - assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR); - const char *data_end = *(const char **)luaL_checkcdata(L, 5, &ctypeid); - assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR); - struct netbox_request *request = netbox_registry_lookup(registry, sync); + struct netbox_request *request = netbox_registry_lookup(registry, + hdr->sync); if (request == NULL) { /* Nobody is waiting for the response. */ - return 0; + return; } + enum iproto_type status = hdr->type; if (status > IPROTO_CHUNK) { /* Handle errors. */ - struct error *error = netbox_decode_error( - &data, status & (IPROTO_TYPE_ERROR - 1)); - if (error == NULL) - return luaT_error(L); + xrow_decode_error(hdr); + struct error *error = box_error_last(); netbox_request_set_error(request, error); - error_unref(error); netbox_request_complete(request); - return 0; + return; } + const char *data = hdr->body[0].iov_base; + const char *data_end = data + hdr->body[0].iov_len; if (request->buffer != NULL) { /* Copy xrow.body to user-provided buffer. */ if (request->skip_header) @@ -1774,7 +1734,6 @@ netbox_dispatch_response_iproto(struct lua_State *L) lua_call(L, 2, 0); netbox_request_signal(request); } - return 0; } /* @@ -1800,6 +1759,188 @@ netbox_dispatch_response_console(struct lua_State *L, netbox_request_complete(request); } +/* + * Performs an authorization request for an iproto connection. + * Takes user, password, salt, request registry, socket fd, + * send_buf (ibuf), recv_buf (ibuf), timeout. + * Returns schema_version on success, nil and error on failure. + */ +static int +netbox_iproto_auth(struct lua_State *L) +{ + size_t user_len; + const char *user = lua_tolstring(L, 1, &user_len); + size_t password_len; + const char *password = lua_tolstring(L, 2, &password_len); + size_t salt_len; + const char *salt = lua_tolstring(L, 3, &salt_len); + if (salt_len < SCRAMBLE_SIZE) + return luaL_error(L, "Invalid salt"); + struct netbox_registry *registry = luaT_check_netbox_registry(L, 4); + int fd = lua_tointeger(L, 5); + struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 6); + struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 7); + double timeout = (!lua_isnoneornil(L, 8) ? + lua_tonumber(L, 8) : TIMEOUT_INFINITY); + if (netbox_encode_auth(send_buf, registry->next_sync++, user, user_len, + password, password_len, salt) != 0) { + goto error; + } + struct xrow_header hdr; + if (netbox_send_and_recv_iproto(fd, send_buf, recv_buf, timeout, + &hdr) != 0) { + goto error; + } + if (hdr.type != IPROTO_OK) { + xrow_decode_error(&hdr); + goto error; + } + lua_pushinteger(L, hdr.schema_version); + return 1; +error: + return luaT_push_nil_and_error(L); +} + +/* + * Fetches schema over an iproto connection. While waiting for the schema, + * processes other requests in a loop, like netbox_iproto_loop(). + * Takes peer_version_id, request registry, socket fd, send_buf (ibuf), + * recv_buf (ibuf), timeout. + * Returns schema_version and a table with the following fields: + * [VSPACE_ID] = + * [VINDEX_ID] = + * [VCOLLATION_ID] = + * On failure returns nil, error. + */ +static int +netbox_iproto_schema(struct lua_State *L) +{ + uint32_t peer_version_id = lua_tointeger(L, 1); + struct netbox_registry *registry = luaT_check_netbox_registry(L, 2); + int fd = lua_tointeger(L, 3); + struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 4); + struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 5); + double timeout = (!lua_isnoneornil(L, 6) ? + lua_tonumber(L, 6) : TIMEOUT_INFINITY); + /* _vcollation view was added in 2.2.0-389-g3e3ef182f */ + bool peer_has_vcollation = peer_version_id >= version_id(2, 2, 1); +restart: + lua_newtable(L); + uint64_t vspace_sync = registry->next_sync++; + if (netbox_encode_select_all(send_buf, vspace_sync, + BOX_VSPACE_ID) != 0) { + return luaT_error(L); + } + uint64_t vindex_sync = registry->next_sync++; + if (netbox_encode_select_all(send_buf, vindex_sync, + BOX_VINDEX_ID) != 0) { + return luaT_error(L); + } + uint64_t vcollation_sync = registry->next_sync++; + if (peer_has_vcollation && + netbox_encode_select_all(send_buf, vcollation_sync, + BOX_VCOLLATION_ID) != 0) { + return luaT_error(L); + } + bool got_vspace = false; + bool got_vindex = false; + bool got_vcollation = false; + uint32_t schema_version = 0; + do { + struct xrow_header hdr; + if (netbox_send_and_recv_iproto(fd, send_buf, recv_buf, + timeout, &hdr) != 0) { + luaL_testcancel(L); + return luaT_push_nil_and_error(L); + } + netbox_dispatch_response_iproto(L, registry, &hdr); + if (hdr.sync != vspace_sync && + hdr.sync != vindex_sync && + hdr.sync != vcollation_sync) { + continue; + } + if (iproto_type_is_error(hdr.type)) { + uint32_t errcode = hdr.type & (IPROTO_TYPE_ERROR - 1); + if (errcode == ER_NO_SUCH_SPACE && + hdr.sync == vcollation_sync) { + /* + * No _vcollation space + * (server has old schema version). + */ + peer_has_vcollation = false; + continue; + } + xrow_decode_error(&hdr); + return luaT_push_nil_and_error(L); + } + if (schema_version == 0) { + schema_version = hdr.schema_version; + } else if (schema_version != hdr.schema_version) { + /* + * Schema changed while fetching schema. + * Restart loader. + */ + lua_pop(L, 1); + goto restart; + } + const char *data = hdr.body[0].iov_base; + const char *data_end = data + hdr.body[0].iov_len; + int key; + if (hdr.sync == vspace_sync) { + key = BOX_VSPACE_ID; + got_vspace = true; + } else if (hdr.sync == vindex_sync) { + key = BOX_VINDEX_ID; + got_vindex = true; + } else if (hdr.sync == vcollation_sync) { + key = BOX_VCOLLATION_ID; + got_vcollation = true; + } else { + unreachable(); + } + lua_pushinteger(L, key); + netbox_decode_table(L, &data, data_end, tuple_format_runtime); + lua_settable(L, -3); + } while (!(got_vspace && got_vindex && + (got_vcollation || !peer_has_vcollation))); + lua_pushinteger(L, schema_version); + lua_insert(L, -2); + return 2; +} + +/* + * Processes iproto requests in a loop until an error or a schema change. + * Takes schema_version, request registry, socket fd, send_buf (ibuf), + * recv_buf (ibuf), timeout. + * Returns schema_version if the loop was broken because of a schema change. + * If the loop was broken by an error, returns nil and the error. + */ +static int +netbox_iproto_loop(struct lua_State *L) +{ + uint32_t schema_version = lua_tointeger(L, 1); + struct netbox_registry *registry = luaT_check_netbox_registry(L, 2); + int fd = lua_tointeger(L, 3); + struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 4); + struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 5); + double timeout = (!lua_isnoneornil(L, 6) ? + lua_tonumber(L, 6) : TIMEOUT_INFINITY); + while (true) { + struct xrow_header hdr; + if (netbox_send_and_recv_iproto(fd, send_buf, recv_buf, + timeout, &hdr) != 0) { + luaL_testcancel(L); + return luaT_push_nil_and_error(L); + } + netbox_dispatch_response_iproto(L, registry, &hdr); + if (hdr.schema_version > 0 && + hdr.schema_version != schema_version) { + lua_pushinteger(L, hdr.schema_version); + return 1; + } + } +} + /* * Sets up console delimiter. Should be called before serving any requests. * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout. @@ -1892,13 +2033,13 @@ luaopen_net_box(struct lua_State *L) luaL_register_type(L, netbox_request_typename, netbox_request_meta); static const luaL_Reg net_box_lib[] = { - { "encode_auth", netbox_encode_auth }, { "encode_method", netbox_encode_method }, { "decode_greeting",netbox_decode_greeting }, - { "send_and_recv_iproto", netbox_send_and_recv_iproto }, { "new_registry", netbox_new_registry }, { "new_request", netbox_new_request }, - { "dispatch_response_iproto", netbox_dispatch_response_iproto }, + { "iproto_auth", netbox_iproto_auth }, + { "iproto_schema", netbox_iproto_schema }, + { "iproto_loop", netbox_iproto_loop }, { "console_setup", netbox_console_setup }, { "console_loop", netbox_console_loop }, { NULL, NULL} diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 0a21c1341117..13def54de2c5 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -9,18 +9,15 @@ local urilib = require('uri') local internal = require('net.box.lib') local trigger = require('internal.trigger') -local band = bit.band local max = math.max local fiber_clock = fiber.clock local fiber_self = fiber.self -local decode = msgpack.decode_unchecked local check_iterator_type = box.internal.check_iterator_type local check_index_arg = box.internal.check_index_arg local check_space_arg = box.internal.check_space_arg local check_primary_index = box.internal.check_primary_index -local encode_auth = internal.encode_auth local encode_method = internal.encode_method local decode_greeting = internal.decode_greeting @@ -29,21 +26,12 @@ local VSPACE_ID = 281 local VINDEX_ID = 289 local VCOLLATION_ID = 277 local DEFAULT_CONNECT_TIMEOUT = 10 - -local IPROTO_STATUS_KEY = 0x00 -local IPROTO_ERRNO_MASK = 0x7FFF -local IPROTO_SYNC_KEY = 0x01 -local IPROTO_SCHEMA_VERSION_KEY = 0x05 -local IPROTO_DATA_KEY = 0x30 -local IPROTO_ERROR_24 = 0x31 -local IPROTO_ERROR = 0x52 local IPROTO_GREETING_SIZE = 128 -- select errors from box.error local E_UNKNOWN = box.error.UNKNOWN local E_NO_CONNECTION = box.error.NO_CONNECTION local E_PROC_LUA = box.error.PROC_LUA -local E_NO_SUCH_SPACE = box.error.NO_SUCH_SPACE -- Method types used internally by net.box. local M_PING = 0 @@ -69,19 +57,6 @@ local M_INJECT = 17 -- utility tables local is_final_state = {closed = 1, error = 1} -local function decode_push(raw_data) - local response, raw_end = decode(raw_data) - return response[IPROTO_DATA_KEY][1], raw_end -end - -local function version_id(major, minor, patch) - return bit.bor(bit.lshift(major, 16), bit.lshift(minor, 8), patch) -end - -local function version_at_least(peer_version_id, major, minor, patch) - return peer_version_id >= version_id(major, minor, patch) -end - -- -- Connect to a remote server, do handshake. -- @param host Hostname. @@ -320,24 +295,6 @@ local function create_transport(host, port, user, password, callback, return request:wait_result(timeout) end - local function dispatch_response_iproto(hdr, body_rpos, body_end) - local id = hdr[IPROTO_SYNC_KEY] - local status = hdr[IPROTO_STATUS_KEY] - internal.dispatch_response_iproto(requests, id, status, - body_rpos, body_end) - end - - -- IO (WORKER FIBER) -- - local function send_and_recv_iproto(timeout) - local hdr, body_rpos, body_end = internal.send_and_recv_iproto( - connection:fd(), send_buf, recv_buf, timeout) - if not hdr then - local err = body_rpos - return err.code, err.message - end - return nil, hdr, body_rpos, body_end - end - -- PROTOCOL STATE MACHINE (WORKER FIBER) -- -- -- The sm is implemented as a collection of functions performing @@ -396,17 +353,13 @@ local function create_transport(host, port, user, password, callback, set_state('fetch_schema') return iproto_schema_sm() end - encode_auth(send_buf, requests:new_id(), user, password, salt) - local err, hdr, body_rpos = send_and_recv_iproto() - if err then - return error_sm(err, hdr) - end - if hdr[IPROTO_STATUS_KEY] ~= 0 then - local body = decode(body_rpos) - return error_sm(E_NO_CONNECTION, body[IPROTO_ERROR_24]) + local schema_version, err = internal.iproto_auth( + user, password, salt, requests, connection:fd(), send_buf, recv_buf) + if not schema_version then + return error_sm(err.code, err.message) end set_state('fetch_schema') - return iproto_schema_sm(hdr[IPROTO_SCHEMA_VERSION_KEY]) + return iproto_schema_sm(schema_version) end iproto_schema_sm = function(schema_version) @@ -414,82 +367,28 @@ local function create_transport(host, port, user, password, callback, set_state('active') return iproto_sm(schema_version) end - -- _vcollation view was added in 2.2.0-389-g3e3ef182f - local peer_has_vcollation = version_at_least(greeting.version_id, - 2, 2, 1) - local select1_id = requests:new_id() - local select2_id = requests:new_id() - local select3_id - local response = {} - -- fetch everything from space _vspace, 2 = ITER_ALL - encode_method(M_SELECT, send_buf, select1_id, VSPACE_ID, 0, 2, 0, - 0xFFFFFFFF, nil) - -- fetch everything from space _vindex, 2 = ITER_ALL - encode_method(M_SELECT, send_buf, select2_id, VINDEX_ID, 0, 2, 0, - 0xFFFFFFFF, nil) - -- fetch everything from space _vcollation, 2 = ITER_ALL - if peer_has_vcollation then - select3_id = requests:new_id() - encode_method(M_SELECT, send_buf, select3_id, VCOLLATION_ID, - 0, 2, 0, 0xFFFFFFFF, nil) + local schema_version, schema = internal.iproto_schema( + greeting.version_id, requests, connection:fd(), send_buf, recv_buf) + if not schema_version then + local err = schema + return error_sm(err.code, err.message) end - - schema_version = nil -- any schema_version will do provided that - -- it is consistent across responses - repeat - local err, hdr, body_rpos, body_end = send_and_recv_iproto() - if err then return error_sm(err, hdr) end - dispatch_response_iproto(hdr, body_rpos, body_end) - local id = hdr[IPROTO_SYNC_KEY] - -- trick: omit check for peer_has_vcollation: id is - -- not nil - if id == select1_id or id == select2_id or id == select3_id then - -- response to a schema query we've submitted - local status = hdr[IPROTO_STATUS_KEY] - local response_schema_version = hdr[IPROTO_SCHEMA_VERSION_KEY] - if status ~= 0 then - -- No _vcollation space (server has an old - -- schema version). - local errno = band(status, IPROTO_ERRNO_MASK) - if id == select3_id and errno == E_NO_SUCH_SPACE then - peer_has_vcollation = false - goto continue - end - local body = decode(body_rpos) - return error_sm(E_NO_CONNECTION, body[IPROTO_ERROR_24]) - end - if schema_version == nil then - schema_version = response_schema_version - elseif schema_version ~= response_schema_version then - -- schema changed while fetching schema; restart loader - return iproto_schema_sm() - end - local body = decode(body_rpos) - response[id] = body[IPROTO_DATA_KEY] - end - ::continue:: - until response[select1_id] and response[select2_id] and - (not peer_has_vcollation or response[select3_id]) - -- trick: response[select3_id] is nil when the key is nil - callback('did_fetch_schema', schema_version, response[select1_id], - response[select2_id], response[select3_id]) + callback('did_fetch_schema', schema_version, schema[VSPACE_ID], + schema[VINDEX_ID], schema[VCOLLATION_ID]) set_state('active') return iproto_sm(schema_version) end iproto_sm = function(schema_version) - local err, hdr, body_rpos, body_end = send_and_recv_iproto() - if err then return error_sm(err, hdr) end - dispatch_response_iproto(hdr, body_rpos, body_end) - local response_schema_version = hdr[IPROTO_SCHEMA_VERSION_KEY] - if response_schema_version > 0 and - response_schema_version ~= schema_version then - -- schema_version has been changed - start to load a new version. - -- Sic: self.schema_version will be updated only after reload. - set_state('fetch_schema') - return iproto_schema_sm(schema_version) + local schema_version, err = internal.iproto_loop( + schema_version, requests, connection:fd(), send_buf, recv_buf) + if not schema_version then + return error_sm(err.code, err.message) end - return iproto_sm(schema_version) + -- schema_version has been changed - start to load a new version. + -- Sic: self.schema_version will be updated only after reload. + set_state('fetch_schema') + return iproto_schema_sm(schema_version) end error_sm = function(err, msg) -- 2.25.1