[Tarantool-patches] [PATCH 15/20] net.box: rewrite request implementation in C
Vladimir Davydov
vdavydov at tarantool.org
Fri Jul 23 14:07:25 MSK 2021
This patch turns 'request' object and 'requests' table into userdata and
rewrites all their methods in C. Needed to rewrite performance-critical
parts of net.box in C.
---
src/box/lua/net_box.c | 685 +++++++++++++++++++++++++++++++++++++---
src/box/lua/net_box.lua | 259 +--------------
2 files changed, 657 insertions(+), 287 deletions(-)
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 122d69e9219e..044f7d337ca7 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -46,7 +46,9 @@
#include "lua/msgpack.h"
#include <base64.h>
+#include "assoc.h"
#include "coio.h"
+#include "fiber_cond.h"
#include "box/errcode.h"
#include "lua/fiber.h"
#include "mpstream/mpstream.h"
@@ -76,6 +78,253 @@ enum netbox_method {
netbox_method_MAX
};
+struct netbox_registry {
+ /* sync -> netbox_request */
+ struct mh_i64ptr_t *requests;
+};
+
+struct netbox_request {
+ enum netbox_method method;
+ /*
+ * Unique identifier needed for matching the request with its response.
+ * Used as a key in the registry.
+ */
+ uint64_t sync;
+ /*
+ * The registry this request belongs to or NULL if the request has been
+ * completed.
+ */
+ struct netbox_registry *registry;
+ /* Format used for decoding the response (ref incremented). */
+ struct tuple_format *format;
+ /* Signaled when the response is received. */
+ struct fiber_cond cond;
+ /*
+ * A user-provided buffer to which the response body should be copied.
+ * If NULL, the response will be decoded to Lua stack.
+ */
+ struct ibuf *buffer;
+ /*
+ * Lua reference to the buffer. Used to prevent garbage collection in
+ * case the user discards the request.
+ */
+ int buffer_ref;
+ /*
+ * Whether to skip MessagePack map header and IPROTO_DATA key when
+ * copying the response body to a user-provided buffer. Ignored if
+ * buffer is not set.
+ */
+ bool skip_header;
+ /* Lua references to on_push trigger and its context. */
+ int on_push_ref;
+ int on_push_ctx_ref;
+ /*
+ * Reference to the request result or LUA_NOREF if the response hasn't
+ * been received yet. If the response was decoded to a user-provided
+ * buffer, the result stores the length of the decoded data.
+ */
+ int result_ref;
+ /*
+ * Error if the request failed (ref incremented). NULL on success or if
+ * the response hasn't been received yet.
+ */
+ struct error *error;
+
+};
+
+static const char netbox_registry_typename[] = "net.box.registry";
+static const char netbox_request_typename[] = "net.box.request";
+
+static void
+netbox_request_create(struct netbox_request *request)
+{
+ request->method = netbox_method_MAX;
+ request->sync = -1;
+ request->registry = NULL;
+ request->format = NULL;
+ fiber_cond_create(&request->cond);
+ request->buffer = NULL;
+ request->buffer_ref = LUA_REFNIL;
+ request->skip_header = false;
+ request->on_push_ref = LUA_REFNIL;
+ request->on_push_ctx_ref = LUA_REFNIL;
+ request->result_ref = LUA_NOREF;
+ request->error = NULL;
+}
+
+static void
+netbox_request_destroy(struct netbox_request *request)
+{
+ assert(request->registry == NULL);
+ if (request->format != NULL)
+ tuple_format_unref(request->format);
+ fiber_cond_destroy(&request->cond);
+ luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->buffer_ref);
+ luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->on_push_ref);
+ luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->on_push_ctx_ref);
+ luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->result_ref);
+ if (request->error != NULL)
+ error_unref(request->error);
+}
+
+/*
+ * Adds a request to a registry. There must not be a request with the same id
+ * (sync) in the registry. Returns -1 if out of memory.
+ */
+static int
+netbox_request_register(struct netbox_request *request,
+ struct netbox_registry *registry)
+{
+ assert(request->registry == NULL);
+ struct mh_i64ptr_t *h = registry->requests;
+ struct mh_i64ptr_node_t node = { request->sync, request };
+ struct mh_i64ptr_node_t *old_node = NULL;
+ if (mh_i64ptr_put(h, &node, &old_node, NULL) == mh_end(h)) {
+ diag_set(OutOfMemory, 0, "mhash", "netbox_registry");
+ return -1;
+ }
+ assert(old_node == NULL);
+ request->registry = registry;
+ return 0;
+}
+
+/*
+ * Unregisters a previously registered request. Does nothing if the request has
+ * already been unregistered or has never been registered.
+ */
+static void
+netbox_request_unregister(struct netbox_request *request)
+{
+ struct netbox_registry *registry = request->registry;
+ if (registry == NULL)
+ return;
+ request->registry = NULL;
+ struct mh_i64ptr_t *h = registry->requests;
+ mh_int_t k = mh_i64ptr_find(h, request->sync, NULL);
+ assert(k != mh_end(h));
+ assert(mh_i64ptr_node(h, k)->val == request);
+ mh_i64ptr_del(h, k, NULL);
+}
+
+static bool
+netbox_request_is_ready(struct netbox_request *request)
+{
+ return request->registry == NULL;
+}
+
+static void
+netbox_request_signal(struct netbox_request *request)
+{
+ fiber_cond_broadcast(&request->cond);
+}
+
+static void
+netbox_request_complete(struct netbox_request *request)
+{
+ netbox_request_unregister(request);
+ netbox_request_signal(request);
+}
+
+/*
+ * Waits on netbox_request::cond. Subtracts the wait time from the timeout.
+ * Returns false on timeout or if the fiber was cancelled.
+ */
+static bool
+netbox_request_wait(struct netbox_request *request, double *timeout)
+{
+ double ts = ev_monotonic_now(loop());
+ int rc = fiber_cond_wait_timeout(&request->cond, *timeout);
+ *timeout -= ev_monotonic_now(loop()) - ts;
+ return rc == 0;
+}
+
+static void
+netbox_request_set_result(struct netbox_request *request, int result_ref)
+{
+ assert(request->result_ref == LUA_NOREF);
+ request->result_ref = result_ref;
+}
+
+static void
+netbox_request_set_error(struct netbox_request *request, struct error *error)
+{
+ assert(request->error == NULL);
+ request->error = error;
+ error_ref(error);
+}
+
+/*
+ * Pushes the result or error to Lua stack. See the comment to request.result()
+ * for more information about the values pushed.
+ */
+static int
+netbox_request_push_result(struct netbox_request *request, struct lua_State *L)
+{
+ if (!netbox_request_is_ready(request)) {
+ diag_set(ClientError, ER_PROC_LUA, "Response is not ready");
+ return luaT_push_nil_and_error(L);
+ }
+ if (request->error != NULL) {
+ assert(request->result_ref == LUA_NOREF);
+ diag_set_error(diag_get(), request->error);
+ return luaT_push_nil_and_error(L);
+ } else {
+ assert(request->result_ref != LUA_NOREF);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, request->result_ref);
+ }
+ return 1;
+}
+
+static int
+netbox_registry_create(struct netbox_registry *registry)
+{
+ registry->requests = mh_i64ptr_new();
+ if (registry->requests == NULL) {
+ diag_set(OutOfMemory, 0, "mhash", "netbox_registry");
+ return -1;
+ }
+ return 0;
+}
+
+static void
+netbox_registry_destroy(struct netbox_registry *registry)
+{
+ struct mh_i64ptr_t *h = registry->requests;
+ assert(mh_size(h) == 0);
+ mh_i64ptr_delete(h);
+}
+
+/*
+ * Looks up a request by id (sync). Returns NULL if not found.
+ */
+static struct netbox_request *
+netbox_registry_lookup(struct netbox_registry *registry, uint64_t sync)
+{
+ struct mh_i64ptr_t *h = registry->requests;
+ mh_int_t k = mh_i64ptr_find(h, sync, NULL);
+ if (k == mh_end(h))
+ return NULL;
+ return mh_i64ptr_node(h, k)->val;
+}
+
+/*
+ * Completes all requests in the registry with the given error and cleans up
+ * the hash. Called when the associated connection is closed.
+ */
+static void
+netbox_registry_reset(struct netbox_registry *registry, struct error *error)
+{
+ struct mh_i64ptr_t *h = registry->requests;
+ mh_int_t k;
+ mh_foreach(h, k) {
+ struct netbox_request *request = mh_i64ptr_node(h, k)->val;
+ request->registry = NULL;
+ netbox_request_set_error(request, error);
+ netbox_request_signal(request);
+ }
+ mh_i64ptr_clear(h);
+}
+
static inline size_t
netbox_begin_encode(struct mpstream *stream, uint64_t sync,
enum iproto_type type)
@@ -1096,17 +1345,13 @@ netbox_decode_prepare(struct lua_State *L, const char **data,
}
/*
- * Decodes a response body for the specified method. Pushes the result and the
- * end of the decoded data to Lua stack.
- *
- * Takes the following arguments:
- * - method: a value from the netbox_method enumeration
- * - data: pointer to the data to decode (char ptr)
- * - data_end: pointer to the end of the data (char ptr)
- * - format: tuple format to use for decoding the body or nil
+ * Decodes a response body for the specified method and pushes the result to
+ * Lua stack.
*/
-static int
-netbox_decode_method(struct lua_State *L)
+static void
+netbox_decode_method(struct lua_State *L, enum netbox_method method,
+ const char **data, const char *data_end,
+ struct tuple_format *format)
{
typedef void (*method_decoder_f)(struct lua_State *L, const char **data,
const char *data_end,
@@ -1131,34 +1376,16 @@ netbox_decode_method(struct lua_State *L)
[NETBOX_COUNT] = netbox_decode_value,
[NETBOX_INJECT] = netbox_decode_table,
};
- enum netbox_method method = lua_tointeger(L, 1);
- assert(method < netbox_method_MAX);
- uint32_t ctypeid;
- const char *data = *(const char **)luaL_checkcdata(L, 2, &ctypeid);
- assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
- const char *data_end = *(const char **)luaL_checkcdata(L, 3, &ctypeid);
- assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
- struct tuple_format *format;
- if (!lua_isnil(L, 4))
- format = lbox_check_tuple_format(L, 4);
- else
- format = tuple_format_runtime;
- method_decoder[method](L, &data, data_end, format);
- *(const char **)luaL_pushcdata(L, CTID_CONST_CHAR_PTR) = data;
- return 2;
+ method_decoder[method](L, data, data_end, format);
}
/*
- * Decodes an error from raw data and pushes it to Lua stack. Takes a pointer
- * to the data (char ptr) and an error code.
+ * Decodes an error from raw data. On success returns the decoded error object
+ * with ref counter incremented. On failure returns NULL.
*/
-static int
-netbox_decode_error(struct lua_State *L)
+static struct error *
+netbox_decode_error(const char **data, uint32_t errcode)
{
- uint32_t ctypeid;
- const char **data = luaL_checkcdata(L, 1, &ctypeid);
- assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
- uint32_t errcode = lua_tointeger(L, 2);
struct error *error = NULL;
assert(mp_typeof(**data) == MP_MAP);
uint32_t map_size = mp_decode_map(data);
@@ -1169,7 +1396,7 @@ netbox_decode_error(struct lua_State *L)
error_unref(error);
error = error_unpack_unsafe(data);
if (error == NULL)
- return luaT_error(L);
+ return NULL;
error_ref(error);
/*
* IPROTO_ERROR comprises error encoded with
@@ -1200,22 +1427,404 @@ netbox_decode_error(struct lua_State *L)
error = box_error_last();
error_ref(error);
}
- luaT_pusherror(L, error);
- error_unref(error);
+ return error;
+}
+
+static inline struct netbox_registry *
+luaT_check_netbox_registry(struct lua_State *L, int idx)
+{
+ return luaL_checkudata(L, idx, netbox_registry_typename);
+}
+
+static int
+luaT_netbox_registry_gc(struct lua_State *L)
+{
+ struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
+ netbox_registry_destroy(registry);
+ return 0;
+}
+
+static int
+luaT_netbox_registry_reset(struct lua_State *L)
+{
+ struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
+ struct error *error = luaL_checkerror(L, 2);
+ netbox_registry_reset(registry, error);
+ return 0;
+}
+
+static inline struct netbox_request *
+luaT_check_netbox_request(struct lua_State *L, int idx)
+{
+ return luaL_checkudata(L, idx, netbox_request_typename);
+}
+
+static int
+luaT_netbox_request_gc(struct lua_State *L)
+{
+ struct netbox_request *request = luaT_check_netbox_request(L, 1);
+ netbox_request_unregister(request);
+ netbox_request_destroy(request);
+ return 0;
+}
+
+/*
+ * Returns true if the response was received for the given request.
+ */
+static int
+luaT_netbox_request_is_ready(struct lua_State *L)
+{
+ struct netbox_request *request = luaT_check_netbox_request(L, 1);
+ lua_pushboolean(L, netbox_request_is_ready(request));
return 1;
}
+/*
+ * Obtains the result of the given request.
+ *
+ * Returns:
+ * - nil, error if the response failed or not ready
+ * - response body (table) if the response is ready and buffer is nil
+ * - body length in bytes if the response was written to the buffer
+ */
+static int
+luaT_netbox_request_result(struct lua_State *L)
+{
+ struct netbox_request *request = luaT_check_netbox_request(L, 1);
+ return netbox_request_push_result(request, L);
+}
+
+/*
+ * Waits until the response is received for the given request and obtains the
+ * result. Takes an optional timeout argument.
+ *
+ * See the comment to request.result() for the return value format.
+ */
+static int
+luaT_netbox_request_wait_result(struct lua_State *L)
+{
+ struct netbox_request *request = luaT_check_netbox_request(L, 1);
+ double timeout = TIMEOUT_INFINITY;
+ if (!lua_isnoneornil(L, 2)) {
+ if (lua_type(L, 2) != LUA_TNUMBER ||
+ (timeout = lua_tonumber(L, 2)) < 0)
+ luaL_error(L, "Usage: future:wait_result(timeout)");
+ }
+ while (!netbox_request_is_ready(request)) {
+ if (!netbox_request_wait(request, &timeout)) {
+ luaL_testcancel(L);
+ diag_set(ClientError, ER_TIMEOUT);
+ return luaT_push_nil_and_error(L);
+ }
+ }
+ return netbox_request_push_result(request, L);
+}
+
+/*
+ * Makes the connection forget about the given request. When the response is
+ * received, it will be ignored. It reduces the size of the request registry
+ * speeding up other requests.
+ */
+static int
+luaT_netbox_request_discard(struct lua_State *L)
+{
+ struct netbox_request *request = luaT_check_netbox_request(L, 1);
+ if (!netbox_request_is_ready(request)) {
+ diag_set(ClientError, ER_PROC_LUA, "Response is discarded");
+ netbox_request_set_error(request, diag_last_error(diag_get()));
+ netbox_request_complete(request);
+ }
+ return 0;
+}
+
+/*
+ * Gets the next message or the final result. Takes the index of the last
+ * returned message as a second argument (the first argument is ignored).
+ * The request and timeout are passed as up-values (see request.pairs()).
+ *
+ * On success returns the index of the current message (used by the iterator
+ * implementation to continue iteration) and an object, which is either the
+ * message pushed with box.session.push() or the final response. If there's no
+ * more messages left for the request, returns nil, nil.
+ *
+ * On error returns box.NULL, error. We return box.NULL instead of nil to
+ * distinguish end of iteration from error when this function is called in
+ * `for k, v in future:pairs()`, because nil is treated by Lua as end of
+ * iteration marker.
+ */
+static int
+luaT_netbox_request_iterator_next(struct lua_State *L)
+{
+ struct netbox_request *request = luaT_check_netbox_request(
+ L, lua_upvalueindex(1));
+ double timeout = lua_tonumber(L, lua_upvalueindex(2));
+ if (luaL_isnull(L, 2)) {
+ /* The previous call returned an error. */
+ goto stop;
+ }
+ int i = lua_tointeger(L, 2) + 1;
+ /*
+ * In the async mode (and this is the async mode, because 'future'
+ * objects are not available to the user in the sync mode), on_push_ctx
+ * refers to a table that contains received messages. We iterate over
+ * the content of the table.
+ */
+ lua_rawgeti(L, LUA_REGISTRYINDEX, request->on_push_ctx_ref);
+ int messages_idx = lua_gettop(L);
+ assert(lua_istable(L, messages_idx));
+ int message_count = lua_objlen(L, messages_idx);
+retry:
+ if (i <= message_count) {
+ lua_pushinteger(L, i);
+ lua_rawgeti(L, messages_idx, i);
+ return 2;
+ }
+ if (netbox_request_is_ready(request)) {
+ /*
+ * After all the messages are iterated, `i` is equal to
+ * #messages + 1. After we return the response, `i` becomes
+ * #messages + 2. It is the trigger to finish the iteration.
+ */
+ if (i > message_count + 1)
+ goto stop;
+ int n = netbox_request_push_result(request, L);
+ if (n == 2)
+ goto error;
+ /* Success. Return i, response. */
+ assert(n == 1);
+ lua_pushinteger(L, i);
+ lua_insert(L, -2);
+ return 2;
+ }
+ int old_message_count = message_count;
+ do {
+ if (netbox_request_wait(request, &timeout) != 0) {
+ luaL_testcancel(L);
+ diag_set(ClientError, ER_TIMEOUT);
+ luaT_push_nil_and_error(L);
+ goto error;
+ }
+ message_count = lua_objlen(L, messages_idx);
+ } while (!netbox_request_is_ready(request) &&
+ message_count == old_message_count);
+ goto retry;
+stop:
+ lua_pushnil(L);
+ lua_pushnil(L);
+ return 2;
+error:
+ /*
+ * When we get here, the top two elements on the stack are nil, error.
+ * We need to replace nil with box.NULL.
+ */
+ luaL_pushnull(L);
+ lua_replace(L, -3);
+ return 2;
+}
+
+static int
+luaT_netbox_request_pairs(struct lua_State *L)
+{
+ if (!lua_isnoneornil(L, 2)) {
+ if (lua_type(L, 2) != LUA_TNUMBER || lua_tonumber(L, 2) < 0)
+ luaL_error(L, "Usage: future:pairs(timeout)");
+ } else {
+ if (lua_isnil(L, 2))
+ lua_pop(L, 1);
+ lua_pushnumber(L, TIMEOUT_INFINITY);
+ }
+ lua_pushcclosure(L, luaT_netbox_request_iterator_next, 2);
+ lua_pushnil(L);
+ lua_pushinteger(L, 0);
+ return 3;
+}
+
+/*
+ * Creates a request registry object (userdata) and pushes it to Lua stack.
+ */
+static int
+netbox_new_registry(struct lua_State *L)
+{
+ struct netbox_registry *registry = lua_newuserdata(
+ L, sizeof(*registry));
+ if (netbox_registry_create(registry) != 0)
+ luaT_error(L);
+ luaL_getmetatable(L, netbox_registry_typename);
+ lua_setmetatable(L, -2);
+ return 1;
+}
+
+/*
+ * Creates a request object (userdata) and pushes it to Lua stack.
+ *
+ * Takes the following arguments:
+ * - requests: registry to register the new request with
+ * - id: id (sync) to assign to the new request
+ * - buffer: buffer (ibuf) to write the result to or nil
+ * - skip_header: whether to skip header when writing the result to the buffer
+ * - method: a value from the netbox_method enumeration
+ * - on_push: on_push trigger function
+ * - on_push_ctx: on_push trigger function argument
+ * - format: tuple format to use for decoding the body or nil
+ */
+static int
+netbox_new_request(struct lua_State *L)
+{
+ struct netbox_request *request = lua_newuserdata(L, sizeof(*request));
+ netbox_request_create(request);
+ luaL_getmetatable(L, netbox_request_typename);
+ lua_setmetatable(L, -2);
+ struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
+ request->sync = luaL_touint64(L, 2);
+ request->buffer = (struct ibuf *) lua_topointer(L, 3);
+ lua_pushvalue(L, 3);
+ request->buffer_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ request->skip_header = lua_toboolean(L, 4);
+ request->method = lua_tointeger(L, 5);
+ assert(request->method < netbox_method_MAX);
+ lua_pushvalue(L, 6);
+ request->on_push_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ lua_pushvalue(L, 7);
+ request->on_push_ctx_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ if (!lua_isnil(L, 8))
+ request->format = lbox_check_tuple_format(L, 8);
+ else
+ request->format = tuple_format_runtime;
+ tuple_format_ref(request->format);
+ if (netbox_request_register(request, registry) != 0)
+ luaT_error(L);
+ return 1;
+}
+
+/*
+ * Given a request registry, request id (sync), status, and a pointer to a
+ * response body, decodes the response and either completes the request or
+ * invokes the on-push trigger, depending on the status.
+ */
+static int
+netbox_dispatch_response_iproto(struct lua_State *L)
+{
+ struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
+ uint64_t sync = luaL_touint64(L, 2);
+ enum iproto_type status = lua_tointeger(L, 3);
+ uint32_t ctypeid;
+ const char *data = *(const char **)luaL_checkcdata(L, 4, &ctypeid);
+ assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
+ const char *data_end = *(const char **)luaL_checkcdata(L, 5, &ctypeid);
+ assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR);
+ struct netbox_request *request = netbox_registry_lookup(registry, sync);
+ if (request == NULL) {
+ /* Nobody is waiting for the response. */
+ return 0;
+ }
+ if (status > IPROTO_CHUNK) {
+ /* Handle errors. */
+ struct error *error = netbox_decode_error(
+ &data, status & (IPROTO_TYPE_ERROR - 1));
+ if (error == NULL)
+ return luaT_error(L);
+ netbox_request_set_error(request, error);
+ error_unref(error);
+ netbox_request_complete(request);
+ return 0;
+ }
+ if (request->buffer != NULL) {
+ /* Copy xrow.body to user-provided buffer. */
+ if (request->skip_header)
+ netbox_skip_to_data(&data);
+ size_t data_len = data_end - data;
+ void *wpos = ibuf_alloc(request->buffer, data_len);
+ if (wpos == NULL)
+ luaL_error(L, "out of memory");
+ memcpy(wpos, data, data_len);
+ lua_pushinteger(L, data_len);
+ } else {
+ /* Decode xrow.body[DATA] to Lua objects. */
+ if (status == IPROTO_OK) {
+ netbox_decode_method(L, request->method, &data,
+ data_end, request->format);
+ } else {
+ netbox_decode_value(L, &data, data_end,
+ request->format);
+ }
+ assert(data == data_end);
+ }
+ if (status == IPROTO_OK) {
+ /*
+ * We received the final response and pushed it to Lua stack.
+ * Store a reference to it in the request, remove the request
+ * from the registry, and wake up waiters.
+ */
+ netbox_request_set_result(request,
+ luaL_ref(L, LUA_REGISTRYINDEX));
+ netbox_request_complete(request);
+ } else {
+ /* We received a push. Invoke on_push trigger. */
+ lua_rawgeti(L, LUA_REGISTRYINDEX, request->on_push_ref);
+ lua_rawgeti(L, LUA_REGISTRYINDEX, request->on_push_ctx_ref);
+ /* Push the received message as the second argument. */
+ lua_pushvalue(L, -3);
+ lua_call(L, 2, 0);
+ netbox_request_signal(request);
+ }
+ return 0;
+}
+
+/*
+ * Given a request registry, request id (sync), and a response string, assigns
+ * the response to the request and completes it.
+ */
+static int
+netbox_dispatch_response_console(struct lua_State *L)
+{
+ struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
+ uint64_t sync = luaL_touint64(L, 2);
+ struct netbox_request *request = netbox_registry_lookup(registry, sync);
+ if (request == NULL) {
+ /* Nobody is waiting for the response. */
+ return 0;
+ }
+ /*
+ * The response is the last argument of this function so it's already
+ * on the top of the Lua stack.
+ */
+ netbox_request_set_result(request, luaL_ref(L, LUA_REGISTRYINDEX));
+ netbox_request_complete(request);
+ return 0;
+}
+
int
luaopen_net_box(struct lua_State *L)
{
+ static const struct luaL_Reg netbox_registry_meta[] = {
+ { "__gc", luaT_netbox_registry_gc },
+ { "reset", luaT_netbox_registry_reset },
+ { NULL, NULL }
+ };
+ luaL_register_type(L, netbox_registry_typename, netbox_registry_meta);
+
+ static const struct luaL_Reg netbox_request_meta[] = {
+ { "__gc", luaT_netbox_request_gc },
+ { "is_ready", luaT_netbox_request_is_ready },
+ { "result", luaT_netbox_request_result },
+ { "wait_result", luaT_netbox_request_wait_result },
+ { "discard", luaT_netbox_request_discard },
+ { "pairs", luaT_netbox_request_pairs },
+ { NULL, NULL }
+ };
+ luaL_register_type(L, netbox_request_typename, netbox_request_meta);
+
static const luaL_Reg net_box_lib[] = {
{ "encode_auth", netbox_encode_auth },
{ "encode_method", netbox_encode_method },
{ "decode_greeting",netbox_decode_greeting },
- { "decode_method", netbox_decode_method },
- { "decode_error", netbox_decode_error },
{ "send_and_recv_iproto", netbox_send_and_recv_iproto },
{ "send_and_recv_console", netbox_send_and_recv_console },
+ { "new_registry", netbox_new_registry },
+ { "new_request", netbox_new_request },
+ { "dispatch_response_iproto", netbox_dispatch_response_iproto },
+ { "dispatch_response_console",
+ netbox_dispatch_response_console },
{ NULL, NULL}
};
/* luaL_register_module polutes _G */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 0ad6cac022f2..4bc66940ea2a 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -14,9 +14,7 @@ local max = math.max
local fiber_clock = fiber.clock
local fiber_self = fiber.self
local decode = msgpack.decode_unchecked
-local decode_map_header = msgpack.decode_map_header
-local table_new = require('table.new')
local check_iterator_type = box.internal.check_iterator_type
local check_index_arg = box.internal.check_index_arg
local check_space_arg = box.internal.check_space_arg
@@ -25,8 +23,6 @@ local check_primary_index = box.internal.check_primary_index
local encode_auth = internal.encode_auth
local encode_method = internal.encode_method
local decode_greeting = internal.decode_greeting
-local decode_method = internal.decode_method
-local decode_error = internal.decode_error
local TIMEOUT_INFINITY = 500 * 365 * 86400
local VSPACE_ID = 281
@@ -42,13 +38,10 @@ local IPROTO_DATA_KEY = 0x30
local IPROTO_ERROR_24 = 0x31
local IPROTO_ERROR = 0x52
local IPROTO_GREETING_SIZE = 128
-local IPROTO_CHUNK_KEY = 128
-local IPROTO_OK_KEY = 0
-- select errors from box.error
local E_UNKNOWN = box.error.UNKNOWN
local E_NO_CONNECTION = box.error.NO_CONNECTION
-local E_TIMEOUT = box.error.TIMEOUT
local E_PROC_LUA = box.error.PROC_LUA
local E_NO_SUCH_SPACE = box.error.NO_SUCH_SPACE
@@ -191,172 +184,20 @@ local function create_transport(host, port, user, password, callback,
local last_error
local state_cond = fiber.cond() -- signaled when the state changes
- -- Async requests currently 'in flight', keyed by a request
- -- id. Value refs are weak hence if a client dies
- -- unexpectedly, GC cleans the mess.
+ -- The registry stores requests that are currently 'in flight'
+ -- for this connection.
-- Async request can not be timed out completely. Instead a
-- user must decide when he does not want to wait for
-- response anymore.
-- Sync requests are implemented as async call + immediate
-- wait for a result.
- local requests = setmetatable({}, { __mode = 'v' })
+ local requests = internal.new_registry()
local next_request_id = 1
local worker_fiber
local send_buf = buffer.ibuf(buffer.READAHEAD)
local recv_buf = buffer.ibuf(buffer.READAHEAD)
- --
- -- Async request metamethods.
- --
- local request_index = {}
- --
- -- When an async request is finalized (with ok or error - no
- -- matter), its 'id' field is nullified by a response
- -- dispatcher.
- --
- function request_index:is_ready()
- return self.id == nil
- end
- --
- -- When a request is finished, a result can be got from a
- -- future object anytime.
- -- @retval result, nil Success, the response is returned.
- -- @retval nil, error Error occured.
- --
- function request_index:result()
- if self.errno then
- if type(self.response) ~= 'cdata' then
- -- Error could be set by the connection state
- -- machine, and it is usually a string explaining
- -- a reason.
- self.response = box.error.new({code = self.errno,
- reason = self.response})
- end
- return nil, self.response
- elseif not self.id then
- return self.response
- else
- return nil, box.error.new(box.error.PROC_LUA,
- 'Response is not ready')
- end
- end
- --
- -- Get the next message or the final result.
- -- @param iterator Iterator object.
- -- @param i Index to get a next message from.
- --
- -- @retval nil, nil The request is finished.
- -- @retval i + 1, object A message/response and its index.
- -- @retval box.NULL, error An error occured. When this
- -- function is called in 'for k, v in future:pairs()',
- -- `k` becomes box.NULL, and `v` becomes error object.
- -- On error the key becomes exactly box.NULL instead
- -- of nil, because nil is treated by Lua as iteration
- -- end marker. Nil does not participate in iteration,
- -- and does not allow to continue it.
- --
- local function request_iterator_next(iterator, i)
- if i == box.NULL then
- return nil, nil
- else
- i = i + 1
- end
- local request = iterator.request
- local messages = request.on_push_ctx
- ::retry::
- if i <= #messages then
- return i, messages[i]
- end
- if request:is_ready() then
- -- After all the messages are iterated, `i` is equal
- -- to #messages + 1. After response reading `i`
- -- becomes #messages + 2. It is the trigger to finish
- -- the iteration.
- if i > #messages + 1 then
- return nil, nil
- end
- local response, err = request:result()
- if err then
- return box.NULL, err
- end
- return i, response
- end
- local old_message_count = #messages
- local timeout = iterator.timeout
- repeat
- local ts = fiber_clock()
- request.cond:wait(timeout)
- timeout = timeout - (fiber_clock() - ts)
- if request:is_ready() or old_message_count ~= #messages then
- goto retry
- end
- until timeout <= 0
- return box.NULL, box.error.new(E_TIMEOUT)
- end
- --
- -- Iterate over all messages, received by a request. @Sa
- -- request_iterator_next for details what to expect in `for`
- -- key/value pairs.
- -- @param timeout One iteration timeout.
- -- @retval next() callback, iterator, zero key.
- --
- function request_index:pairs(timeout)
- if timeout then
- if type(timeout) ~= 'number' or timeout < 0 then
- error('Usage: future:pairs(timeout)')
- end
- else
- timeout = TIMEOUT_INFINITY
- end
- local iterator = {request = self, timeout = timeout}
- return request_iterator_next, iterator, 0
- end
- --
- -- Wait for a response or error max timeout seconds.
- -- @param timeout Max seconds to wait.
- -- @retval result, nil Success, the response is returned.
- -- @retval nil, error Error occured.
- --
- function request_index:wait_result(timeout)
- if timeout then
- if type(timeout) ~= 'number' or timeout < 0 then
- error('Usage: future:wait_result(timeout)')
- end
- else
- timeout = TIMEOUT_INFINITY
- end
- if not self:is_ready() then
- -- When a response is ready before timeout, the
- -- waiting client is waked up prematurely.
- while timeout > 0 and not self:is_ready() do
- local ts = fiber.clock()
- self.cond:wait(timeout)
- timeout = timeout - (fiber.clock() - ts)
- end
- if not self:is_ready() then
- return nil, box.error.new(E_TIMEOUT)
- end
- end
- return self:result()
- end
- --
- -- Make a connection forget about the response. When it will
- -- be received, it will be ignored. It reduces size of
- -- requests table speeding up other requests.
- --
- function request_index:discard()
- if self.id then
- requests[self.id] = nil
- self.id = nil
- self.errno = box.error.PROC_LUA
- self.response = 'Response is discarded'
- self.cond:broadcast()
- end
- end
-
- local request_mt = { __index = request_index }
-
-- STATE SWITCHING --
local function set_state(new_state, new_errno, new_error)
state = new_state
@@ -366,13 +207,8 @@ local function create_transport(host, port, user, password, callback,
state_cond:broadcast()
if state == 'error' or state == 'error_reconnect' or
state == 'closed' then
- for _, request in pairs(requests) do
- request.id = nil
- request.errno = new_errno
- request.response = new_error
- request.cond:broadcast()
- end
- requests = {}
+ requests:reset(box.error.new({code = new_errno,
+ reason = new_error}))
end
end
@@ -468,20 +304,8 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
encode_method(method, send_buf, id, ...)
next_request_id = next_id(id)
- -- Request in most cases has maximum 10 members:
- -- method, buffer, skip_header, id, cond, errno, response,
- -- on_push, on_push_ctx and format.
- local request = setmetatable(table_new(0, 10), request_mt)
- request.method = method
- request.buffer = buffer
- request.skip_header = skip_header
- request.id = id
- request.cond = fiber.cond()
- requests[id] = request
- request.on_push = on_push
- request.on_push_ctx = on_push_ctx
- request.format = format
- return request
+ return internal.new_request(requests, id, buffer, skip_header, method,
+ on_push, on_push_ctx, format)
end
--
@@ -502,76 +326,13 @@ local function create_transport(host, port, user, password, callback,
local function dispatch_response_iproto(hdr, body_rpos, body_end)
local id = hdr[IPROTO_SYNC_KEY]
- local request = requests[id]
- if request == nil then -- nobody is waiting for the response
- return
- end
local status = hdr[IPROTO_STATUS_KEY]
- local body_len = body_end - body_rpos
-
- if status > IPROTO_CHUNK_KEY then
- -- Handle errors
- requests[id] = nil
- request.id = nil
- request.errno = band(status, IPROTO_ERRNO_MASK)
- request.response = decode_error(body_rpos, request.errno)
- request.cond:broadcast()
- return
- end
-
- local buffer = request.buffer
- if buffer ~= nil then
- -- Copy xrow.body to user-provided buffer
- if request.skip_header then
- -- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
- local map_len, key
- map_len, body_rpos = decode_map_header(body_rpos, body_len)
- assert(map_len == 1)
- key, body_rpos = decode(body_rpos)
- assert(key == IPROTO_DATA_KEY)
- body_len = body_end - body_rpos
- end
- local wpos = buffer:alloc(body_len)
- ffi.copy(wpos, body_rpos, body_len)
- body_len = tonumber(body_len)
- if status == IPROTO_OK_KEY then
- request.response = body_len
- requests[id] = nil
- request.id = nil
- else
- request.on_push(request.on_push_ctx, body_len)
- end
- request.cond:broadcast()
- return
- end
-
- local real_end
- -- Decode xrow.body[DATA] to Lua objects
- if status == IPROTO_OK_KEY then
- request.response, real_end = decode_method(request.method,
- body_rpos, body_end,
- request.format)
- assert(real_end == body_end, "invalid body length")
- requests[id] = nil
- request.id = nil
- else
- local msg
- msg, real_end, request.errno = decode_push(body_rpos, body_end)
- assert(real_end == body_end, "invalid body length")
- request.on_push(request.on_push_ctx, msg)
- end
- request.cond:broadcast()
+ internal.dispatch_response_iproto(requests, id, status,
+ body_rpos, body_end)
end
local function dispatch_response_console(rid, response)
- local request = requests[rid]
- if request == nil then -- nobody is waiting for the response
- return
- end
- request.id = nil
- requests[rid] = nil
- request.response = response
- request.cond:broadcast()
+ internal.dispatch_response_console(requests, rid, response)
end
local function new_request_id()
--
2.25.1
More information about the Tarantool-patches
mailing list