[Tarantool-patches] [PATCH 18/20] net.box: rewrite iproto handlers in C
Vladimir Davydov
vdavydov at tarantool.org
Fri Jul 23 14:07:28 MSK 2021
Those are performance-critical paths. Moving them to C should speed up
overall net.box performance.
---
src/box/lua/net_box.c | 373 +++++++++++++++++++++++++++-------------
src/box/lua/net_box.lua | 141 +++------------
2 files changed, 277 insertions(+), 237 deletions(-)
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 85a45c54b979..fde2a8772890 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -41,7 +41,7 @@
#include "box/tuple.h"
#include "box/execute.h"
#include "box/error.h"
-#include "box/mp_error.h"
+#include "box/schema_def.h"
#include "lua/msgpack.h"
#include <base64.h>
@@ -53,6 +53,7 @@
#include "lua/fiber.h"
#include "mpstream/mpstream.h"
#include "misc.h" /* lbox_check_tuple_format() */
+#include "version.h"
#define cfg luaL_msgpack_default
@@ -137,6 +138,13 @@ struct netbox_request {
static const char netbox_registry_typename[] = "net.box.registry";
static const char netbox_request_typename[] = "net.box.request";
+/* Passed to mpstream_init() to set a boolean flag on error. */
+static void
+mpstream_error_handler(void *error_ctx)
+{
+ *(bool *)error_ctx = true;
+}
+
static void
netbox_request_create(struct netbox_request *request)
{
@@ -391,30 +399,22 @@ netbox_encode_ping(lua_State *L, int idx, struct ibuf *ibuf, uint64_t sync)
netbox_end_encode(&stream, svp);
}
+/*
+ * Encodes an authorization request and writes it to the provided buffer.
+ * Returns -1 on memory allocation error.
+ */
static int
-netbox_encode_auth(lua_State *L)
+netbox_encode_auth(struct ibuf *ibuf, uint64_t sync,
+ const char *user, size_t user_len,
+ const char *password, size_t password_len,
+ const char *salt)
{
- if (lua_gettop(L) < 5) {
- return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, "
- "user, password, greeting)");
- }
- struct ibuf *ibuf = (struct ibuf *) lua_topointer(L, 1);
- uint64_t sync = luaL_touint64(L, 2);
-
+ bool is_error = false;
struct mpstream stream;
mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
- luamp_error, L);
+ mpstream_error_handler, &is_error);
size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH);
- size_t user_len;
- const char *user = lua_tolstring(L, 3, &user_len);
- size_t password_len;
- const char *password = lua_tolstring(L, 4, &password_len);
- size_t salt_len;
- const char *salt = lua_tolstring(L, 5, &salt_len);
- if (salt_len < SCRAMBLE_SIZE)
- return luaL_error(L, "Invalid salt");
-
/* Adapted from xrow_encode_auth() */
mpstream_encode_map(&stream, password != NULL ? 2 : 1);
mpstream_encode_uint(&stream, IPROTO_USER_NAME);
@@ -429,7 +429,30 @@ netbox_encode_auth(lua_State *L)
}
netbox_end_encode(&stream, svp);
- return 0;
+ return is_error ? -1 : 0;
+}
+
+/*
+ * Encodes a SELECT(*) request and writes it to the provided buffer.
+ * Returns -1 on memory allocation error.
+ */
+static int
+netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id)
+{
+ bool is_error = false;
+ struct mpstream stream;
+ mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
+ mpstream_error_handler, &is_error);
+ size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT);
+ mpstream_encode_map(&stream, 3);
+ mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
+ mpstream_encode_uint(&stream, space_id);
+ mpstream_encode_uint(&stream, IPROTO_LIMIT);
+ mpstream_encode_uint(&stream, UINT32_MAX);
+ mpstream_encode_uint(&stream, IPROTO_KEY);
+ mpstream_encode_array(&stream, 0);
+ netbox_end_encode(&stream, svp);
+ return is_error ? -1 : 0;
}
static void
@@ -798,18 +821,14 @@ handle_error:
/*
* Sends and receives data over an iproto connection.
- * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout.
- * On success returns header (table), body_rpos (char *), body_end (char *).
- * On error returns nil, error.
+ * Returns 0 and a decoded response header on success.
+ * On error returns -1.
*/
-static int
-netbox_send_and_recv_iproto(lua_State *L)
+int
+netbox_send_and_recv_iproto(int fd, struct ibuf *send_buf,
+ struct ibuf *recv_buf, double timeout,
+ struct xrow_header *hdr)
{
- int fd = lua_tointeger(L, 1);
- struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2);
- struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3);
- double timeout = (!lua_isnoneornil(L, 4) ?
- lua_tonumber(L, 4) : TIMEOUT_INFINITY);
while (true) {
size_t required;
size_t data_len = ibuf_used(recv_buf);
@@ -817,21 +836,16 @@ netbox_send_and_recv_iproto(lua_State *L)
if (data_len < fixheader_size) {
required = fixheader_size;
} else {
- /* PWN! insufficient input validation */
const char *bufpos = recv_buf->rpos;
const char *rpos = bufpos;
size_t len = mp_decode_uint(&rpos);
required = (rpos - bufpos) + len;
if (data_len >= required) {
const char *body_end = rpos + len;
- const char *body_rpos = rpos;
- luamp_decode(L, cfg, &body_rpos);
- *(const char **)luaL_pushcdata(
- L, CTID_CONST_CHAR_PTR) = body_rpos;
- *(const char **)luaL_pushcdata(
- L, CTID_CONST_CHAR_PTR) = body_end;
recv_buf->rpos = (char *)body_end;
- return 3;
+ return xrow_header_decode(
+ hdr, &rpos, body_end,
+ /*end_is_exact=*/true);
}
}
size_t unused;
@@ -841,8 +855,7 @@ netbox_send_and_recv_iproto(lua_State *L)
/*boundary=*/NULL,
/*boundary_len=*/0,
timeout, &unused) != 0) {
- luaL_testcancel(L);
- return luaT_push_nil_and_error(L);
+ return -1;
}
timeout = deadline - fiber_clock();
}
@@ -1377,57 +1390,6 @@ netbox_decode_method(struct lua_State *L, enum netbox_method method,
method_decoder[method](L, data, data_end, format);
}
-/*
- * Decodes an error from raw data. On success returns the decoded error object
- * with ref counter incremented. On failure returns NULL.
- */
-static struct error *
-netbox_decode_error(const char **data, uint32_t errcode)
-{
- struct error *error = NULL;
- assert(mp_typeof(**data) == MP_MAP);
- uint32_t map_size = mp_decode_map(data);
- for (uint32_t i = 0; i < map_size; ++i) {
- uint32_t key = mp_decode_uint(data);
- if (key == IPROTO_ERROR) {
- if (error != NULL)
- error_unref(error);
- error = error_unpack_unsafe(data);
- if (error == NULL)
- return NULL;
- error_ref(error);
- /*
- * IPROTO_ERROR comprises error encoded with
- * IPROTO_ERROR_24, so we may ignore content
- * of that key.
- */
- break;
- } else if (key == IPROTO_ERROR_24) {
- if (error != NULL)
- error_unref(error);
- const char *reason = "";
- uint32_t reason_len = 0;
- if (mp_typeof(**data) == MP_STR)
- reason = mp_decode_str(data, &reason_len);
- box_error_raise(errcode, "%.*s", reason_len, reason);
- error = box_error_last();
- error_ref(error);
- continue;
- }
- mp_next(data); /* skip value */
- }
- if (error == NULL) {
- /*
- * Error body is missing in the response.
- * Set the error code without a 'reason' message
- */
- box_error_raise(errcode, "");
- error = box_error_last();
- error_ref(error);
- }
- return error;
-}
-
static inline struct netbox_registry *
luaT_check_netbox_registry(struct lua_State *L, int idx)
{
@@ -1704,37 +1666,35 @@ netbox_new_request(struct lua_State *L)
}
/*
- * Given a request registry, request id (sync), status, and a pointer to a
- * response body, decodes the response and either completes the request or
- * invokes the on-push trigger, depending on the status.
+ * Given a request registry and a response header, decodes the response and
+ * either completes the request or invokes the on-push trigger, depending on
+ * the status.
+ *
+ * Lua stack is used for temporarily storing the response table before getting
+ * a reference to it and executing the on-push trigger.
*/
-static int
-netbox_dispatch_response_iproto(struct lua_State *L)
+static void
+netbox_dispatch_response_iproto(struct lua_State *L,
+ struct netbox_registry *registry,
+ struct xrow_header *hdr)
{
- struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
- uint64_t sync = luaL_touint64(L, 2);
- enum iproto_type status = lua_tointeger(L, 3);
- uint32_t ctypeid;
- const char *data = *(const char **)luaL_checkcdata(L, 4, &ctypeid);
- assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
- const char *data_end = *(const char **)luaL_checkcdata(L, 5, &ctypeid);
- assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
- struct netbox_request *request = netbox_registry_lookup(registry, sync);
+ struct netbox_request *request = netbox_registry_lookup(registry,
+ hdr->sync);
if (request == NULL) {
/* Nobody is waiting for the response. */
- return 0;
+ return;
}
+ enum iproto_type status = hdr->type;
if (status > IPROTO_CHUNK) {
/* Handle errors. */
- struct error *error = netbox_decode_error(
- &data, status & (IPROTO_TYPE_ERROR - 1));
- if (error == NULL)
- return luaT_error(L);
+ xrow_decode_error(hdr);
+ struct error *error = box_error_last();
netbox_request_set_error(request, error);
- error_unref(error);
netbox_request_complete(request);
- return 0;
+ return;
}
+ const char *data = hdr->body[0].iov_base;
+ const char *data_end = data + hdr->body[0].iov_len;
if (request->buffer != NULL) {
/* Copy xrow.body to user-provided buffer. */
if (request->skip_header)
@@ -1774,7 +1734,6 @@ netbox_dispatch_response_iproto(struct lua_State *L)
lua_call(L, 2, 0);
netbox_request_signal(request);
}
- return 0;
}
/*
@@ -1800,6 +1759,188 @@ netbox_dispatch_response_console(struct lua_State *L,
netbox_request_complete(request);
}
+/*
+ * Performs an authorization request for an iproto connection.
+ * Takes user, password, salt, request registry, socket fd,
+ * send_buf (ibuf), recv_buf (ibuf), timeout.
+ * Returns schema_version on success, nil and error on failure.
+ */
+static int
+netbox_iproto_auth(struct lua_State *L)
+{
+ size_t user_len;
+ const char *user = lua_tolstring(L, 1, &user_len);
+ size_t password_len;
+ const char *password = lua_tolstring(L, 2, &password_len);
+ size_t salt_len;
+ const char *salt = lua_tolstring(L, 3, &salt_len);
+ if (salt_len < SCRAMBLE_SIZE)
+ return luaL_error(L, "Invalid salt");
+ struct netbox_registry *registry = luaT_check_netbox_registry(L, 4);
+ int fd = lua_tointeger(L, 5);
+ struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 6);
+ struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 7);
+ double timeout = (!lua_isnoneornil(L, 8) ?
+ lua_tonumber(L, 8) : TIMEOUT_INFINITY);
+ if (netbox_encode_auth(send_buf, registry->next_sync++, user, user_len,
+ password, password_len, salt) != 0) {
+ goto error;
+ }
+ struct xrow_header hdr;
+ if (netbox_send_and_recv_iproto(fd, send_buf, recv_buf, timeout,
+ &hdr) != 0) {
+ goto error;
+ }
+ if (hdr.type != IPROTO_OK) {
+ xrow_decode_error(&hdr);
+ goto error;
+ }
+ lua_pushinteger(L, hdr.schema_version);
+ return 1;
+error:
+ return luaT_push_nil_and_error(L);
+}
+
+/*
+ * Fetches schema over an iproto connection. While waiting for the schema,
+ * processes other requests in a loop, like netbox_iproto_loop().
+ * Takes peer_version_id, request registry, socket fd, send_buf (ibuf),
+ * recv_buf (ibuf), timeout.
+ * Returns schema_version and a table with the following fields:
+ * [VSPACE_ID] = <spaces>
+ * [VINDEX_ID] = <indexes>
+ * [VCOLLATION_ID] = <collations>
+ * On failure returns nil, error.
+ */
+static int
+netbox_iproto_schema(struct lua_State *L)
+{
+ uint32_t peer_version_id = lua_tointeger(L, 1);
+ struct netbox_registry *registry = luaT_check_netbox_registry(L, 2);
+ int fd = lua_tointeger(L, 3);
+ struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 4);
+ struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 5);
+ double timeout = (!lua_isnoneornil(L, 6) ?
+ lua_tonumber(L, 6) : TIMEOUT_INFINITY);
+ /* _vcollation view was added in 2.2.0-389-g3e3ef182f */
+ bool peer_has_vcollation = peer_version_id >= version_id(2, 2, 1);
+restart:
+ lua_newtable(L);
+ uint64_t vspace_sync = registry->next_sync++;
+ if (netbox_encode_select_all(send_buf, vspace_sync,
+ BOX_VSPACE_ID) != 0) {
+ return luaT_error(L);
+ }
+ uint64_t vindex_sync = registry->next_sync++;
+ if (netbox_encode_select_all(send_buf, vindex_sync,
+ BOX_VINDEX_ID) != 0) {
+ return luaT_error(L);
+ }
+ uint64_t vcollation_sync = registry->next_sync++;
+ if (peer_has_vcollation &&
+ netbox_encode_select_all(send_buf, vcollation_sync,
+ BOX_VCOLLATION_ID) != 0) {
+ return luaT_error(L);
+ }
+ bool got_vspace = false;
+ bool got_vindex = false;
+ bool got_vcollation = false;
+ uint32_t schema_version = 0;
+ do {
+ struct xrow_header hdr;
+ if (netbox_send_and_recv_iproto(fd, send_buf, recv_buf,
+ timeout, &hdr) != 0) {
+ luaL_testcancel(L);
+ return luaT_push_nil_and_error(L);
+ }
+ netbox_dispatch_response_iproto(L, registry, &hdr);
+ if (hdr.sync != vspace_sync &&
+ hdr.sync != vindex_sync &&
+ hdr.sync != vcollation_sync) {
+ continue;
+ }
+ if (iproto_type_is_error(hdr.type)) {
+ uint32_t errcode = hdr.type & (IPROTO_TYPE_ERROR - 1);
+ if (errcode == ER_NO_SUCH_SPACE &&
+ hdr.sync == vcollation_sync) {
+ /*
+ * No _vcollation space
+ * (server has old schema version).
+ */
+ peer_has_vcollation = false;
+ continue;
+ }
+ xrow_decode_error(&hdr);
+ return luaT_push_nil_and_error(L);
+ }
+ if (schema_version == 0) {
+ schema_version = hdr.schema_version;
+ } else if (schema_version != hdr.schema_version) {
+ /*
+ * Schema changed while fetching schema.
+ * Restart loader.
+ */
+ lua_pop(L, 1);
+ goto restart;
+ }
+ const char *data = hdr.body[0].iov_base;
+ const char *data_end = data + hdr.body[0].iov_len;
+ int key;
+ if (hdr.sync == vspace_sync) {
+ key = BOX_VSPACE_ID;
+ got_vspace = true;
+ } else if (hdr.sync == vindex_sync) {
+ key = BOX_VINDEX_ID;
+ got_vindex = true;
+ } else if (hdr.sync == vcollation_sync) {
+ key = BOX_VCOLLATION_ID;
+ got_vcollation = true;
+ } else {
+ unreachable();
+ }
+ lua_pushinteger(L, key);
+ netbox_decode_table(L, &data, data_end, tuple_format_runtime);
+ lua_settable(L, -3);
+ } while (!(got_vspace && got_vindex &&
+ (got_vcollation || !peer_has_vcollation)));
+ lua_pushinteger(L, schema_version);
+ lua_insert(L, -2);
+ return 2;
+}
+
+/*
+ * Processes iproto requests in a loop until an error or a schema change.
+ * Takes schema_version, request registry, socket fd, send_buf (ibuf),
+ * recv_buf (ibuf), timeout.
+ * Returns schema_version if the loop was broken because of a schema change.
+ * If the loop was broken by an error, returns nil and the error.
+ */
+static int
+netbox_iproto_loop(struct lua_State *L)
+{
+ uint32_t schema_version = lua_tointeger(L, 1);
+ struct netbox_registry *registry = luaT_check_netbox_registry(L, 2);
+ int fd = lua_tointeger(L, 3);
+ struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 4);
+ struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 5);
+ double timeout = (!lua_isnoneornil(L, 6) ?
+ lua_tonumber(L, 6) : TIMEOUT_INFINITY);
+ while (true) {
+ struct xrow_header hdr;
+ if (netbox_send_and_recv_iproto(fd, send_buf, recv_buf,
+ timeout, &hdr) != 0) {
+ luaL_testcancel(L);
+ return luaT_push_nil_and_error(L);
+ }
+ netbox_dispatch_response_iproto(L, registry, &hdr);
+ if (hdr.schema_version > 0 &&
+ hdr.schema_version != schema_version) {
+ lua_pushinteger(L, hdr.schema_version);
+ return 1;
+ }
+ }
+}
+
/*
* Sets up console delimiter. Should be called before serving any requests.
* Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout.
@@ -1892,13 +2033,13 @@ luaopen_net_box(struct lua_State *L)
luaL_register_type(L, netbox_request_typename, netbox_request_meta);
static const luaL_Reg net_box_lib[] = {
- { "encode_auth", netbox_encode_auth },
{ "encode_method", netbox_encode_method },
{ "decode_greeting",netbox_decode_greeting },
- { "send_and_recv_iproto", netbox_send_and_recv_iproto },
{ "new_registry", netbox_new_registry },
{ "new_request", netbox_new_request },
- { "dispatch_response_iproto", netbox_dispatch_response_iproto },
+ { "iproto_auth", netbox_iproto_auth },
+ { "iproto_schema", netbox_iproto_schema },
+ { "iproto_loop", netbox_iproto_loop },
{ "console_setup", netbox_console_setup },
{ "console_loop", netbox_console_loop },
{ NULL, NULL}
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 0a21c1341117..13def54de2c5 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -9,18 +9,15 @@ local urilib = require('uri')
local internal = require('net.box.lib')
local trigger = require('internal.trigger')
-local band = bit.band
local max = math.max
local fiber_clock = fiber.clock
local fiber_self = fiber.self
-local decode = msgpack.decode_unchecked
local check_iterator_type = box.internal.check_iterator_type
local check_index_arg = box.internal.check_index_arg
local check_space_arg = box.internal.check_space_arg
local check_primary_index = box.internal.check_primary_index
-local encode_auth = internal.encode_auth
local encode_method = internal.encode_method
local decode_greeting = internal.decode_greeting
@@ -29,21 +26,12 @@ local VSPACE_ID = 281
local VINDEX_ID = 289
local VCOLLATION_ID = 277
local DEFAULT_CONNECT_TIMEOUT = 10
-
-local IPROTO_STATUS_KEY = 0x00
-local IPROTO_ERRNO_MASK = 0x7FFF
-local IPROTO_SYNC_KEY = 0x01
-local IPROTO_SCHEMA_VERSION_KEY = 0x05
-local IPROTO_DATA_KEY = 0x30
-local IPROTO_ERROR_24 = 0x31
-local IPROTO_ERROR = 0x52
local IPROTO_GREETING_SIZE = 128
-- select errors from box.error
local E_UNKNOWN = box.error.UNKNOWN
local E_NO_CONNECTION = box.error.NO_CONNECTION
local E_PROC_LUA = box.error.PROC_LUA
-local E_NO_SUCH_SPACE = box.error.NO_SUCH_SPACE
-- Method types used internally by net.box.
local M_PING = 0
@@ -69,19 +57,6 @@ local M_INJECT = 17
-- utility tables
local is_final_state = {closed = 1, error = 1}
-local function decode_push(raw_data)
- local response, raw_end = decode(raw_data)
- return response[IPROTO_DATA_KEY][1], raw_end
-end
-
-local function version_id(major, minor, patch)
- return bit.bor(bit.lshift(major, 16), bit.lshift(minor, 8), patch)
-end
-
-local function version_at_least(peer_version_id, major, minor, patch)
- return peer_version_id >= version_id(major, minor, patch)
-end
-
--
-- Connect to a remote server, do handshake.
-- @param host Hostname.
@@ -320,24 +295,6 @@ local function create_transport(host, port, user, password, callback,
return request:wait_result(timeout)
end
- local function dispatch_response_iproto(hdr, body_rpos, body_end)
- local id = hdr[IPROTO_SYNC_KEY]
- local status = hdr[IPROTO_STATUS_KEY]
- internal.dispatch_response_iproto(requests, id, status,
- body_rpos, body_end)
- end
-
- -- IO (WORKER FIBER) --
- local function send_and_recv_iproto(timeout)
- local hdr, body_rpos, body_end = internal.send_and_recv_iproto(
- connection:fd(), send_buf, recv_buf, timeout)
- if not hdr then
- local err = body_rpos
- return err.code, err.message
- end
- return nil, hdr, body_rpos, body_end
- end
-
-- PROTOCOL STATE MACHINE (WORKER FIBER) --
--
-- The sm is implemented as a collection of functions performing
@@ -396,17 +353,13 @@ local function create_transport(host, port, user, password, callback,
set_state('fetch_schema')
return iproto_schema_sm()
end
- encode_auth(send_buf, requests:new_id(), user, password, salt)
- local err, hdr, body_rpos = send_and_recv_iproto()
- if err then
- return error_sm(err, hdr)
- end
- if hdr[IPROTO_STATUS_KEY] ~= 0 then
- local body = decode(body_rpos)
- return error_sm(E_NO_CONNECTION, body[IPROTO_ERROR_24])
+ local schema_version, err = internal.iproto_auth(
+ user, password, salt, requests, connection:fd(), send_buf, recv_buf)
+ if not schema_version then
+ return error_sm(err.code, err.message)
end
set_state('fetch_schema')
- return iproto_schema_sm(hdr[IPROTO_SCHEMA_VERSION_KEY])
+ return iproto_schema_sm(schema_version)
end
iproto_schema_sm = function(schema_version)
@@ -414,82 +367,28 @@ local function create_transport(host, port, user, password, callback,
set_state('active')
return iproto_sm(schema_version)
end
- -- _vcollation view was added in 2.2.0-389-g3e3ef182f
- local peer_has_vcollation = version_at_least(greeting.version_id,
- 2, 2, 1)
- local select1_id = requests:new_id()
- local select2_id = requests:new_id()
- local select3_id
- local response = {}
- -- fetch everything from space _vspace, 2 = ITER_ALL
- encode_method(M_SELECT, send_buf, select1_id, VSPACE_ID, 0, 2, 0,
- 0xFFFFFFFF, nil)
- -- fetch everything from space _vindex, 2 = ITER_ALL
- encode_method(M_SELECT, send_buf, select2_id, VINDEX_ID, 0, 2, 0,
- 0xFFFFFFFF, nil)
- -- fetch everything from space _vcollation, 2 = ITER_ALL
- if peer_has_vcollation then
- select3_id = requests:new_id()
- encode_method(M_SELECT, send_buf, select3_id, VCOLLATION_ID,
- 0, 2, 0, 0xFFFFFFFF, nil)
+ local schema_version, schema = internal.iproto_schema(
+ greeting.version_id, requests, connection:fd(), send_buf, recv_buf)
+ if not schema_version then
+ local err = schema
+ return error_sm(err.code, err.message)
end
-
- schema_version = nil -- any schema_version will do provided that
- -- it is consistent across responses
- repeat
- local err, hdr, body_rpos, body_end = send_and_recv_iproto()
- if err then return error_sm(err, hdr) end
- dispatch_response_iproto(hdr, body_rpos, body_end)
- local id = hdr[IPROTO_SYNC_KEY]
- -- trick: omit check for peer_has_vcollation: id is
- -- not nil
- if id == select1_id or id == select2_id or id == select3_id then
- -- response to a schema query we've submitted
- local status = hdr[IPROTO_STATUS_KEY]
- local response_schema_version = hdr[IPROTO_SCHEMA_VERSION_KEY]
- if status ~= 0 then
- -- No _vcollation space (server has an old
- -- schema version).
- local errno = band(status, IPROTO_ERRNO_MASK)
- if id == select3_id and errno == E_NO_SUCH_SPACE then
- peer_has_vcollation = false
- goto continue
- end
- local body = decode(body_rpos)
- return error_sm(E_NO_CONNECTION, body[IPROTO_ERROR_24])
- end
- if schema_version == nil then
- schema_version = response_schema_version
- elseif schema_version ~= response_schema_version then
- -- schema changed while fetching schema; restart loader
- return iproto_schema_sm()
- end
- local body = decode(body_rpos)
- response[id] = body[IPROTO_DATA_KEY]
- end
- ::continue::
- until response[select1_id] and response[select2_id] and
- (not peer_has_vcollation or response[select3_id])
- -- trick: response[select3_id] is nil when the key is nil
- callback('did_fetch_schema', schema_version, response[select1_id],
- response[select2_id], response[select3_id])
+ callback('did_fetch_schema', schema_version, schema[VSPACE_ID],
+ schema[VINDEX_ID], schema[VCOLLATION_ID])
set_state('active')
return iproto_sm(schema_version)
end
iproto_sm = function(schema_version)
- local err, hdr, body_rpos, body_end = send_and_recv_iproto()
- if err then return error_sm(err, hdr) end
- dispatch_response_iproto(hdr, body_rpos, body_end)
- local response_schema_version = hdr[IPROTO_SCHEMA_VERSION_KEY]
- if response_schema_version > 0 and
- response_schema_version ~= schema_version then
- -- schema_version has been changed - start to load a new version.
- -- Sic: self.schema_version will be updated only after reload.
- set_state('fetch_schema')
- return iproto_schema_sm(schema_version)
+ local schema_version, err = internal.iproto_loop(
+ schema_version, requests, connection:fd(), send_buf, recv_buf)
+ if not schema_version then
+ return error_sm(err.code, err.message)
end
- return iproto_sm(schema_version)
+ -- schema_version has been changed - start to load a new version.
+ -- Sic: self.schema_version will be updated only after reload.
+ set_state('fetch_schema')
+ return iproto_schema_sm(schema_version)
end
error_sm = function(err, msg)
--
2.25.1
More information about the Tarantool-patches
mailing list