From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id 8A41B6EC57; Fri, 23 Jul 2021 14:15:03 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 8A41B6EC57 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1627038903; bh=wkD2FeiUYiwaPxPa6OpGMebltjd/HLe/dNrsRENDqAY=; h=To:Date:In-Reply-To:References:Subject:List-Id:List-Unsubscribe: List-Archive:List-Post:List-Help:List-Subscribe:From:Reply-To: From; b=QNIXFOm0RqvuMWopg61jS+FObPGNRhGThs9CICoyHZrvM4s/LoG+cVOYvPBm4r4kC CFIN8wjKQHIydqRcHB5e5XgtP3NOgvBeFPS07ukwPELt+KuK1MG6tL1Ic6LFrvLNN9 kd7z74pYCwT82C3y6EIcRGl6QxgGHKEZpLzZxmpQ= Received: from smtpng1.i.mail.ru (smtpng1.i.mail.ru [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 615C26F3F2 for ; Fri, 23 Jul 2021 14:07:52 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 615C26F3F2 Received: by smtpng1.m.smailru.net with esmtpa (envelope-from ) id 1m6t1z-0004dl-Qi; Fri, 23 Jul 2021 14:07:52 +0300 To: tarantool-patches@dev.tarantool.org Date: Fri, 23 Jul 2021 14:07:25 +0300 Message-Id: X-Mailer: git-send-email 2.25.1 In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-174C08C4: 5188C02AEC42908C481ED7ADC579193296BBA28369E3F2D2713F3D5F7D406D31BCF678C7329BA986 X-7564579A: 646B95376F6C166E X-77F55803: 4F1203BC0FB41BD941C43E597735A9C3A9514C5AE4B3B389A94BDFA06D40730D182A05F538085040CE607D6D38DF6736E129B338B38340BC85733125DA68C7B7F3E9AFA13A624D44 X-7FA49CB5: 
FF5795518A3D127A4AD6D5ED66289B5278DA827A17800CE7F65C230EDDCD559EEA1F7E6F0F101C67BD4B6F7A4D31EC0BCC500DACC3FED6E28638F802B75D45FF8AA50765F79006374B6C65B7367884A58638F802B75D45FF36EB9D2243A4F8B5A6FCA7DBDB1FC311F39EFFDF887939037866D6147AF826D8FA32DBB362F81E5CC63711BBA2F4EF52117882F4460429724CE54428C33FAD305F5C1EE8F4F765FCAA867293B0326636D2E47CDBA5A96583BD4B6F7A4D31EC0BC014FD901B82EE079FA2833FD35BB23D27C277FBC8AE2E8BAA867293B0326636D2E47CDBA5A96583BA9C0B312567BB231DD303D21008E29813377AFFFEAFD269A417C69337E82CC2E827F84554CEF50127C277FBC8AE2E8BA83251EDC214901ED5E8D9A59859A8B6045A9A90E9EED90B089D37D7C0E48F6C5571747095F342E88FB05168BE4CE3AF X-C1DE0DAB: 8BD88D57C5CADBC8B2710865C38675108D126DDD7DC6F381A3B1A56EE2B804F6B226C914C9968946695E9D90444CEC264DCC8C77FBA9901322D2CEDE4E95CF1BDBE8DEE28BC9005C095FFBCAB1CFE8AABCA57AF85F7723F26FB49DA6D604A9D566736D56087BE5D6589120F7DAE46353205367B2BCC23E5BF1F547F318071A92BDAD6C7F3747799A X-C8649E89: 4E36BF7865823D7055A7F0CF078B5EC49A30900B95165D349379E7F8541B6C9A01E085285766728BFDF4DFEE66CD07D098C0C850DB8A831008E749744EB73F4D1D7E09C32AA3244C176865A3B21D54ACA10CA331DEA6DD9C39C99C45E8D137E9729B2BEF169E0186 X-D57D3AED: 3ZO7eAau8CL7WIMRKs4sN3D3tLDjz0dLbV79QFUyzQ2Ujvy7cMT6pYYqY16iZVKkSc3dCLJ7zSJH7+u4VD18S7Vl4ZUrpaVfd2+vE6kuoey4m4VkSEu530nj6fImhcD4MUrOEAnl0W826KZ9Q+tr5ycPtXkTV4k65bRjmOUUP8cvGozZ33TWg5HZplvhhXbhDGzqmQDTd6OAevLeAnq3Ra9uf7zvY2zzsIhlcp/Y7m53TZgf2aB4JOg4gkr2biojbL9S8ysBdXiEX0g4jkpDtZOPitbdAcvh X-Mailru-Sender: 689FA8AB762F7393C37E3C1AEC41BA5DCF7F53FC1A341C6399EAFC22DB3CCC72274CEFED1673C562683ABF942079399BFB559BB5D741EB966A65DFF43FF7BE03240331F90058701C67EA787935ED9F1B X-Mras: Ok Subject: [Tarantool-patches] [PATCH 15/20] net.box: rewrite request implementation in C X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , From: Vladimir Davydov via Tarantool-patches Reply-To: Vladimir 
Davydov Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" This patch turns 'request' object and 'requests' table into userdata and rewrites all their methods in C. Needed to rewrite performance-critical parts of net.box in C. --- src/box/lua/net_box.c | 685 +++++++++++++++++++++++++++++++++++++--- src/box/lua/net_box.lua | 259 +-------------- 2 files changed, 657 insertions(+), 287 deletions(-) diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index 122d69e9219e..044f7d337ca7 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -46,7 +46,9 @@ #include "lua/msgpack.h" #include +#include "assoc.h" #include "coio.h" +#include "fiber_cond.h" #include "box/errcode.h" #include "lua/fiber.h" #include "mpstream/mpstream.h" @@ -76,6 +78,253 @@ enum netbox_method { netbox_method_MAX }; +struct netbox_registry { + /* sync -> netbox_request */ + struct mh_i64ptr_t *requests; +}; + +struct netbox_request { + enum netbox_method method; + /* + * Unique identifier needed for matching the request with its response. + * Used as a key in the registry. + */ + uint64_t sync; + /* + * The registry this request belongs to or NULL if the request has been + * completed. + */ + struct netbox_registry *registry; + /* Format used for decoding the response (ref incremented). */ + struct tuple_format *format; + /* Signaled when the response is received. */ + struct fiber_cond cond; + /* + * A user-provided buffer to which the response body should be copied. + * If NULL, the response will be decoded to Lua stack. + */ + struct ibuf *buffer; + /* + * Lua reference to the buffer. Used to prevent garbage collection in + * case the user discards the request. + */ + int buffer_ref; + /* + * Whether to skip MessagePack map header and IPROTO_DATA key when + * copying the response body to a user-provided buffer. Ignored if + * buffer is not set. + */ + bool skip_header; + /* Lua references to on_push trigger and its context. 
*/ + int on_push_ref; + int on_push_ctx_ref; + /* + * Reference to the request result or LUA_NOREF if the response hasn't + * been received yet. If the response was decoded to a user-provided + * buffer, the result stores the length of the decoded data. + */ + int result_ref; + /* + * Error if the request failed (ref incremented). NULL on success or if + * the response hasn't been received yet. + */ + struct error *error; + +}; + +static const char netbox_registry_typename[] = "net.box.registry"; +static const char netbox_request_typename[] = "net.box.request"; + +static void +netbox_request_create(struct netbox_request *request) +{ + request->method = netbox_method_MAX; + request->sync = -1; + request->registry = NULL; + request->format = NULL; + fiber_cond_create(&request->cond); + request->buffer = NULL; + request->buffer_ref = LUA_REFNIL; + request->skip_header = false; + request->on_push_ref = LUA_REFNIL; + request->on_push_ctx_ref = LUA_REFNIL; + request->result_ref = LUA_NOREF; + request->error = NULL; +} + +static void +netbox_request_destroy(struct netbox_request *request) +{ + assert(request->registry == NULL); + if (request->format != NULL) + tuple_format_unref(request->format); + fiber_cond_destroy(&request->cond); + luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->buffer_ref); + luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->on_push_ref); + luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->on_push_ctx_ref); + luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->result_ref); + if (request->error != NULL) + error_unref(request->error); +} + +/* + * Adds a request to a registry. There must not be a request with the same id + * (sync) in the registry. Returns -1 if out of memory. 
+ */ +static int +netbox_request_register(struct netbox_request *request, + struct netbox_registry *registry) +{ + assert(request->registry == NULL); + struct mh_i64ptr_t *h = registry->requests; + struct mh_i64ptr_node_t node = { request->sync, request }; + struct mh_i64ptr_node_t *old_node = NULL; + if (mh_i64ptr_put(h, &node, &old_node, NULL) == mh_end(h)) { + diag_set(OutOfMemory, 0, "mhash", "netbox_registry"); + return -1; + } + assert(old_node == NULL); + request->registry = registry; + return 0; +} + +/* + * Unregisters a previously registered request. Does nothing if the request has + * already been unregistered or has never been registered. + */ +static void +netbox_request_unregister(struct netbox_request *request) +{ + struct netbox_registry *registry = request->registry; + if (registry == NULL) + return; + request->registry = NULL; + struct mh_i64ptr_t *h = registry->requests; + mh_int_t k = mh_i64ptr_find(h, request->sync, NULL); + assert(k != mh_end(h)); + assert(mh_i64ptr_node(h, k)->val == request); + mh_i64ptr_del(h, k, NULL); +} + +static bool +netbox_request_is_ready(struct netbox_request *request) +{ + return request->registry == NULL; +} + +static void +netbox_request_signal(struct netbox_request *request) +{ + fiber_cond_broadcast(&request->cond); +} + +static void +netbox_request_complete(struct netbox_request *request) +{ + netbox_request_unregister(request); + netbox_request_signal(request); +} + +/* + * Waits on netbox_request::cond. Subtracts the wait time from the timeout. + * Returns false on timeout or if the fiber was cancelled. 
+ */ +static bool +netbox_request_wait(struct netbox_request *request, double *timeout) +{ + double ts = ev_monotonic_now(loop()); + int rc = fiber_cond_wait_timeout(&request->cond, *timeout); + *timeout -= ev_monotonic_now(loop()) - ts; + return rc == 0; +} + +static void +netbox_request_set_result(struct netbox_request *request, int result_ref) +{ + assert(request->result_ref == LUA_NOREF); + request->result_ref = result_ref; +} + +static void +netbox_request_set_error(struct netbox_request *request, struct error *error) +{ + assert(request->error == NULL); + request->error = error; + error_ref(error); +} + +/* + * Pushes the result or error to Lua stack. See the comment to request.result() + * for more information about the values pushed. + */ +static int +netbox_request_push_result(struct netbox_request *request, struct lua_State *L) +{ + if (!netbox_request_is_ready(request)) { + diag_set(ClientError, ER_PROC_LUA, "Response is not ready"); + return luaT_push_nil_and_error(L); + } + if (request->error != NULL) { + assert(request->result_ref == LUA_NOREF); + diag_set_error(diag_get(), request->error); + return luaT_push_nil_and_error(L); + } else { + assert(request->result_ref != LUA_NOREF); + lua_rawgeti(L, LUA_REGISTRYINDEX, request->result_ref); + } + return 1; +} + +static int +netbox_registry_create(struct netbox_registry *registry) +{ + registry->requests = mh_i64ptr_new(); + if (registry->requests == NULL) { + diag_set(OutOfMemory, 0, "mhash", "netbox_registry"); + return -1; + } + return 0; +} + +static void +netbox_registry_destroy(struct netbox_registry *registry) +{ + struct mh_i64ptr_t *h = registry->requests; + assert(mh_size(h) == 0); + mh_i64ptr_delete(h); +} + +/* + * Looks up a request by id (sync). Returns NULL if not found. 
+ */ +static struct netbox_request * +netbox_registry_lookup(struct netbox_registry *registry, uint64_t sync) +{ + struct mh_i64ptr_t *h = registry->requests; + mh_int_t k = mh_i64ptr_find(h, sync, NULL); + if (k == mh_end(h)) + return NULL; + return mh_i64ptr_node(h, k)->val; +} + +/* + * Completes all requests in the registry with the given error and cleans up + * the hash. Called when the associated connection is closed. + */ +static void +netbox_registry_reset(struct netbox_registry *registry, struct error *error) +{ + struct mh_i64ptr_t *h = registry->requests; + mh_int_t k; + mh_foreach(h, k) { + struct netbox_request *request = mh_i64ptr_node(h, k)->val; + request->registry = NULL; + netbox_request_set_error(request, error); + netbox_request_signal(request); + } + mh_i64ptr_clear(h); +} + static inline size_t netbox_begin_encode(struct mpstream *stream, uint64_t sync, enum iproto_type type) @@ -1096,17 +1345,13 @@ netbox_decode_prepare(struct lua_State *L, const char **data, } /* - * Decodes a response body for the specified method. Pushes the result and the - * end of the decoded data to Lua stack. - * - * Takes the following arguments: - * - method: a value from the netbox_method enumeration - * - data: pointer to the data to decode (char ptr) - * - data_end: pointer to the end of the data (char ptr) - * - format: tuple format to use for decoding the body or nil + * Decodes a response body for the specified method and pushes the result to + * Lua stack. 
*/ -static int -netbox_decode_method(struct lua_State *L) +static void +netbox_decode_method(struct lua_State *L, enum netbox_method method, + const char **data, const char *data_end, + struct tuple_format *format) { typedef void (*method_decoder_f)(struct lua_State *L, const char **data, const char *data_end, @@ -1131,34 +1376,16 @@ netbox_decode_method(struct lua_State *L) [NETBOX_COUNT] = netbox_decode_value, [NETBOX_INJECT] = netbox_decode_table, }; - enum netbox_method method = lua_tointeger(L, 1); - assert(method < netbox_method_MAX); - uint32_t ctypeid; - const char *data = *(const char **)luaL_checkcdata(L, 2, &ctypeid); - assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR); - const char *data_end = *(const char **)luaL_checkcdata(L, 3, &ctypeid); - assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR); - struct tuple_format *format; - if (!lua_isnil(L, 4)) - format = lbox_check_tuple_format(L, 4); - else - format = tuple_format_runtime; - method_decoder[method](L, &data, data_end, format); - *(const char **)luaL_pushcdata(L, CTID_CONST_CHAR_PTR) = data; - return 2; + method_decoder[method](L, data, data_end, format); } /* - * Decodes an error from raw data and pushes it to Lua stack. Takes a pointer - * to the data (char ptr) and an error code. + * Decodes an error from raw data. On success returns the decoded error object + * with ref counter incremented. On failure returns NULL. 
*/ -static int -netbox_decode_error(struct lua_State *L) +static struct error * +netbox_decode_error(const char **data, uint32_t errcode) { - uint32_t ctypeid; - const char **data = luaL_checkcdata(L, 1, &ctypeid); - assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR); - uint32_t errcode = lua_tointeger(L, 2); struct error *error = NULL; assert(mp_typeof(**data) == MP_MAP); uint32_t map_size = mp_decode_map(data); @@ -1169,7 +1396,7 @@ netbox_decode_error(struct lua_State *L) error_unref(error); error = error_unpack_unsafe(data); if (error == NULL) - return luaT_error(L); + return NULL; error_ref(error); /* * IPROTO_ERROR comprises error encoded with @@ -1200,22 +1427,404 @@ netbox_decode_error(struct lua_State *L) error = box_error_last(); error_ref(error); } - luaT_pusherror(L, error); - error_unref(error); + return error; +} + +static inline struct netbox_registry * +luaT_check_netbox_registry(struct lua_State *L, int idx) +{ + return luaL_checkudata(L, idx, netbox_registry_typename); +} + +static int +luaT_netbox_registry_gc(struct lua_State *L) +{ + struct netbox_registry *registry = luaT_check_netbox_registry(L, 1); + netbox_registry_destroy(registry); + return 0; +} + +static int +luaT_netbox_registry_reset(struct lua_State *L) +{ + struct netbox_registry *registry = luaT_check_netbox_registry(L, 1); + struct error *error = luaL_checkerror(L, 2); + netbox_registry_reset(registry, error); + return 0; +} + +static inline struct netbox_request * +luaT_check_netbox_request(struct lua_State *L, int idx) +{ + return luaL_checkudata(L, idx, netbox_request_typename); +} + +static int +luaT_netbox_request_gc(struct lua_State *L) +{ + struct netbox_request *request = luaT_check_netbox_request(L, 1); + netbox_request_unregister(request); + netbox_request_destroy(request); + return 0; +} + +/* + * Returns true if the response was received for the given request. 
+ */ +static int +luaT_netbox_request_is_ready(struct lua_State *L) +{ + struct netbox_request *request = luaT_check_netbox_request(L, 1); + lua_pushboolean(L, netbox_request_is_ready(request)); return 1; } +/* + * Obtains the result of the given request. + * + * Returns: + * - nil, error if the response failed or not ready + * - response body (table) if the response is ready and buffer is nil + * - body length in bytes if the response was written to the buffer + */ +static int +luaT_netbox_request_result(struct lua_State *L) +{ + struct netbox_request *request = luaT_check_netbox_request(L, 1); + return netbox_request_push_result(request, L); +} + +/* + * Waits until the response is received for the given request and obtains the + * result. Takes an optional timeout argument. + * + * See the comment to request.result() for the return value format. + */ +static int +luaT_netbox_request_wait_result(struct lua_State *L) +{ + struct netbox_request *request = luaT_check_netbox_request(L, 1); + double timeout = TIMEOUT_INFINITY; + if (!lua_isnoneornil(L, 2)) { + if (lua_type(L, 2) != LUA_TNUMBER || + (timeout = lua_tonumber(L, 2)) < 0) + luaL_error(L, "Usage: future:wait_result(timeout)"); + } + while (!netbox_request_is_ready(request)) { + if (!netbox_request_wait(request, &timeout)) { + luaL_testcancel(L); + diag_set(ClientError, ER_TIMEOUT); + return luaT_push_nil_and_error(L); + } + } + return netbox_request_push_result(request, L); +} + +/* + * Makes the connection forget about the given request. When the response is + * received, it will be ignored. It reduces the size of the request registry + * speeding up other requests. 
+ */ +static int +luaT_netbox_request_discard(struct lua_State *L) +{ + struct netbox_request *request = luaT_check_netbox_request(L, 1); + if (!netbox_request_is_ready(request)) { + diag_set(ClientError, ER_PROC_LUA, "Response is discarded"); + netbox_request_set_error(request, diag_last_error(diag_get())); + netbox_request_complete(request); + } + return 0; +} + +/* + * Gets the next message or the final result. Takes the index of the last + * returned message as a second argument (the first argument is ignored). + * The request and timeout are passed as up-values (see request.pairs()). + * + * On success returns the index of the current message (used by the iterator + * implementation to continue iteration) and an object, which is either the + * message pushed with box.session.push() or the final response. If there's no + * more messages left for the request, returns nil, nil. + * + * On error returns box.NULL, error. We return box.NULL instead of nil to + * distinguish end of iteration from error when this function is called in + * `for k, v in future:pairs()`, because nil is treated by Lua as end of + * iteration marker. + */ +static int +luaT_netbox_request_iterator_next(struct lua_State *L) +{ + struct netbox_request *request = luaT_check_netbox_request( + L, lua_upvalueindex(1)); + double timeout = lua_tonumber(L, lua_upvalueindex(2)); + if (luaL_isnull(L, 2)) { + /* The previous call returned an error. */ + goto stop; + } + int i = lua_tointeger(L, 2) + 1; + /* + * In the async mode (and this is the async mode, because 'future' + * objects are not available to the user in the sync mode), on_push_ctx + * refers to a table that contains received messages. We iterate over + * the content of the table. 
+ */ + lua_rawgeti(L, LUA_REGISTRYINDEX, request->on_push_ctx_ref); + int messages_idx = lua_gettop(L); + assert(lua_istable(L, messages_idx)); + int message_count = lua_objlen(L, messages_idx); +retry: + if (i <= message_count) { + lua_pushinteger(L, i); + lua_rawgeti(L, messages_idx, i); + return 2; + } + if (netbox_request_is_ready(request)) { + /* + * After all the messages are iterated, `i` is equal to + * #messages + 1. After we return the response, `i` becomes + * #messages + 2. It is the trigger to finish the iteration. + */ + if (i > message_count + 1) + goto stop; + int n = netbox_request_push_result(request, L); + if (n == 2) + goto error; + /* Success. Return i, response. */ + assert(n == 1); + lua_pushinteger(L, i); + lua_insert(L, -2); + return 2; + } + int old_message_count = message_count; + do { + if (!netbox_request_wait(request, &timeout)) { + luaL_testcancel(L); + diag_set(ClientError, ER_TIMEOUT); + luaT_push_nil_and_error(L); + goto error; + } + message_count = lua_objlen(L, messages_idx); + } while (!netbox_request_is_ready(request) && + message_count == old_message_count); + goto retry; +stop: + lua_pushnil(L); + lua_pushnil(L); + return 2; +error: + /* + * When we get here, the top two elements on the stack are nil, error. + * We need to replace nil with box.NULL. + */ + luaL_pushnull(L); + lua_replace(L, -3); + return 2; +} + +static int +luaT_netbox_request_pairs(struct lua_State *L) +{ + if (!lua_isnoneornil(L, 2)) { + if (lua_type(L, 2) != LUA_TNUMBER || lua_tonumber(L, 2) < 0) + luaL_error(L, "Usage: future:pairs(timeout)"); + } else { + if (lua_isnil(L, 2)) + lua_pop(L, 1); + lua_pushnumber(L, TIMEOUT_INFINITY); + } + lua_pushcclosure(L, luaT_netbox_request_iterator_next, 2); + lua_pushnil(L); + lua_pushinteger(L, 0); + return 3; +} + +/* + * Creates a request registry object (userdata) and pushes it to Lua stack. 
+ */ +static int +netbox_new_registry(struct lua_State *L) +{ + struct netbox_registry *registry = lua_newuserdata( + L, sizeof(*registry)); + if (netbox_registry_create(registry) != 0) + luaT_error(L); + luaL_getmetatable(L, netbox_registry_typename); + lua_setmetatable(L, -2); + return 1; +} + +/* + * Creates a request object (userdata) and pushes it to Lua stack. + * + * Takes the following arguments: + * - requests: registry to register the new request with + * - id: id (sync) to assign to the new request + * - buffer: buffer (ibuf) to write the result to or nil + * - skip_header: whether to skip header when writing the result to the buffer + * - method: a value from the netbox_method enumeration + * - on_push: on_push trigger function + * - on_push_ctx: on_push trigger function argument + * - format: tuple format to use for decoding the body or nil + */ +static int +netbox_new_request(struct lua_State *L) +{ + struct netbox_request *request = lua_newuserdata(L, sizeof(*request)); + netbox_request_create(request); + luaL_getmetatable(L, netbox_request_typename); + lua_setmetatable(L, -2); + struct netbox_registry *registry = luaT_check_netbox_registry(L, 1); + request->sync = luaL_touint64(L, 2); + request->buffer = (struct ibuf *) lua_topointer(L, 3); + lua_pushvalue(L, 3); + request->buffer_ref = luaL_ref(L, LUA_REGISTRYINDEX); + request->skip_header = lua_toboolean(L, 4); + request->method = lua_tointeger(L, 5); + assert(request->method < netbox_method_MAX); + lua_pushvalue(L, 6); + request->on_push_ref = luaL_ref(L, LUA_REGISTRYINDEX); + lua_pushvalue(L, 7); + request->on_push_ctx_ref = luaL_ref(L, LUA_REGISTRYINDEX); + if (!lua_isnil(L, 8)) + request->format = lbox_check_tuple_format(L, 8); + else + request->format = tuple_format_runtime; + tuple_format_ref(request->format); + if (netbox_request_register(request, registry) != 0) + luaT_error(L); + return 1; +} + +/* + * Given a request registry, request id (sync), status, and a pointer to a + * response 
body, decodes the response and either completes the request or + * invokes the on-push trigger, depending on the status. + */ +static int +netbox_dispatch_response_iproto(struct lua_State *L) +{ + struct netbox_registry *registry = luaT_check_netbox_registry(L, 1); + uint64_t sync = luaL_touint64(L, 2); + enum iproto_type status = lua_tointeger(L, 3); + uint32_t ctypeid; + const char *data = *(const char **)luaL_checkcdata(L, 4, &ctypeid); + assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR); + const char *data_end = *(const char **)luaL_checkcdata(L, 5, &ctypeid); + assert(ctypeid == CTID_CHAR_PTR || ctypeid == CTID_CONST_CHAR_PTR); + struct netbox_request *request = netbox_registry_lookup(registry, sync); + if (request == NULL) { + /* Nobody is waiting for the response. */ + return 0; + } + if (status > IPROTO_CHUNK) { + /* Handle errors. */ + struct error *error = netbox_decode_error( + &data, status & (IPROTO_TYPE_ERROR - 1)); + if (error == NULL) + return luaT_error(L); + netbox_request_set_error(request, error); + error_unref(error); + netbox_request_complete(request); + return 0; + } + if (request->buffer != NULL) { + /* Copy xrow.body to user-provided buffer. */ + if (request->skip_header) + netbox_skip_to_data(&data); + size_t data_len = data_end - data; + void *wpos = ibuf_alloc(request->buffer, data_len); + if (wpos == NULL) + luaL_error(L, "out of memory"); + memcpy(wpos, data, data_len); + lua_pushinteger(L, data_len); + } else { + /* Decode xrow.body[DATA] to Lua objects. */ + if (status == IPROTO_OK) { + netbox_decode_method(L, request->method, &data, + data_end, request->format); + } else { + netbox_decode_value(L, &data, data_end, + request->format); + } + assert(data == data_end); + } + if (status == IPROTO_OK) { + /* + * We received the final response and pushed it to Lua stack. + * Store a reference to it in the request, remove the request + * from the registry, and wake up waiters. 
+ */ + netbox_request_set_result(request, + luaL_ref(L, LUA_REGISTRYINDEX)); + netbox_request_complete(request); + } else { + /* We received a push. Invoke on_push trigger. */ + lua_rawgeti(L, LUA_REGISTRYINDEX, request->on_push_ref); + lua_rawgeti(L, LUA_REGISTRYINDEX, request->on_push_ctx_ref); + /* Push the received message as the second argument. */ + lua_pushvalue(L, -3); + lua_call(L, 2, 0); + netbox_request_signal(request); + } + return 0; +} + +/* + * Given a request registry, request id (sync), and a response string, assigns + * the response to the request and completes it. + */ +static int +netbox_dispatch_response_console(struct lua_State *L) +{ + struct netbox_registry *registry = luaT_check_netbox_registry(L, 1); + uint64_t sync = luaL_touint64(L, 2); + struct netbox_request *request = netbox_registry_lookup(registry, sync); + if (request == NULL) { + /* Nobody is waiting for the response. */ + return 0; + } + /* + * The response is the last argument of this function so it's already + * on the top of the Lua stack. 
+ */ + netbox_request_set_result(request, luaL_ref(L, LUA_REGISTRYINDEX)); + netbox_request_complete(request); + return 0; +} + int luaopen_net_box(struct lua_State *L) { + static const struct luaL_Reg netbox_registry_meta[] = { + { "__gc", luaT_netbox_registry_gc }, + { "reset", luaT_netbox_registry_reset }, + { NULL, NULL } + }; + luaL_register_type(L, netbox_registry_typename, netbox_registry_meta); + + static const struct luaL_Reg netbox_request_meta[] = { + { "__gc", luaT_netbox_request_gc }, + { "is_ready", luaT_netbox_request_is_ready }, + { "result", luaT_netbox_request_result }, + { "wait_result", luaT_netbox_request_wait_result }, + { "discard", luaT_netbox_request_discard }, + { "pairs", luaT_netbox_request_pairs }, + { NULL, NULL } + }; + luaL_register_type(L, netbox_request_typename, netbox_request_meta); + static const luaL_Reg net_box_lib[] = { { "encode_auth", netbox_encode_auth }, { "encode_method", netbox_encode_method }, { "decode_greeting",netbox_decode_greeting }, - { "decode_method", netbox_decode_method }, - { "decode_error", netbox_decode_error }, { "send_and_recv_iproto", netbox_send_and_recv_iproto }, { "send_and_recv_console", netbox_send_and_recv_console }, + { "new_registry", netbox_new_registry }, + { "new_request", netbox_new_request }, + { "dispatch_response_iproto", netbox_dispatch_response_iproto }, + { "dispatch_response_console", + netbox_dispatch_response_console }, { NULL, NULL} }; /* luaL_register_module polutes _G */ diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 0ad6cac022f2..4bc66940ea2a 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -14,9 +14,7 @@ local max = math.max local fiber_clock = fiber.clock local fiber_self = fiber.self local decode = msgpack.decode_unchecked -local decode_map_header = msgpack.decode_map_header -local table_new = require('table.new') local check_iterator_type = box.internal.check_iterator_type local check_index_arg = box.internal.check_index_arg local 
check_space_arg = box.internal.check_space_arg @@ -25,8 +23,6 @@ local check_primary_index = box.internal.check_primary_index local encode_auth = internal.encode_auth local encode_method = internal.encode_method local decode_greeting = internal.decode_greeting -local decode_method = internal.decode_method -local decode_error = internal.decode_error local TIMEOUT_INFINITY = 500 * 365 * 86400 local VSPACE_ID = 281 @@ -42,13 +38,10 @@ local IPROTO_DATA_KEY = 0x30 local IPROTO_ERROR_24 = 0x31 local IPROTO_ERROR = 0x52 local IPROTO_GREETING_SIZE = 128 -local IPROTO_CHUNK_KEY = 128 -local IPROTO_OK_KEY = 0 -- select errors from box.error local E_UNKNOWN = box.error.UNKNOWN local E_NO_CONNECTION = box.error.NO_CONNECTION -local E_TIMEOUT = box.error.TIMEOUT local E_PROC_LUA = box.error.PROC_LUA local E_NO_SUCH_SPACE = box.error.NO_SUCH_SPACE @@ -191,172 +184,20 @@ local function create_transport(host, port, user, password, callback, local last_error local state_cond = fiber.cond() -- signaled when the state changes - -- Async requests currently 'in flight', keyed by a request - -- id. Value refs are weak hence if a client dies - -- unexpectedly, GC cleans the mess. + -- The registry stores requests that are currently 'in flight' + -- for this connection. -- Async request can not be timed out completely. Instead a -- user must decide when he does not want to wait for -- response anymore. -- Sync requests are implemented as async call + immediate -- wait for a result. - local requests = setmetatable({}, { __mode = 'v' }) + local requests = internal.new_registry() local next_request_id = 1 local worker_fiber local send_buf = buffer.ibuf(buffer.READAHEAD) local recv_buf = buffer.ibuf(buffer.READAHEAD) - -- - -- Async request metamethods. - -- - local request_index = {} - -- - -- When an async request is finalized (with ok or error - no - -- matter), its 'id' field is nullified by a response - -- dispatcher. 
- -- - function request_index:is_ready() - return self.id == nil - end - -- - -- When a request is finished, a result can be got from a - -- future object anytime. - -- @retval result, nil Success, the response is returned. - -- @retval nil, error Error occured. - -- - function request_index:result() - if self.errno then - if type(self.response) ~= 'cdata' then - -- Error could be set by the connection state - -- machine, and it is usually a string explaining - -- a reason. - self.response = box.error.new({code = self.errno, - reason = self.response}) - end - return nil, self.response - elseif not self.id then - return self.response - else - return nil, box.error.new(box.error.PROC_LUA, - 'Response is not ready') - end - end - -- - -- Get the next message or the final result. - -- @param iterator Iterator object. - -- @param i Index to get a next message from. - -- - -- @retval nil, nil The request is finished. - -- @retval i + 1, object A message/response and its index. - -- @retval box.NULL, error An error occured. When this - -- function is called in 'for k, v in future:pairs()', - -- `k` becomes box.NULL, and `v` becomes error object. - -- On error the key becomes exactly box.NULL instead - -- of nil, because nil is treated by Lua as iteration - -- end marker. Nil does not participate in iteration, - -- and does not allow to continue it. - -- - local function request_iterator_next(iterator, i) - if i == box.NULL then - return nil, nil - else - i = i + 1 - end - local request = iterator.request - local messages = request.on_push_ctx - ::retry:: - if i <= #messages then - return i, messages[i] - end - if request:is_ready() then - -- After all the messages are iterated, `i` is equal - -- to #messages + 1. After response reading `i` - -- becomes #messages + 2. It is the trigger to finish - -- the iteration. 
- if i > #messages + 1 then - return nil, nil - end - local response, err = request:result() - if err then - return box.NULL, err - end - return i, response - end - local old_message_count = #messages - local timeout = iterator.timeout - repeat - local ts = fiber_clock() - request.cond:wait(timeout) - timeout = timeout - (fiber_clock() - ts) - if request:is_ready() or old_message_count ~= #messages then - goto retry - end - until timeout <= 0 - return box.NULL, box.error.new(E_TIMEOUT) - end - -- - -- Iterate over all messages, received by a request. @Sa - -- request_iterator_next for details what to expect in `for` - -- key/value pairs. - -- @param timeout One iteration timeout. - -- @retval next() callback, iterator, zero key. - -- - function request_index:pairs(timeout) - if timeout then - if type(timeout) ~= 'number' or timeout < 0 then - error('Usage: future:pairs(timeout)') - end - else - timeout = TIMEOUT_INFINITY - end - local iterator = {request = self, timeout = timeout} - return request_iterator_next, iterator, 0 - end - -- - -- Wait for a response or error max timeout seconds. - -- @param timeout Max seconds to wait. - -- @retval result, nil Success, the response is returned. - -- @retval nil, error Error occured. - -- - function request_index:wait_result(timeout) - if timeout then - if type(timeout) ~= 'number' or timeout < 0 then - error('Usage: future:wait_result(timeout)') - end - else - timeout = TIMEOUT_INFINITY - end - if not self:is_ready() then - -- When a response is ready before timeout, the - -- waiting client is waked up prematurely. - while timeout > 0 and not self:is_ready() do - local ts = fiber.clock() - self.cond:wait(timeout) - timeout = timeout - (fiber.clock() - ts) - end - if not self:is_ready() then - return nil, box.error.new(E_TIMEOUT) - end - end - return self:result() - end - -- - -- Make a connection forget about the response. When it will - -- be received, it will be ignored. 
It reduces size of - -- requests table speeding up other requests. - -- - function request_index:discard() - if self.id then - requests[self.id] = nil - self.id = nil - self.errno = box.error.PROC_LUA - self.response = 'Response is discarded' - self.cond:broadcast() - end - end - - local request_mt = { __index = request_index } - -- STATE SWITCHING -- local function set_state(new_state, new_errno, new_error) state = new_state @@ -366,13 +207,8 @@ local function create_transport(host, port, user, password, callback, state_cond:broadcast() if state == 'error' or state == 'error_reconnect' or state == 'closed' then - for _, request in pairs(requests) do - request.id = nil - request.errno = new_errno - request.response = new_error - request.cond:broadcast() - end - requests = {} + requests:reset(box.error.new({code = new_errno, + reason = new_error})) end end @@ -468,20 +304,8 @@ local function create_transport(host, port, user, password, callback, local id = next_request_id encode_method(method, send_buf, id, ...) next_request_id = next_id(id) - -- Request in most cases has maximum 10 members: - -- method, buffer, skip_header, id, cond, errno, response, - -- on_push, on_push_ctx and format. 
- local request = setmetatable(table_new(0, 10), request_mt) - request.method = method - request.buffer = buffer - request.skip_header = skip_header - request.id = id - request.cond = fiber.cond() - requests[id] = request - request.on_push = on_push - request.on_push_ctx = on_push_ctx - request.format = format - return request + return internal.new_request(requests, id, buffer, skip_header, method, + on_push, on_push_ctx, format) end -- @@ -502,76 +326,13 @@ local function create_transport(host, port, user, password, callback, local function dispatch_response_iproto(hdr, body_rpos, body_end) local id = hdr[IPROTO_SYNC_KEY] - local request = requests[id] - if request == nil then -- nobody is waiting for the response - return - end local status = hdr[IPROTO_STATUS_KEY] - local body_len = body_end - body_rpos - - if status > IPROTO_CHUNK_KEY then - -- Handle errors - requests[id] = nil - request.id = nil - request.errno = band(status, IPROTO_ERRNO_MASK) - request.response = decode_error(body_rpos, request.errno) - request.cond:broadcast() - return - end - - local buffer = request.buffer - if buffer ~= nil then - -- Copy xrow.body to user-provided buffer - if request.skip_header then - -- Skip {[IPROTO_DATA_KEY] = ...} wrapper. 
- local map_len, key - map_len, body_rpos = decode_map_header(body_rpos, body_len) - assert(map_len == 1) - key, body_rpos = decode(body_rpos) - assert(key == IPROTO_DATA_KEY) - body_len = body_end - body_rpos - end - local wpos = buffer:alloc(body_len) - ffi.copy(wpos, body_rpos, body_len) - body_len = tonumber(body_len) - if status == IPROTO_OK_KEY then - request.response = body_len - requests[id] = nil - request.id = nil - else - request.on_push(request.on_push_ctx, body_len) - end - request.cond:broadcast() - return - end - - local real_end - -- Decode xrow.body[DATA] to Lua objects - if status == IPROTO_OK_KEY then - request.response, real_end = decode_method(request.method, - body_rpos, body_end, - request.format) - assert(real_end == body_end, "invalid body length") - requests[id] = nil - request.id = nil - else - local msg - msg, real_end, request.errno = decode_push(body_rpos, body_end) - assert(real_end == body_end, "invalid body length") - request.on_push(request.on_push_ctx, msg) - end - request.cond:broadcast() + internal.dispatch_response_iproto(requests, id, status, + body_rpos, body_end) end local function dispatch_response_console(rid, response) - local request = requests[rid] - if request == nil then -- nobody is waiting for the response - return - end - request.id = nil - requests[rid] = nil - request.response = response - request.cond:broadcast() + internal.dispatch_response_console(requests, rid, response) end local function new_request_id() -- 2.25.1