[Tarantool-patches] [PATCH 18/20] net.box: rewrite iproto handlers in C

Vladimir Davydov vdavydov at tarantool.org
Fri Jul 23 14:07:28 MSK 2021


Those are performance-critical paths. Moving them to C should speed up
overall net.box performance.
---
 src/box/lua/net_box.c   | 373 +++++++++++++++++++++++++++-------------
 src/box/lua/net_box.lua | 141 +++------------
 2 files changed, 277 insertions(+), 237 deletions(-)

diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 85a45c54b979..fde2a8772890 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -41,7 +41,7 @@
 #include "box/tuple.h"
 #include "box/execute.h"
 #include "box/error.h"
-#include "box/mp_error.h"
+#include "box/schema_def.h"
 
 #include "lua/msgpack.h"
 #include <base64.h>
@@ -53,6 +53,7 @@
 #include "lua/fiber.h"
 #include "mpstream/mpstream.h"
 #include "misc.h" /* lbox_check_tuple_format() */
+#include "version.h"
 
 #define cfg luaL_msgpack_default
 
@@ -137,6 +138,13 @@ struct netbox_request {
 static const char netbox_registry_typename[] = "net.box.registry";
 static const char netbox_request_typename[] = "net.box.request";
 
+/* Passed to mpstream_init() to set a boolean flag on error. */
+static void
+mpstream_error_handler(void *error_ctx)
+{
+	*(bool *)error_ctx = true;
+}
+
 static void
 netbox_request_create(struct netbox_request *request)
 {
@@ -391,30 +399,22 @@ netbox_encode_ping(lua_State *L, int idx, struct ibuf *ibuf, uint64_t sync)
 	netbox_end_encode(&stream, svp);
 }
 
+/*
+ * Encodes an authorization request and writes it to the provided buffer.
+ * Returns -1 on memory allocation error.
+ */
 static int
-netbox_encode_auth(lua_State *L)
+netbox_encode_auth(struct ibuf *ibuf, uint64_t sync,
+		   const char *user, size_t user_len,
+		   const char *password, size_t password_len,
+		   const char *salt)
 {
-	if (lua_gettop(L) < 5) {
-		return luaL_error(L, "Usage: netbox.encode_update(ibuf, sync, "
-				     "user, password, greeting)");
-	}
-	struct ibuf *ibuf = (struct ibuf *) lua_topointer(L, 1);
-	uint64_t sync = luaL_touint64(L, 2);
-
+	bool is_error = false;
 	struct mpstream stream;
 	mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
-		      luamp_error, L);
+		      mpstream_error_handler, &is_error);
 	size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH);
 
-	size_t user_len;
-	const char *user = lua_tolstring(L, 3, &user_len);
-	size_t password_len;
-	const char *password = lua_tolstring(L, 4, &password_len);
-	size_t salt_len;
-	const char *salt = lua_tolstring(L, 5, &salt_len);
-	if (salt_len < SCRAMBLE_SIZE)
-		return luaL_error(L, "Invalid salt");
-
 	/* Adapted from xrow_encode_auth() */
 	mpstream_encode_map(&stream, password != NULL ? 2 : 1);
 	mpstream_encode_uint(&stream, IPROTO_USER_NAME);
@@ -429,7 +429,30 @@ netbox_encode_auth(lua_State *L)
 	}
 
 	netbox_end_encode(&stream, svp);
-	return 0;
+	return is_error ? -1 : 0;
+}
+
+/*
+ * Encodes a SELECT(*) request and writes it to the provided buffer.
+ * Returns -1 on memory allocation error.
+ */
+static int
+netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id)
+{
+	bool is_error = false;
+	struct mpstream stream;
+	mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
+		      mpstream_error_handler, &is_error);
+	size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT);
+	mpstream_encode_map(&stream, 3);
+	mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
+	mpstream_encode_uint(&stream, space_id);
+	mpstream_encode_uint(&stream, IPROTO_LIMIT);
+	mpstream_encode_uint(&stream, UINT32_MAX);
+	mpstream_encode_uint(&stream, IPROTO_KEY);
+	mpstream_encode_array(&stream, 0);
+	netbox_end_encode(&stream, svp);
+	return is_error ? -1 : 0;
 }
 
 static void
@@ -798,18 +821,14 @@ handle_error:
 
 /*
  * Sends and receives data over an iproto connection.
- * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout.
- * On success returns header (table), body_rpos (char *), body_end (char *).
- * On error returns nil, error.
+ * Returns 0 and a decoded response header on success.
+ * On error returns -1.
  */
-static int
-netbox_send_and_recv_iproto(lua_State *L)
+int
+netbox_send_and_recv_iproto(int fd, struct ibuf *send_buf,
+			    struct ibuf *recv_buf, double timeout,
+			    struct xrow_header *hdr)
 {
-	int fd = lua_tointeger(L, 1);
-	struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2);
-	struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3);
-	double timeout = (!lua_isnoneornil(L, 4) ?
-			  lua_tonumber(L, 4) : TIMEOUT_INFINITY);
 	while (true) {
 		size_t required;
 		size_t data_len = ibuf_used(recv_buf);
@@ -817,21 +836,16 @@ netbox_send_and_recv_iproto(lua_State *L)
 		if (data_len < fixheader_size) {
 			required = fixheader_size;
 		} else {
-			/* PWN! insufficient input validation */
 			const char *bufpos = recv_buf->rpos;
 			const char *rpos = bufpos;
 			size_t len = mp_decode_uint(&rpos);
 			required = (rpos - bufpos) + len;
 			if (data_len >= required) {
 				const char *body_end = rpos + len;
-				const char *body_rpos = rpos;
-				luamp_decode(L, cfg, &body_rpos);
-				*(const char **)luaL_pushcdata(
-					L, CTID_CONST_CHAR_PTR) = body_rpos;
-				*(const char **)luaL_pushcdata(
-					L, CTID_CONST_CHAR_PTR) = body_end;
 				recv_buf->rpos = (char *)body_end;
-				return 3;
+				return xrow_header_decode(
+					hdr, &rpos, body_end,
+					/*end_is_exact=*/true);
 			}
 		}
 		size_t unused;
@@ -841,8 +855,7 @@ netbox_send_and_recv_iproto(lua_State *L)
 				       /*boundary=*/NULL,
 				       /*boundary_len=*/0,
 				       timeout, &unused) != 0) {
-			luaL_testcancel(L);
-			return luaT_push_nil_and_error(L);
+			return -1;
 		}
 		timeout = deadline - fiber_clock();
 	}
@@ -1377,57 +1390,6 @@ netbox_decode_method(struct lua_State *L, enum netbox_method method,
 	method_decoder[method](L, data, data_end, format);
 }
 
-/*
- * Decodes an error from raw data. On success returns the decoded error object
- * with ref counter incremented. On failure returns NULL.
- */
-static struct error *
-netbox_decode_error(const char **data, uint32_t errcode)
-{
-	struct error *error = NULL;
-	assert(mp_typeof(**data) == MP_MAP);
-	uint32_t map_size = mp_decode_map(data);
-	for (uint32_t i = 0; i < map_size; ++i) {
-		uint32_t key = mp_decode_uint(data);
-		if (key == IPROTO_ERROR) {
-			if (error != NULL)
-				error_unref(error);
-			error = error_unpack_unsafe(data);
-			if (error == NULL)
-				return NULL;
-			error_ref(error);
-			/*
-			 * IPROTO_ERROR comprises error encoded with
-			 * IPROTO_ERROR_24, so we may ignore content
-			 * of that key.
-			 */
-			break;
-		} else if (key == IPROTO_ERROR_24) {
-			if (error != NULL)
-				error_unref(error);
-			const char *reason = "";
-			uint32_t reason_len = 0;
-			if (mp_typeof(**data) == MP_STR)
-				reason = mp_decode_str(data, &reason_len);
-			box_error_raise(errcode, "%.*s", reason_len, reason);
-			error = box_error_last();
-			error_ref(error);
-			continue;
-		}
-		mp_next(data); /* skip value */
-	}
-	if (error == NULL) {
-		/*
-		 * Error body is missing in the response.
-		 * Set the error code without a 'reason' message
-		 */
-		box_error_raise(errcode, "");
-		error = box_error_last();
-		error_ref(error);
-	}
-	return error;
-}
-
 static inline struct netbox_registry *
 luaT_check_netbox_registry(struct lua_State *L, int idx)
 {
@@ -1704,37 +1666,35 @@ netbox_new_request(struct lua_State *L)
 }
 
 /*
- * Given a request registry, request id (sync), status, and a pointer to a
- * response body, decodes the response and either completes the request or
- * invokes the on-push trigger, depending on the status.
+ * Given a request registry and a response header, decodes the response and
+ * either completes the request or invokes the on-push trigger, depending on
+ * the status.
+ *
+ * Lua stack is used for temporarily storing the response table before getting
+ * a reference to it and executing the on-push trigger.
  */
-static int
-netbox_dispatch_response_iproto(struct lua_State *L)
+static void
+netbox_dispatch_response_iproto(struct lua_State *L,
+				struct netbox_registry *registry,
+				struct xrow_header *hdr)
 {
-	struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
-	uint64_t sync = luaL_touint64(L, 2);
-	enum iproto_type status = lua_tointeger(L, 3);
-	uint32_t ctypeid;
-	const char *data = *(const char **)luaL_checkcdata(L, 4, &ctypeid);
-	assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
-	const char *data_end = *(const char **)luaL_checkcdata(L, 5, &ctypeid);
-	assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
-	struct netbox_request *request = netbox_registry_lookup(registry, sync);
+	struct netbox_request *request = netbox_registry_lookup(registry,
+								hdr->sync);
 	if (request == NULL) {
 		/* Nobody is waiting for the response. */
-		return 0;
+		return;
 	}
+	enum iproto_type status = hdr->type;
 	if (status > IPROTO_CHUNK) {
 		/* Handle errors. */
-		struct error *error = netbox_decode_error(
-			&data, status & (IPROTO_TYPE_ERROR - 1));
-		if (error == NULL)
-			return luaT_error(L);
+		xrow_decode_error(hdr);
+		struct error *error = box_error_last();
 		netbox_request_set_error(request, error);
-		error_unref(error);
 		netbox_request_complete(request);
-		return 0;
+		return;
 	}
+	const char *data = hdr->body[0].iov_base;
+	const char *data_end = data + hdr->body[0].iov_len;
 	if (request->buffer != NULL) {
 		/* Copy xrow.body to user-provided buffer. */
 		if (request->skip_header)
@@ -1774,7 +1734,6 @@ netbox_dispatch_response_iproto(struct lua_State *L)
 		lua_call(L, 2, 0);
 		netbox_request_signal(request);
 	}
-	return 0;
 }
 
 /*
@@ -1800,6 +1759,188 @@ netbox_dispatch_response_console(struct lua_State *L,
 	netbox_request_complete(request);
 }
 
+/*
+ * Performs an authorization request for an iproto connection.
+ * Takes user, password, salt, request registry, socket fd,
+ * send_buf (ibuf), recv_buf (ibuf), timeout.
+ * Returns schema_version on success, nil and error on failure.
+ */
+static int
+netbox_iproto_auth(struct lua_State *L)
+{
+	size_t user_len;
+	const char *user = lua_tolstring(L, 1, &user_len);
+	size_t password_len;
+	const char *password = lua_tolstring(L, 2, &password_len);
+	size_t salt_len;
+	const char *salt = lua_tolstring(L, 3, &salt_len);
+	if (salt_len < SCRAMBLE_SIZE)
+		return luaL_error(L, "Invalid salt");
+	struct netbox_registry *registry = luaT_check_netbox_registry(L, 4);
+	int fd = lua_tointeger(L, 5);
+	struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 6);
+	struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 7);
+	double timeout = (!lua_isnoneornil(L, 8) ?
+			  lua_tonumber(L, 8) : TIMEOUT_INFINITY);
+	if (netbox_encode_auth(send_buf, registry->next_sync++, user, user_len,
+			       password, password_len, salt) != 0) {
+		goto error;
+	}
+	struct xrow_header hdr;
+	if (netbox_send_and_recv_iproto(fd, send_buf, recv_buf, timeout,
+					&hdr) != 0) {
+		goto error;
+	}
+	if (hdr.type != IPROTO_OK) {
+		xrow_decode_error(&hdr);
+		goto error;
+	}
+	lua_pushinteger(L, hdr.schema_version);
+	return 1;
+error:
+	return luaT_push_nil_and_error(L);
+}
+
+/*
+ * Fetches schema over an iproto connection. While waiting for the schema,
+ * processes other requests in a loop, like netbox_iproto_loop().
+ * Takes peer_version_id, request registry, socket fd, send_buf (ibuf),
+ * recv_buf (ibuf), timeout.
+ * Returns schema_version and a table with the following fields:
+ *   [VSPACE_ID] = <spaces>
+ *   [VINDEX_ID] = <indexes>
+ *   [VCOLLATION_ID] = <collations>
+ * On failure returns nil, error.
+ */
+static int
+netbox_iproto_schema(struct lua_State *L)
+{
+	uint32_t peer_version_id = lua_tointeger(L, 1);
+	struct netbox_registry *registry = luaT_check_netbox_registry(L, 2);
+	int fd = lua_tointeger(L, 3);
+	struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 4);
+	struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 5);
+	double timeout = (!lua_isnoneornil(L, 6) ?
+			  lua_tonumber(L, 6) : TIMEOUT_INFINITY);
+	/* _vcollation view was added in 2.2.0-389-g3e3ef182f */
+	bool peer_has_vcollation = peer_version_id >= version_id(2, 2, 1);
+restart:
+	lua_newtable(L);
+	uint64_t vspace_sync = registry->next_sync++;
+	if (netbox_encode_select_all(send_buf, vspace_sync,
+				     BOX_VSPACE_ID) != 0) {
+		return luaT_error(L);
+	}
+	uint64_t vindex_sync = registry->next_sync++;
+	if (netbox_encode_select_all(send_buf, vindex_sync,
+				     BOX_VINDEX_ID) != 0) {
+		return luaT_error(L);
+	}
+	uint64_t vcollation_sync = registry->next_sync++;
+	if (peer_has_vcollation &&
+	    netbox_encode_select_all(send_buf, vcollation_sync,
+				     BOX_VCOLLATION_ID) != 0) {
+		return luaT_error(L);
+	}
+	bool got_vspace = false;
+	bool got_vindex = false;
+	bool got_vcollation = false;
+	uint32_t schema_version = 0;
+	do {
+		struct xrow_header hdr;
+		if (netbox_send_and_recv_iproto(fd, send_buf, recv_buf,
+						timeout, &hdr) != 0) {
+			luaL_testcancel(L);
+			return luaT_push_nil_and_error(L);
+		}
+		netbox_dispatch_response_iproto(L, registry, &hdr);
+		if (hdr.sync != vspace_sync &&
+		    hdr.sync != vindex_sync &&
+		    hdr.sync != vcollation_sync) {
+			continue;
+		}
+		if (iproto_type_is_error(hdr.type)) {
+			uint32_t errcode = hdr.type & (IPROTO_TYPE_ERROR - 1);
+			if (errcode == ER_NO_SUCH_SPACE &&
+			    hdr.sync == vcollation_sync) {
+				/*
+				 * No _vcollation space
+				 * (server has old schema version).
+				 */
+				peer_has_vcollation = false;
+				continue;
+			}
+			xrow_decode_error(&hdr);
+			return luaT_push_nil_and_error(L);
+		}
+		if (schema_version == 0) {
+			schema_version = hdr.schema_version;
+		} else if (schema_version != hdr.schema_version) {
+			/*
+			 * Schema changed while fetching schema.
+			 * Restart loader.
+			 */
+			lua_pop(L, 1);
+			goto restart;
+		}
+		const char *data = hdr.body[0].iov_base;
+		const char *data_end = data + hdr.body[0].iov_len;
+		int key;
+		if (hdr.sync == vspace_sync) {
+			key = BOX_VSPACE_ID;
+			got_vspace = true;
+		} else if (hdr.sync == vindex_sync) {
+			key = BOX_VINDEX_ID;
+			got_vindex = true;
+		} else if (hdr.sync == vcollation_sync) {
+			key = BOX_VCOLLATION_ID;
+			got_vcollation = true;
+		} else {
+			unreachable();
+		}
+		lua_pushinteger(L, key);
+		netbox_decode_table(L, &data, data_end, tuple_format_runtime);
+		lua_settable(L, -3);
+	} while (!(got_vspace && got_vindex &&
+		   (got_vcollation || !peer_has_vcollation)));
+	lua_pushinteger(L, schema_version);
+	lua_insert(L, -2);
+	return 2;
+}
+
+/*
+ * Processes iproto requests in a loop until an error or a schema change.
+ * Takes schema_version, request registry, socket fd, send_buf (ibuf),
+ * recv_buf (ibuf), timeout.
+ * Returns schema_version if the loop was broken because of a schema change.
+ * If the loop was broken by an error, returns nil and the error.
+ */
+static int
+netbox_iproto_loop(struct lua_State *L)
+{
+	uint32_t schema_version = lua_tointeger(L, 1);
+	struct netbox_registry *registry = luaT_check_netbox_registry(L, 2);
+	int fd = lua_tointeger(L, 3);
+	struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 4);
+	struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 5);
+	double timeout = (!lua_isnoneornil(L, 6) ?
+			  lua_tonumber(L, 6) : TIMEOUT_INFINITY);
+	while (true) {
+		struct xrow_header hdr;
+		if (netbox_send_and_recv_iproto(fd, send_buf, recv_buf,
+						timeout, &hdr) != 0) {
+			luaL_testcancel(L);
+			return luaT_push_nil_and_error(L);
+		}
+		netbox_dispatch_response_iproto(L, registry, &hdr);
+		if (hdr.schema_version > 0 &&
+		    hdr.schema_version != schema_version) {
+			lua_pushinteger(L, hdr.schema_version);
+			return 1;
+		}
+	}
+}
+
 /*
  * Sets up console delimiter. Should be called before serving any requests.
  * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout.
@@ -1892,13 +2033,13 @@ luaopen_net_box(struct lua_State *L)
 	luaL_register_type(L, netbox_request_typename, netbox_request_meta);
 
 	static const luaL_Reg net_box_lib[] = {
-		{ "encode_auth",    netbox_encode_auth },
 		{ "encode_method",  netbox_encode_method },
 		{ "decode_greeting",netbox_decode_greeting },
-		{ "send_and_recv_iproto", netbox_send_and_recv_iproto },
 		{ "new_registry",   netbox_new_registry },
 		{ "new_request",    netbox_new_request },
-		{ "dispatch_response_iproto", netbox_dispatch_response_iproto },
+		{ "iproto_auth",    netbox_iproto_auth },
+		{ "iproto_schema",  netbox_iproto_schema },
+		{ "iproto_loop",    netbox_iproto_loop },
 		{ "console_setup",  netbox_console_setup },
 		{ "console_loop",   netbox_console_loop },
 		{ NULL, NULL}
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 0a21c1341117..13def54de2c5 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -9,18 +9,15 @@ local urilib   = require('uri')
 local internal = require('net.box.lib')
 local trigger  = require('internal.trigger')
 
-local band              = bit.band
 local max               = math.max
 local fiber_clock       = fiber.clock
 local fiber_self        = fiber.self
-local decode            = msgpack.decode_unchecked
 
 local check_iterator_type = box.internal.check_iterator_type
 local check_index_arg     = box.internal.check_index_arg
 local check_space_arg     = box.internal.check_space_arg
 local check_primary_index = box.internal.check_primary_index
 
-local encode_auth     = internal.encode_auth
 local encode_method   = internal.encode_method
 local decode_greeting = internal.decode_greeting
 
@@ -29,21 +26,12 @@ local VSPACE_ID        = 281
 local VINDEX_ID        = 289
 local VCOLLATION_ID    = 277
 local DEFAULT_CONNECT_TIMEOUT = 10
-
-local IPROTO_STATUS_KEY    = 0x00
-local IPROTO_ERRNO_MASK    = 0x7FFF
-local IPROTO_SYNC_KEY      = 0x01
-local IPROTO_SCHEMA_VERSION_KEY = 0x05
-local IPROTO_DATA_KEY      = 0x30
-local IPROTO_ERROR_24      = 0x31
-local IPROTO_ERROR         = 0x52
 local IPROTO_GREETING_SIZE = 128
 
 -- select errors from box.error
 local E_UNKNOWN              = box.error.UNKNOWN
 local E_NO_CONNECTION        = box.error.NO_CONNECTION
 local E_PROC_LUA             = box.error.PROC_LUA
-local E_NO_SUCH_SPACE        = box.error.NO_SUCH_SPACE
 
 -- Method types used internally by net.box.
 local M_PING        = 0
@@ -69,19 +57,6 @@ local M_INJECT      = 17
 -- utility tables
 local is_final_state         = {closed = 1, error = 1}
 
-local function decode_push(raw_data)
-    local response, raw_end = decode(raw_data)
-    return response[IPROTO_DATA_KEY][1], raw_end
-end
-
-local function version_id(major, minor, patch)
-    return bit.bor(bit.lshift(major, 16), bit.lshift(minor, 8), patch)
-end
-
-local function version_at_least(peer_version_id, major, minor, patch)
-    return peer_version_id >= version_id(major, minor, patch)
-end
-
 --
 -- Connect to a remote server, do handshake.
 -- @param host Hostname.
@@ -320,24 +295,6 @@ local function create_transport(host, port, user, password, callback,
         return request:wait_result(timeout)
     end
 
-    local function dispatch_response_iproto(hdr, body_rpos, body_end)
-        local id = hdr[IPROTO_SYNC_KEY]
-        local status = hdr[IPROTO_STATUS_KEY]
-        internal.dispatch_response_iproto(requests, id, status,
-                                          body_rpos, body_end)
-    end
-
-    -- IO (WORKER FIBER) --
-    local function send_and_recv_iproto(timeout)
-        local hdr, body_rpos, body_end = internal.send_and_recv_iproto(
-            connection:fd(), send_buf, recv_buf, timeout)
-        if not hdr then
-            local err = body_rpos
-            return err.code, err.message
-        end
-        return nil, hdr, body_rpos, body_end
-    end
-
     -- PROTOCOL STATE MACHINE (WORKER FIBER) --
     --
     -- The sm is implemented as a collection of functions performing
@@ -396,17 +353,13 @@ local function create_transport(host, port, user, password, callback,
             set_state('fetch_schema')
             return iproto_schema_sm()
         end
-        encode_auth(send_buf, requests:new_id(), user, password, salt)
-        local err, hdr, body_rpos = send_and_recv_iproto()
-        if err then
-            return error_sm(err, hdr)
-        end
-        if hdr[IPROTO_STATUS_KEY] ~= 0 then
-            local body = decode(body_rpos)
-            return error_sm(E_NO_CONNECTION, body[IPROTO_ERROR_24])
+        local schema_version, err = internal.iproto_auth(
+            user, password, salt, requests, connection:fd(), send_buf, recv_buf)
+        if not schema_version then
+            return error_sm(err.code, err.message)
         end
         set_state('fetch_schema')
-        return iproto_schema_sm(hdr[IPROTO_SCHEMA_VERSION_KEY])
+        return iproto_schema_sm(schema_version)
     end
 
     iproto_schema_sm = function(schema_version)
@@ -414,82 +367,28 @@ local function create_transport(host, port, user, password, callback,
             set_state('active')
             return iproto_sm(schema_version)
         end
-        -- _vcollation view was added in 2.2.0-389-g3e3ef182f
-        local peer_has_vcollation = version_at_least(greeting.version_id,
-                                                     2, 2, 1)
-        local select1_id = requests:new_id()
-        local select2_id = requests:new_id()
-        local select3_id
-        local response = {}
-        -- fetch everything from space _vspace, 2 = ITER_ALL
-        encode_method(M_SELECT, send_buf, select1_id, VSPACE_ID, 0, 2, 0,
-                      0xFFFFFFFF, nil)
-        -- fetch everything from space _vindex, 2 = ITER_ALL
-        encode_method(M_SELECT, send_buf, select2_id, VINDEX_ID, 0, 2, 0,
-                      0xFFFFFFFF, nil)
-        -- fetch everything from space _vcollation, 2 = ITER_ALL
-        if peer_has_vcollation then
-            select3_id = requests:new_id()
-            encode_method(M_SELECT, send_buf, select3_id, VCOLLATION_ID,
-                          0, 2, 0, 0xFFFFFFFF, nil)
+        local schema_version, schema = internal.iproto_schema(
+            greeting.version_id, requests, connection:fd(), send_buf, recv_buf)
+        if not schema_version then
+            local err = schema
+            return error_sm(err.code, err.message)
         end
-
-        schema_version = nil -- any schema_version will do provided that
-                             -- it is consistent across responses
-        repeat
-            local err, hdr, body_rpos, body_end = send_and_recv_iproto()
-            if err then return error_sm(err, hdr) end
-            dispatch_response_iproto(hdr, body_rpos, body_end)
-            local id = hdr[IPROTO_SYNC_KEY]
-            -- trick: omit check for peer_has_vcollation: id is
-            -- not nil
-            if id == select1_id or id == select2_id or id == select3_id then
-                -- response to a schema query we've submitted
-                local status = hdr[IPROTO_STATUS_KEY]
-                local response_schema_version = hdr[IPROTO_SCHEMA_VERSION_KEY]
-                if status ~= 0 then
-                    -- No _vcollation space (server has an old
-                    -- schema version).
-                    local errno = band(status, IPROTO_ERRNO_MASK)
-                    if id == select3_id and errno == E_NO_SUCH_SPACE then
-                        peer_has_vcollation = false
-                        goto continue
-                    end
-                    local body = decode(body_rpos)
-                    return error_sm(E_NO_CONNECTION, body[IPROTO_ERROR_24])
-                end
-                if schema_version == nil then
-                    schema_version = response_schema_version
-                elseif schema_version ~= response_schema_version then
-                    -- schema changed while fetching schema; restart loader
-                    return iproto_schema_sm()
-                end
-                local body = decode(body_rpos)
-                response[id] = body[IPROTO_DATA_KEY]
-            end
-            ::continue::
-        until response[select1_id] and response[select2_id] and
-              (not peer_has_vcollation or response[select3_id])
-        -- trick: response[select3_id] is nil when the key is nil
-        callback('did_fetch_schema', schema_version, response[select1_id],
-                 response[select2_id], response[select3_id])
+        callback('did_fetch_schema', schema_version, schema[VSPACE_ID],
+                 schema[VINDEX_ID], schema[VCOLLATION_ID])
         set_state('active')
         return iproto_sm(schema_version)
     end
 
     iproto_sm = function(schema_version)
-        local err, hdr, body_rpos, body_end = send_and_recv_iproto()
-        if err then return error_sm(err, hdr) end
-        dispatch_response_iproto(hdr, body_rpos, body_end)
-        local response_schema_version = hdr[IPROTO_SCHEMA_VERSION_KEY]
-        if response_schema_version > 0 and
-           response_schema_version ~= schema_version then
-            -- schema_version has been changed - start to load a new version.
-            -- Sic: self.schema_version will be updated only after reload.
-            set_state('fetch_schema')
-            return iproto_schema_sm(schema_version)
+        local schema_version, err = internal.iproto_loop(
+            schema_version, requests, connection:fd(), send_buf, recv_buf)
+        if not schema_version then
+            return error_sm(err.code, err.message)
         end
-        return iproto_sm(schema_version)
+        -- schema_version has been changed - start to load a new version.
+        -- Sic: self.schema_version will be updated only after reload.
+        set_state('fetch_schema')
+        return iproto_schema_sm(schema_version)
     end
 
     error_sm = function(err, msg)
-- 
2.25.1



More information about the Tarantool-patches mailing list