[Tarantool-patches] [PATCH 15/20] net.box: rewrite request implementation in C

Vladimir Davydov vdavydov at tarantool.org
Fri Jul 23 14:07:25 MSK 2021


This patch turns 'request' object and 'requests' table into userdata and
rewrites all their methods in C. Needed to rewrite performance-critical
parts of net.box in C.
---
 src/box/lua/net_box.c   | 685 +++++++++++++++++++++++++++++++++++++---
 src/box/lua/net_box.lua | 259 +--------------
 2 files changed, 657 insertions(+), 287 deletions(-)

diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 122d69e9219e..044f7d337ca7 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -46,7 +46,9 @@
 #include "lua/msgpack.h"
 #include <base64.h>
 
+#include "assoc.h"
 #include "coio.h"
+#include "fiber_cond.h"
 #include "box/errcode.h"
 #include "lua/fiber.h"
 #include "mpstream/mpstream.h"
@@ -76,6 +78,253 @@ enum netbox_method {
 	netbox_method_MAX
 };
 
+struct netbox_registry {
+	/* sync -> netbox_request */
+	struct mh_i64ptr_t *requests;
+};
+
+struct netbox_request {
+	enum netbox_method method;
+	/*
+	 * Unique identifier needed for matching the request with its response.
+	 * Used as a key in the registry.
+	 */
+	uint64_t sync;
+	/*
+	 * The registry this request belongs to or NULL if the request has been
+	 * completed.
+	 */
+	struct netbox_registry *registry;
+	/* Format used for decoding the response (ref incremented). */
+	struct tuple_format *format;
+	/* Signaled when the response is received. */
+	struct fiber_cond cond;
+	/*
+	 * A user-provided buffer to which the response body should be copied.
+	 * If NULL, the response will be decoded to Lua stack.
+	 */
+	struct ibuf *buffer;
+	/*
+	 * Lua reference to the buffer. Used to prevent garbage collection in
+	 * case the user discards the request.
+	 */
+	int buffer_ref;
+	/*
+	 * Whether to skip MessagePack map header and IPROTO_DATA key when
+	 * copying the response body to a user-provided buffer. Ignored if
+	 * buffer is not set.
+	 */
+	bool skip_header;
+	/* Lua references to on_push trigger and its context. */
+	int on_push_ref;
+	int on_push_ctx_ref;
+	/*
+	 * Reference to the request result or LUA_NOREF if the response hasn't
+	 * been received yet. If the response was decoded to a user-provided
+	 * buffer, the result stores the length of the decoded data.
+	 */
+	int result_ref;
+	/*
+	 * Error if the request failed (ref incremented). NULL on success or if
+	 * the response hasn't been received yet.
+	 */
+	struct error *error;
+
+};
+
+static const char netbox_registry_typename[] = "net.box.registry";
+static const char netbox_request_typename[] = "net.box.request";
+
+static void
+netbox_request_create(struct netbox_request *request)
+{
+	request->method = netbox_method_MAX;
+	request->sync = -1;
+	request->registry = NULL;
+	request->format = NULL;
+	fiber_cond_create(&request->cond);
+	request->buffer = NULL;
+	request->buffer_ref = LUA_REFNIL;
+	request->skip_header = false;
+	request->on_push_ref = LUA_REFNIL;
+	request->on_push_ctx_ref = LUA_REFNIL;
+	request->result_ref = LUA_NOREF;
+	request->error = NULL;
+}
+
+static void
+netbox_request_destroy(struct netbox_request *request)
+{
+	assert(request->registry == NULL);
+	if (request->format != NULL)
+		tuple_format_unref(request->format);
+	fiber_cond_destroy(&request->cond);
+	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->buffer_ref);
+	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->on_push_ref);
+	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->on_push_ctx_ref);
+	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->result_ref);
+	if (request->error != NULL)
+		error_unref(request->error);
+}
+
+/*
+ * Adds a request to a registry. There must not be a request with the same id
+ * (sync) in the registry. Returns -1 if out of memory.
+ */
+static int
+netbox_request_register(struct netbox_request *request,
+			struct netbox_registry *registry)
+{
+	assert(request->registry == NULL);
+	struct mh_i64ptr_t *h = registry->requests;
+	struct mh_i64ptr_node_t node = { request->sync, request };
+	struct mh_i64ptr_node_t *old_node = NULL;
+	if (mh_i64ptr_put(h, &node, &old_node, NULL) == mh_end(h)) {
+		diag_set(OutOfMemory, 0, "mhash", "netbox_registry");
+		return -1;
+	}
+	assert(old_node == NULL);
+	request->registry = registry;
+	return 0;
+}
+
+/*
+ * Unregisters a previously registered request. Does nothing if the request has
+ * already been unregistered or has never been registered.
+ */
+static void
+netbox_request_unregister(struct netbox_request *request)
+{
+	struct netbox_registry *registry = request->registry;
+	if (registry == NULL)
+		return;
+	request->registry = NULL;
+	struct mh_i64ptr_t *h = registry->requests;
+	mh_int_t k = mh_i64ptr_find(h, request->sync, NULL);
+	assert(k != mh_end(h));
+	assert(mh_i64ptr_node(h, k)->val == request);
+	mh_i64ptr_del(h, k, NULL);
+}
+
+static bool
+netbox_request_is_ready(struct netbox_request *request)
+{
+	return request->registry == NULL;
+}
+
+static void
+netbox_request_signal(struct netbox_request *request)
+{
+	fiber_cond_broadcast(&request->cond);
+}
+
+static void
+netbox_request_complete(struct netbox_request *request)
+{
+	netbox_request_unregister(request);
+	netbox_request_signal(request);
+}
+
+/*
+ * Waits on netbox_request::cond. Subtracts the wait time from the timeout.
+ * Returns false on timeout or if the fiber was cancelled.
+ */
+static bool
+netbox_request_wait(struct netbox_request *request, double *timeout)
+{
+	double ts = ev_monotonic_now(loop());
+	int rc = fiber_cond_wait_timeout(&request->cond, *timeout);
+	*timeout -= ev_monotonic_now(loop()) - ts;
+	return rc == 0;
+}
+
+static void
+netbox_request_set_result(struct netbox_request *request, int result_ref)
+{
+	assert(request->result_ref == LUA_NOREF);
+	request->result_ref = result_ref;
+}
+
+static void
+netbox_request_set_error(struct netbox_request *request, struct error *error)
+{
+	assert(request->error == NULL);
+	request->error = error;
+	error_ref(error);
+}
+
+/*
+ * Pushes the result or error to Lua stack. See the comment to request.result()
+ * for more information about the values pushed.
+ */
+static int
+netbox_request_push_result(struct netbox_request *request, struct lua_State *L)
+{
+	if (!netbox_request_is_ready(request)) {
+		diag_set(ClientError, ER_PROC_LUA, "Response is not ready");
+		return luaT_push_nil_and_error(L);
+	}
+	if (request->error != NULL) {
+		assert(request->result_ref == LUA_NOREF);
+		diag_set_error(diag_get(), request->error);
+		return luaT_push_nil_and_error(L);
+	} else {
+		assert(request->result_ref != LUA_NOREF);
+		lua_rawgeti(L, LUA_REGISTRYINDEX, request->result_ref);
+	}
+	return 1;
+}
+
+static int
+netbox_registry_create(struct netbox_registry *registry)
+{
+	registry->requests = mh_i64ptr_new();
+	if (registry->requests == NULL) {
+		diag_set(OutOfMemory, 0, "mhash", "netbox_registry");
+		return -1;
+	}
+	return 0;
+}
+
+static void
+netbox_registry_destroy(struct netbox_registry *registry)
+{
+	struct mh_i64ptr_t *h = registry->requests;
+	assert(mh_size(h) == 0);
+	mh_i64ptr_delete(h);
+}
+
+/*
+ * Looks up a request by id (sync). Returns NULL if not found.
+ */
+static struct netbox_request *
+netbox_registry_lookup(struct netbox_registry *registry, uint64_t sync)
+{
+	struct mh_i64ptr_t *h = registry->requests;
+	mh_int_t k = mh_i64ptr_find(h, sync, NULL);
+	if (k == mh_end(h))
+		return NULL;
+	return mh_i64ptr_node(h, k)->val;
+}
+
+/*
+ * Completes all requests in the registry with the given error and cleans up
+ * the hash. Called when the associated connection is closed.
+ */
+static void
+netbox_registry_reset(struct netbox_registry *registry, struct error *error)
+{
+	struct mh_i64ptr_t *h = registry->requests;
+	mh_int_t k;
+	mh_foreach(h, k) {
+		struct netbox_request *request = mh_i64ptr_node(h, k)->val;
+		request->registry = NULL;
+		netbox_request_set_error(request, error);
+		netbox_request_signal(request);
+	}
+	mh_i64ptr_clear(h);
+}
+
 static inline size_t
 netbox_begin_encode(struct mpstream *stream, uint64_t sync,
 		    enum iproto_type type)
@@ -1096,17 +1345,13 @@ netbox_decode_prepare(struct lua_State *L, const char **data,
 }
 
 /*
- * Decodes a response body for the specified method. Pushes the result and the
- * end of the decoded data to Lua stack.
- *
- * Takes the following arguments:
- *  - method: a value from the netbox_method enumeration
- *  - data: pointer to the data to decode (char ptr)
- *  - data_end: pointer to the end of the data (char ptr)
- *  - format: tuple format to use for decoding the body or nil
+ * Decodes a response body for the specified method and pushes the result to
+ * Lua stack.
  */
-static int
-netbox_decode_method(struct lua_State *L)
+static void
+netbox_decode_method(struct lua_State *L, enum netbox_method method,
+		     const char **data, const char *data_end,
+		     struct tuple_format *format)
 {
 	typedef void (*method_decoder_f)(struct lua_State *L, const char **data,
 					 const char *data_end,
@@ -1131,34 +1376,16 @@ netbox_decode_method(struct lua_State *L)
 		[NETBOX_COUNT]		= netbox_decode_value,
 		[NETBOX_INJECT]		= netbox_decode_table,
 	};
-	enum netbox_method method = lua_tointeger(L, 1);
-	assert(method < netbox_method_MAX);
-	uint32_t ctypeid;
-	const char *data = *(const char **)luaL_checkcdata(L, 2, &ctypeid);
-	assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
-	const char *data_end = *(const char **)luaL_checkcdata(L, 3, &ctypeid);
-	assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
-	struct tuple_format *format;
-	if (!lua_isnil(L, 4))
-		format = lbox_check_tuple_format(L, 4);
-	else
-		format = tuple_format_runtime;
-	method_decoder[method](L, &data, data_end, format);
-	*(const char **)luaL_pushcdata(L, CTID_CONST_CHAR_PTR) = data;
-	return 2;
+	method_decoder[method](L, data, data_end, format);
 }
 
 /*
- * Decodes an error from raw data and pushes it to Lua stack. Takes a pointer
- * to the data (char ptr) and an error code.
+ * Decodes an error from raw data. On success returns the decoded error object
+ * with ref counter incremented. On failure returns NULL.
  */
-static int
-netbox_decode_error(struct lua_State *L)
+static struct error *
+netbox_decode_error(const char **data, uint32_t errcode)
 {
-	uint32_t ctypeid;
-	const char **data = luaL_checkcdata(L, 1, &ctypeid);
-	assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
-	uint32_t errcode = lua_tointeger(L, 2);
 	struct error *error = NULL;
 	assert(mp_typeof(**data) == MP_MAP);
 	uint32_t map_size = mp_decode_map(data);
@@ -1169,7 +1396,7 @@ netbox_decode_error(struct lua_State *L)
 				error_unref(error);
 			error = error_unpack_unsafe(data);
 			if (error == NULL)
-				return luaT_error(L);
+				return NULL;
 			error_ref(error);
 			/*
 			 * IPROTO_ERROR comprises error encoded with
@@ -1200,22 +1427,404 @@ netbox_decode_error(struct lua_State *L)
 		error = box_error_last();
 		error_ref(error);
 	}
-	luaT_pusherror(L, error);
-	error_unref(error);
+	return error;
+}
+
+static inline struct netbox_registry *
+luaT_check_netbox_registry(struct lua_State *L, int idx)
+{
+	return luaL_checkudata(L, idx, netbox_registry_typename);
+}
+
+static int
+luaT_netbox_registry_gc(struct lua_State *L)
+{
+	struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
+	netbox_registry_destroy(registry);
+	return 0;
+}
+
+static int
+luaT_netbox_registry_reset(struct lua_State *L)
+{
+	struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
+	struct error *error = luaL_checkerror(L, 2);
+	netbox_registry_reset(registry, error);
+	return 0;
+}
+
+static inline struct netbox_request *
+luaT_check_netbox_request(struct lua_State *L, int idx)
+{
+	return luaL_checkudata(L, idx, netbox_request_typename);
+}
+
+static int
+luaT_netbox_request_gc(struct lua_State *L)
+{
+	struct netbox_request *request = luaT_check_netbox_request(L, 1);
+	netbox_request_unregister(request);
+	netbox_request_destroy(request);
+	return 0;
+}
+
+/*
+ * Returns true if the response was received for the given request.
+ */
+static int
+luaT_netbox_request_is_ready(struct lua_State *L)
+{
+	struct netbox_request *request = luaT_check_netbox_request(L, 1);
+	lua_pushboolean(L, netbox_request_is_ready(request));
 	return 1;
 }
 
+/*
+ * Obtains the result of the given request.
+ *
+ * Returns:
+ *  - nil, error             if the response failed or not ready
+ *  - response body (table)  if the response is ready and buffer is nil
+ *  - body length in bytes   if the response was written to the buffer
+ */
+static int
+luaT_netbox_request_result(struct lua_State *L)
+{
+	struct netbox_request *request = luaT_check_netbox_request(L, 1);
+	return netbox_request_push_result(request, L);
+}
+
+/*
+ * Waits until the response is received for the given request and obtains the
+ * result. Takes an optional timeout argument.
+ *
+ * See the comment to request.result() for the return value format.
+ */
+static int
+luaT_netbox_request_wait_result(struct lua_State *L)
+{
+	struct netbox_request *request = luaT_check_netbox_request(L, 1);
+	double timeout = TIMEOUT_INFINITY;
+	if (!lua_isnoneornil(L, 2)) {
+		if (lua_type(L, 2) != LUA_TNUMBER ||
+		    (timeout = lua_tonumber(L, 2)) < 0)
+			luaL_error(L, "Usage: future:wait_result(timeout)");
+	}
+	while (!netbox_request_is_ready(request)) {
+		if (!netbox_request_wait(request, &timeout)) {
+			luaL_testcancel(L);
+			diag_set(ClientError, ER_TIMEOUT);
+			return luaT_push_nil_and_error(L);
+		}
+	}
+	return netbox_request_push_result(request, L);
+}
+
+/*
+ * Makes the connection forget about the given request. When the response is
+ * received, it will be ignored. It reduces the size of the request registry
+ * speeding up other requests.
+ */
+static int
+luaT_netbox_request_discard(struct lua_State *L)
+{
+	struct netbox_request *request = luaT_check_netbox_request(L, 1);
+	if (!netbox_request_is_ready(request)) {
+		diag_set(ClientError, ER_PROC_LUA, "Response is discarded");
+		netbox_request_set_error(request, diag_last_error(diag_get()));
+		netbox_request_complete(request);
+	}
+	return 0;
+}
+
+/*
+ * Gets the next message or the final result. Takes the index of the last
+ * returned message as a second argument (the first argument is ignored).
+ * The request and timeout are passed as up-values (see request.pairs()).
+ *
+ * On success returns the index of the current message (used by the iterator
+ * implementation to continue iteration) and an object, which is either the
+ * message pushed with box.session.push() or the final response. If there's no
+ * more messages left for the request, returns nil, nil.
+ *
+ * On error returns box.NULL, error. We return box.NULL instead of nil to
+ * distinguish end of iteration from error when this function is called in
+ * `for k, v in future:pairs()`, because nil is treated by Lua as end of
+ * iteration marker.
+ */
+static int
+luaT_netbox_request_iterator_next(struct lua_State *L)
+{
+	struct netbox_request *request = luaT_check_netbox_request(
+		L, lua_upvalueindex(1));
+	double timeout = lua_tonumber(L, lua_upvalueindex(2));
+	if (luaL_isnull(L, 2)) {
+		/* The previous call returned an error. */
+		goto stop;
+	}
+	int i = lua_tointeger(L, 2) + 1;
+	/*
+	 * In the async mode (and this is the async mode, because 'future'
+	 * objects are not available to the user in the sync mode), on_push_ctx
+	 * refers to a table that contains received messages. We iterate over
+	 * the content of the table.
+	 */
+	lua_rawgeti(L, LUA_REGISTRYINDEX, request->on_push_ctx_ref);
+	int messages_idx = lua_gettop(L);
+	assert(lua_istable(L, messages_idx));
+	int message_count = lua_objlen(L, messages_idx);
+retry:
+	if (i <= message_count) {
+		lua_pushinteger(L, i);
+		lua_rawgeti(L, messages_idx, i);
+		return 2;
+	}
+	if (netbox_request_is_ready(request)) {
+		/*
+		 * After all the messages are iterated, `i` is equal to
+		 * #messages + 1. After we return the response, `i` becomes
+		 * #messages + 2. It is the trigger to finish the iteration.
+		 */
+		if (i > message_count + 1)
+			goto stop;
+		int n = netbox_request_push_result(request, L);
+		if (n == 2)
+			goto error;
+		/* Success. Return i, response. */
+		assert(n == 1);
+		lua_pushinteger(L, i);
+		lua_insert(L, -2);
+		return 2;
+	}
+	int old_message_count = message_count;
+	do {
+		if (netbox_request_wait(request, &timeout) != 0) {
+			luaL_testcancel(L);
+			diag_set(ClientError, ER_TIMEOUT);
+			luaT_push_nil_and_error(L);
+			goto error;
+		}
+		message_count = lua_objlen(L, messages_idx);
+	} while (!netbox_request_is_ready(request) &&
+		 message_count == old_message_count);
+	goto retry;
+stop:
+	lua_pushnil(L);
+	lua_pushnil(L);
+	return 2;
+error:
+	/*
+	 * When we get here, the top two elements on the stack are nil, error.
+	 * We need to replace nil with box.NULL.
+	 */
+	luaL_pushnull(L);
+	lua_replace(L, -3);
+	return 2;
+}
+
+static int
+luaT_netbox_request_pairs(struct lua_State *L)
+{
+	if (!lua_isnoneornil(L, 2)) {
+		if (lua_type(L, 2) != LUA_TNUMBER || lua_tonumber(L, 2) < 0)
+			luaL_error(L, "Usage: future:pairs(timeout)");
+	} else {
+		if (lua_isnil(L, 2))
+			lua_pop(L, 1);
+		lua_pushnumber(L, TIMEOUT_INFINITY);
+	}
+	lua_pushcclosure(L, luaT_netbox_request_iterator_next, 2);
+	lua_pushnil(L);
+	lua_pushinteger(L, 0);
+	return 3;
+}
+
+/*
+ * Creates a request registry object (userdata) and pushes it to Lua stack.
+ */
+static int
+netbox_new_registry(struct lua_State *L)
+{
+	struct netbox_registry *registry = lua_newuserdata(
+		L, sizeof(*registry));
+	if (netbox_registry_create(registry) != 0)
+		luaT_error(L);
+	luaL_getmetatable(L, netbox_registry_typename);
+	lua_setmetatable(L, -2);
+	return 1;
+}
+
+/*
+ * Creates a request object (userdata) and pushes it to Lua stack.
+ *
+ * Takes the following arguments:
+ *  - requests: registry to register the new request with
+ *  - id: id (sync) to assign to the new request
+ *  - buffer: buffer (ibuf) to write the result to or nil
+ *  - skip_header: whether to skip header when writing the result to the buffer
+ *  - method: a value from the netbox_method enumeration
+ *  - on_push: on_push trigger function
+ *  - on_push_ctx: on_push trigger function argument
+ *  - format: tuple format to use for decoding the body or nil
+ */
+static int
+netbox_new_request(struct lua_State *L)
+{
+	struct netbox_request *request = lua_newuserdata(L, sizeof(*request));
+	netbox_request_create(request);
+	luaL_getmetatable(L, netbox_request_typename);
+	lua_setmetatable(L, -2);
+	struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
+	request->sync = luaL_touint64(L, 2);
+	request->buffer = (struct ibuf *) lua_topointer(L, 3);
+	lua_pushvalue(L, 3);
+	request->buffer_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	request->skip_header = lua_toboolean(L, 4);
+	request->method = lua_tointeger(L, 5);
+	assert(request->method < netbox_method_MAX);
+	lua_pushvalue(L, 6);
+	request->on_push_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	lua_pushvalue(L, 7);
+	request->on_push_ctx_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	if (!lua_isnil(L, 8))
+		request->format = lbox_check_tuple_format(L, 8);
+	else
+		request->format = tuple_format_runtime;
+	tuple_format_ref(request->format);
+	if (netbox_request_register(request, registry) != 0)
+		luaT_error(L);
+	return 1;
+}
+
+/*
+ * Given a request registry, request id (sync), status, and a pointer to a
+ * response body, decodes the response and either completes the request or
+ * invokes the on-push trigger, depending on the status.
+ */
+static int
+netbox_dispatch_response_iproto(struct lua_State *L)
+{
+	struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
+	uint64_t sync = luaL_touint64(L, 2);
+	enum iproto_type status = lua_tointeger(L, 3);
+	uint32_t ctypeid;
+	const char *data = *(const char **)luaL_checkcdata(L, 4, &ctypeid);
+	assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
+	const char *data_end = *(const char **)luaL_checkcdata(L, 5, &ctypeid);
+	assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
+	struct netbox_request *request = netbox_registry_lookup(registry, sync);
+	if (request == NULL) {
+		/* Nobody is waiting for the response. */
+		return 0;
+	}
+	if (status > IPROTO_CHUNK) {
+		/* Handle errors. */
+		struct error *error = netbox_decode_error(
+			&data, status & (IPROTO_TYPE_ERROR - 1));
+		if (error == NULL)
+			return luaT_error(L);
+		netbox_request_set_error(request, error);
+		error_unref(error);
+		netbox_request_complete(request);
+		return 0;
+	}
+	if (request->buffer != NULL) {
+		/* Copy xrow.body to user-provided buffer. */
+		if (request->skip_header)
+			netbox_skip_to_data(&data);
+		size_t data_len = data_end - data;
+		void *wpos = ibuf_alloc(request->buffer, data_len);
+		if (wpos == NULL)
+			luaL_error(L, "out of memory");
+		memcpy(wpos, data, data_len);
+		lua_pushinteger(L, data_len);
+	} else {
+		/* Decode xrow.body[DATA] to Lua objects. */
+		if (status == IPROTO_OK) {
+			netbox_decode_method(L, request->method, &data,
+					     data_end, request->format);
+		} else {
+			netbox_decode_value(L, &data, data_end,
+					    request->format);
+		}
+		assert(data == data_end);
+	}
+	if (status == IPROTO_OK) {
+		/*
+		 * We received the final response and pushed it to Lua stack.
+		 * Store a reference to it in the request, remove the request
+		 * from the registry, and wake up waiters.
+		 */
+		netbox_request_set_result(request,
+					  luaL_ref(L, LUA_REGISTRYINDEX));
+		netbox_request_complete(request);
+	} else {
+		/* We received a push. Invoke on_push trigger. */
+		lua_rawgeti(L, LUA_REGISTRYINDEX, request->on_push_ref);
+		lua_rawgeti(L, LUA_REGISTRYINDEX, request->on_push_ctx_ref);
+		/* Push the received message as the second argument. */
+		lua_pushvalue(L, -3);
+		lua_call(L, 2, 0);
+		netbox_request_signal(request);
+	}
+	return 0;
+}
+
+/*
+ * Given a request registry, request id (sync), and a response string, assigns
+ * the response to the request and completes it.
+ */
+static int
+netbox_dispatch_response_console(struct lua_State *L)
+{
+	struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
+	uint64_t sync = luaL_touint64(L, 2);
+	struct netbox_request *request = netbox_registry_lookup(registry, sync);
+	if (request == NULL) {
+		/* Nobody is waiting for the response. */
+		return 0;
+	}
+	/*
+	 * The response is the last argument of this function so it's already
+	 * on the top of the Lua stack.
+	 */
+	netbox_request_set_result(request, luaL_ref(L, LUA_REGISTRYINDEX));
+	netbox_request_complete(request);
+	return 0;
+}
+
 int
 luaopen_net_box(struct lua_State *L)
 {
+	static const struct luaL_Reg netbox_registry_meta[] = {
+		{ "__gc",           luaT_netbox_registry_gc },
+		{ "reset",          luaT_netbox_registry_reset },
+		{ NULL, NULL }
+	};
+	luaL_register_type(L, netbox_registry_typename, netbox_registry_meta);
+
+	static const struct luaL_Reg netbox_request_meta[] = {
+		{ "__gc",           luaT_netbox_request_gc },
+		{ "is_ready",       luaT_netbox_request_is_ready },
+		{ "result",         luaT_netbox_request_result },
+		{ "wait_result",    luaT_netbox_request_wait_result },
+		{ "discard",        luaT_netbox_request_discard },
+		{ "pairs",          luaT_netbox_request_pairs },
+		{ NULL, NULL }
+	};
+	luaL_register_type(L, netbox_request_typename, netbox_request_meta);
+
 	static const luaL_Reg net_box_lib[] = {
 		{ "encode_auth",    netbox_encode_auth },
 		{ "encode_method",  netbox_encode_method },
 		{ "decode_greeting",netbox_decode_greeting },
-		{ "decode_method",  netbox_decode_method },
-		{ "decode_error",   netbox_decode_error },
 		{ "send_and_recv_iproto", netbox_send_and_recv_iproto },
 		{ "send_and_recv_console", netbox_send_and_recv_console },
+		{ "new_registry",   netbox_new_registry },
+		{ "new_request",    netbox_new_request },
+		{ "dispatch_response_iproto", netbox_dispatch_response_iproto },
+		{ "dispatch_response_console",
+			netbox_dispatch_response_console },
 		{ NULL, NULL}
 	};
 	/* luaL_register_module polutes _G */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 0ad6cac022f2..4bc66940ea2a 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -14,9 +14,7 @@ local max               = math.max
 local fiber_clock       = fiber.clock
 local fiber_self        = fiber.self
 local decode            = msgpack.decode_unchecked
-local decode_map_header = msgpack.decode_map_header
 
-local table_new           = require('table.new')
 local check_iterator_type = box.internal.check_iterator_type
 local check_index_arg     = box.internal.check_index_arg
 local check_space_arg     = box.internal.check_space_arg
@@ -25,8 +23,6 @@ local check_primary_index = box.internal.check_primary_index
 local encode_auth     = internal.encode_auth
 local encode_method   = internal.encode_method
 local decode_greeting = internal.decode_greeting
-local decode_method   = internal.decode_method
-local decode_error    = internal.decode_error
 
 local TIMEOUT_INFINITY = 500 * 365 * 86400
 local VSPACE_ID        = 281
@@ -42,13 +38,10 @@ local IPROTO_DATA_KEY      = 0x30
 local IPROTO_ERROR_24      = 0x31
 local IPROTO_ERROR         = 0x52
 local IPROTO_GREETING_SIZE = 128
-local IPROTO_CHUNK_KEY     = 128
-local IPROTO_OK_KEY        = 0
 
 -- select errors from box.error
 local E_UNKNOWN              = box.error.UNKNOWN
 local E_NO_CONNECTION        = box.error.NO_CONNECTION
-local E_TIMEOUT              = box.error.TIMEOUT
 local E_PROC_LUA             = box.error.PROC_LUA
 local E_NO_SUCH_SPACE        = box.error.NO_SUCH_SPACE
 
@@ -191,172 +184,20 @@ local function create_transport(host, port, user, password, callback,
     local last_error
     local state_cond       = fiber.cond() -- signaled when the state changes
 
-    -- Async requests currently 'in flight', keyed by a request
-    -- id. Value refs are weak hence if a client dies
-    -- unexpectedly, GC cleans the mess.
+    -- The registry stores requests that are currently 'in flight'
+    -- for this connection.
     -- Async request can not be timed out completely. Instead a
     -- user must decide when he does not want to wait for
     -- response anymore.
     -- Sync requests are implemented as async call + immediate
     -- wait for a result.
-    local requests         = setmetatable({}, { __mode = 'v' })
+    local requests         = internal.new_registry()
     local next_request_id  = 1
 
     local worker_fiber
     local send_buf         = buffer.ibuf(buffer.READAHEAD)
     local recv_buf         = buffer.ibuf(buffer.READAHEAD)
 
-    --
-    -- Async request metamethods.
-    --
-    local request_index = {}
-    --
-    -- When an async request is finalized (with ok or error - no
-    -- matter), its 'id' field is nullified by a response
-    -- dispatcher.
-    --
-    function request_index:is_ready()
-        return self.id == nil
-    end
-    --
-    -- When a request is finished, a result can be got from a
-    -- future object anytime.
-    -- @retval result, nil Success, the response is returned.
-    -- @retval nil, error Error occured.
-    --
-    function request_index:result()
-        if self.errno then
-            if type(self.response) ~= 'cdata' then
-                -- Error could be set by the connection state
-                -- machine, and it is usually a string explaining
-                -- a reason.
-                self.response = box.error.new({code = self.errno,
-                                               reason = self.response})
-            end
-            return nil, self.response
-        elseif not self.id then
-            return self.response
-        else
-            return nil, box.error.new(box.error.PROC_LUA,
-                                      'Response is not ready')
-        end
-    end
-    --
-    -- Get the next message or the final result.
-    -- @param iterator Iterator object.
-    -- @param i Index to get a next message from.
-    --
-    -- @retval nil, nil The request is finished.
-    -- @retval i + 1, object A message/response and its index.
-    -- @retval box.NULL, error An error occured. When this
-    --         function is called in 'for k, v in future:pairs()',
-    --         `k` becomes box.NULL, and `v` becomes error object.
-    --         On error the key becomes exactly box.NULL instead
-    --         of nil, because nil is treated by Lua as iteration
-    --         end marker. Nil does not participate in iteration,
-    --         and does not allow to continue it.
-    --
-    local function request_iterator_next(iterator, i)
-        if i == box.NULL then
-            return nil, nil
-        else
-            i = i + 1
-        end
-        local request = iterator.request
-        local messages = request.on_push_ctx
-    ::retry::
-        if i <= #messages then
-            return i, messages[i]
-        end
-        if request:is_ready() then
-            -- After all the messages are iterated, `i` is equal
-            -- to #messages + 1. After response reading `i`
-            -- becomes #messages + 2. It is the trigger to finish
-            -- the iteration.
-            if i > #messages + 1 then
-                return nil, nil
-            end
-            local response, err = request:result()
-            if err then
-                return box.NULL, err
-            end
-            return i, response
-        end
-        local old_message_count = #messages
-        local timeout = iterator.timeout
-        repeat
-            local ts = fiber_clock()
-            request.cond:wait(timeout)
-            timeout = timeout - (fiber_clock() - ts)
-            if request:is_ready() or old_message_count ~= #messages then
-                goto retry
-            end
-        until timeout <= 0
-        return box.NULL, box.error.new(E_TIMEOUT)
-    end
-    --
-    -- Iterate over all messages, received by a request. @Sa
-    -- request_iterator_next for details what to expect in `for`
-    -- key/value pairs.
-    -- @param timeout One iteration timeout.
-    -- @retval next() callback, iterator, zero key.
-    --
-    function request_index:pairs(timeout)
-        if timeout then
-            if type(timeout) ~= 'number' or timeout < 0 then
-                error('Usage: future:pairs(timeout)')
-            end
-        else
-            timeout = TIMEOUT_INFINITY
-        end
-        local iterator = {request = self, timeout = timeout}
-        return request_iterator_next, iterator, 0
-    end
-    --
-    -- Wait for a response or error max timeout seconds.
-    -- @param timeout Max seconds to wait.
-    -- @retval result, nil Success, the response is returned.
-    -- @retval nil, error Error occured.
-    --
-    function request_index:wait_result(timeout)
-        if timeout then
-            if type(timeout) ~= 'number' or timeout < 0 then
-                error('Usage: future:wait_result(timeout)')
-            end
-        else
-            timeout = TIMEOUT_INFINITY
-        end
-        if not self:is_ready() then
-            -- When a response is ready before timeout, the
-            -- waiting client is waked up prematurely.
-            while timeout > 0 and not self:is_ready() do
-                local ts = fiber.clock()
-                self.cond:wait(timeout)
-                timeout = timeout - (fiber.clock() - ts)
-            end
-            if not self:is_ready() then
-                return nil, box.error.new(E_TIMEOUT)
-            end
-        end
-        return self:result()
-    end
-    --
-    -- Make a connection forget about the response. When it will
-    -- be received, it will be ignored. It reduces size of
-    -- requests table speeding up other requests.
-    --
-    function request_index:discard()
-        if self.id then
-            requests[self.id] = nil
-            self.id = nil
-            self.errno = box.error.PROC_LUA
-            self.response = 'Response is discarded'
-            self.cond:broadcast()
-        end
-    end
-
-    local request_mt = { __index = request_index }
-
     -- STATE SWITCHING --
     local function set_state(new_state, new_errno, new_error)
         state = new_state
@@ -366,13 +207,8 @@ local function create_transport(host, port, user, password, callback,
         state_cond:broadcast()
         if state == 'error' or state == 'error_reconnect' or
            state == 'closed' then
-            for _, request in pairs(requests) do
-                request.id = nil
-                request.errno = new_errno
-                request.response = new_error
-                request.cond:broadcast()
-            end
-            requests = {}
+            requests:reset(box.error.new({code = new_errno,
+                                          reason = new_error}))
         end
     end
 
@@ -468,20 +304,8 @@ local function create_transport(host, port, user, password, callback,
         local id = next_request_id
         encode_method(method, send_buf, id, ...)
         next_request_id = next_id(id)
-        -- Request in most cases has maximum 10 members:
-        -- method, buffer, skip_header, id, cond, errno, response,
-        -- on_push, on_push_ctx and format.
-        local request = setmetatable(table_new(0, 10), request_mt)
-        request.method = method
-        request.buffer = buffer
-        request.skip_header = skip_header
-        request.id = id
-        request.cond = fiber.cond()
-        requests[id] = request
-        request.on_push = on_push
-        request.on_push_ctx = on_push_ctx
-        request.format = format
-        return request
+        return internal.new_request(requests, id, buffer, skip_header, method,
+                                    on_push, on_push_ctx, format)
     end
 
     --
@@ -502,76 +326,13 @@ local function create_transport(host, port, user, password, callback,
 
     local function dispatch_response_iproto(hdr, body_rpos, body_end)
         local id = hdr[IPROTO_SYNC_KEY]
-        local request = requests[id]
-        if request == nil then -- nobody is waiting for the response
-            return
-        end
         local status = hdr[IPROTO_STATUS_KEY]
-        local body_len = body_end - body_rpos
-
-        if status > IPROTO_CHUNK_KEY then
-            -- Handle errors
-            requests[id] = nil
-            request.id = nil
-            request.errno = band(status, IPROTO_ERRNO_MASK)
-            request.response = decode_error(body_rpos, request.errno)
-            request.cond:broadcast()
-            return
-        end
-
-        local buffer = request.buffer
-        if buffer ~= nil then
-            -- Copy xrow.body to user-provided buffer
-            if request.skip_header then
-                -- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
-                local map_len, key
-                map_len, body_rpos = decode_map_header(body_rpos, body_len)
-                assert(map_len == 1)
-                key, body_rpos = decode(body_rpos)
-                assert(key == IPROTO_DATA_KEY)
-                body_len = body_end - body_rpos
-            end
-            local wpos = buffer:alloc(body_len)
-            ffi.copy(wpos, body_rpos, body_len)
-            body_len = tonumber(body_len)
-            if status == IPROTO_OK_KEY then
-                request.response = body_len
-                requests[id] = nil
-                request.id = nil
-            else
-                request.on_push(request.on_push_ctx, body_len)
-            end
-            request.cond:broadcast()
-            return
-        end
-
-        local real_end
-        -- Decode xrow.body[DATA] to Lua objects
-        if status == IPROTO_OK_KEY then
-            request.response, real_end = decode_method(request.method,
-                                                       body_rpos, body_end,
-                                                       request.format)
-            assert(real_end == body_end, "invalid body length")
-            requests[id] = nil
-            request.id = nil
-        else
-            local msg
-            msg, real_end, request.errno = decode_push(body_rpos, body_end)
-            assert(real_end == body_end, "invalid body length")
-            request.on_push(request.on_push_ctx, msg)
-        end
-        request.cond:broadcast()
+        internal.dispatch_response_iproto(requests, id, status,
+                                          body_rpos, body_end)
     end
 
     local function dispatch_response_console(rid, response)
-        local request = requests[rid]
-        if request == nil then -- nobody is waiting for the response
-            return
-        end
-        request.id = nil
-        requests[rid] = nil
-        request.response = response
-        request.cond:broadcast()
+        internal.dispatch_response_console(requests, rid, response)
     end
 
     local function new_request_id()
-- 
2.25.1



More information about the Tarantool-patches mailing list