From mboxrd@z Thu Jan 1 00:00:00 1970
To: Vladimir Davydov, tarantool-patches@dev.tarantool.org
Message-ID: <903f6291-bab5-985e-f922-042453e4d146@tarantool.org>
Date: Mon, 2 Aug 2021 23:54:43 +0200
Subject: Re: [Tarantool-patches] [PATCH 15/20] net.box: rewrite request implementation in C
List-Id: Tarantool development patches
From: Vladislav Shpilevoy via Tarantool-patches
Reply-To: Vladislav Shpilevoy
Sender: "Tarantool-patches"

Thanks for the patch!

In an early version I saw on the branch you tried to keep the
discarded requests in the hash. It is not so anymore, correct?
Now discarded requests are GCed even if their response didn't
arrive yet? (It should be so, but I decided to clarify just in
case.)

See 12 comments below.

> diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
> index 122d69e9219e..044f7d337ca7 100644
> --- a/src/box/lua/net_box.c
> +++ b/src/box/lua/net_box.c
> @@ -76,6 +78,253 @@ enum netbox_method {
>  	netbox_method_MAX
>  };
>
> +struct netbox_registry {
> +	/* sync -> netbox_request */

1. For out of function comments we usually use /**. This includes
the comments for structs and their members.

> +	struct mh_i64ptr_t *requests;
> +};
> +
> +struct netbox_request {
> +	enum netbox_method method;
> +	/*
> +	 * Unique identifier needed for matching the request with its response.
> +	 * Used as a key in the registry.
> +	 */
> +	uint64_t sync;
> +	/*
> +	 * The registry this request belongs to or NULL if the request has been
> +	 * completed.
> +	 */
> +	struct netbox_registry *registry;
> +	/* Format used for decoding the response (ref incremented). */
> +	struct tuple_format *format;
> +	/* Signaled when the response is received. */
> +	struct fiber_cond cond;
> +	/*
> +	 * A user-provided buffer to which the response body should be copied.
> +	 * If NULL, the response will be decoded to Lua stack.
> +	 */
> +	struct ibuf *buffer;
> +	/*
> +	 * Lua reference to the buffer. Used to prevent garbage collection in
> +	 * case the user discards the request.
> +	 */
> +	int buffer_ref;
> +	/*
> +	 * Whether to skip MessagePack map header and IPROTO_DATA key when
> +	 * copying the response body to a user-provided buffer. Ignored if
> +	 * buffer is not set.
> +	 */
> +	bool skip_header;
> +	/* Lua references to on_push trigger and its context. */
> +	int on_push_ref;
> +	int on_push_ctx_ref;
> +	/*
> +	 * Reference to the request result or LUA_NOREF if the response hasn't
> +	 * been received yet. If the response was decoded to a user-provided
> +	 * buffer, the result stores the length of the decoded data.

2. Do you mean result_ref stores the length? I can't find it in the code.
It always stores a real Lua ref from what I see.

> +	 */
> +	int result_ref;
> +	/*
> +	 * Error if the request failed (ref incremented). NULL on success or if
> +	 * the response hasn't been received yet.
> +	 */
> +	struct error *error;
> +

3. Extra empty line.

> +};
> +
> +static const char netbox_registry_typename[] = "net.box.registry";
> +static const char netbox_request_typename[] = "net.box.request";
> +
> +static void
> +netbox_request_create(struct netbox_request *request)
> +{
> +	request->method = netbox_method_MAX;
> +	request->sync = -1;
> +	request->registry = NULL;
> +	request->format = NULL;
> +	fiber_cond_create(&request->cond);
> +	request->buffer = NULL;
> +	request->buffer_ref = LUA_REFNIL;
> +	request->skip_header = false;
> +	request->on_push_ref = LUA_REFNIL;
> +	request->on_push_ctx_ref = LUA_REFNIL;
> +	request->result_ref = LUA_NOREF;
> +	request->error = NULL;
> +}
> +
> +static void
> +netbox_request_destroy(struct netbox_request *request)
> +{
> +	assert(request->registry == NULL);
> +	if (request->format != NULL)
> +		tuple_format_unref(request->format);
> +	fiber_cond_destroy(&request->cond);
> +	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->buffer_ref);
> +	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->on_push_ref);
> +	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->on_push_ctx_ref);
> +	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->result_ref);
> +	if (request->error != NULL)
> +		error_unref(request->error);
> +}
> +
> +/*
> + * Adds a request to a registry. There must not be a request with the same id
> + * (sync) in the registry. Returns -1 if out of memory.
> + */
> +static int
> +netbox_request_register(struct netbox_request *request,
> +			struct netbox_registry *registry)
> +{
> +	assert(request->registry == NULL);
> +	struct mh_i64ptr_t *h = registry->requests;
> +	struct mh_i64ptr_node_t node = { request->sync, request };
> +	struct mh_i64ptr_node_t *old_node = NULL;
> +	if (mh_i64ptr_put(h, &node, &old_node, NULL) == mh_end(h)) {
> +		diag_set(OutOfMemory, 0, "mhash", "netbox_registry");
> +		return -1;
> +	}
> +	assert(old_node == NULL);
> +	request->registry = registry;
> +	return 0;
> +}
> +
> +/*
> + * Unregisters a previously registered request. Does nothing if the request has
> + * already been unregistered or has never been registered.
> + */
> +static void
> +netbox_request_unregister(struct netbox_request *request)
> +{
> +	struct netbox_registry *registry = request->registry;
> +	if (registry == NULL)
> +		return;
> +	request->registry = NULL;
> +	struct mh_i64ptr_t *h = registry->requests;
> +	mh_int_t k = mh_i64ptr_find(h, request->sync, NULL);
> +	assert(k != mh_end(h));
> +	assert(mh_i64ptr_node(h, k)->val == request);
> +	mh_i64ptr_del(h, k, NULL);
> +}
> +
> +static bool
> +netbox_request_is_ready(struct netbox_request *request)

4. The request can be made 'const'.

> +{
> +	return request->registry == NULL;
> +}
> +
> +static void
> +netbox_request_signal(struct netbox_request *request)
> +{
> +	fiber_cond_broadcast(&request->cond);
> +}
> +
> +static void
> +netbox_request_complete(struct netbox_request *request)
> +{
> +	netbox_request_unregister(request);
> +	netbox_request_signal(request);
> +}
> +
> +/*
> + * Waits on netbox_request::cond. Subtracts the wait time from the timeout.
> + * Returns false on timeout or if the fiber was cancelled.
> + */
> +static bool
> +netbox_request_wait(struct netbox_request *request, double *timeout)
> +{
> +	double ts = ev_monotonic_now(loop());

5. fiber_clock() might be a little shorter (does the same). The same below.
Although it is not inline. Up to you.
Also see a related comment below.

> +	int rc = fiber_cond_wait_timeout(&request->cond, *timeout);
> +	*timeout -= ev_monotonic_now(loop()) - ts;
> +	return rc == 0;
> +}
> +
> +static void

6. Have you tried making these 1-5 line functions explicitly inline? I remember
with Mergen we saw some actual perf difference in SQL code when we did so.
Although that time the functions were in the header vs in a .c file.

I see you used inline for some functions in this commit. Why not for the ones
like these?

<...>

> +
> +/*
> + * Waits until the response is received for the given request and obtains the
> + * result. Takes an optional timeout argument.
> + *
> + * See the comment to request.result() for the return value format.
> + */
> +static int
> +luaT_netbox_request_wait_result(struct lua_State *L)
> +{
> +	struct netbox_request *request = luaT_check_netbox_request(L, 1);
> +	double timeout = TIMEOUT_INFINITY;
> +	if (!lua_isnoneornil(L, 2)) {
> +		if (lua_type(L, 2) != LUA_TNUMBER ||
> +		    (timeout = lua_tonumber(L, 2)) < 0)
> +			luaL_error(L, "Usage: future:wait_result(timeout)");
> +	}
> +	while (!netbox_request_is_ready(request)) {
> +		if (!netbox_request_wait(request, &timeout)) {
> +			luaL_testcancel(L);
> +			diag_set(ClientError, ER_TIMEOUT);

7. In some places you use box_error_raise(), in others the explicit
diag_set(ClientError). Why? For instance:

	box_error_raise(ER_NO_CONNECTION, "%s", strerror(errno));
	box_error_raise(ER_NO_CONNECTION, "Peer closed");

In others the raise is justified because you do not know the error code and
its message at compile time there. In these 2 I am probably missing something?
> +			return luaT_push_nil_and_error(L);
> +		}
> +	}
> +	return netbox_request_push_result(request, L);
> +}

<...>

> +
> +static int
> +luaT_netbox_request_pairs(struct lua_State *L)
> +{
> +	if (!lua_isnoneornil(L, 2)) {
> +		if (lua_type(L, 2) != LUA_TNUMBER || lua_tonumber(L, 2) < 0)
> +			luaL_error(L, "Usage: future:pairs(timeout)");
> +	} else {
> +		if (lua_isnil(L, 2))
> +			lua_pop(L, 1);
> +		lua_pushnumber(L, TIMEOUT_INFINITY);
> +	}
> +	lua_pushcclosure(L, luaT_netbox_request_iterator_next, 2);

8. Push of cfunctions, especially closures, on a regular basis might be
expensive. Could you please try to make it a non-closure function and cache its
reference like I showed in the proposal about request __gc?

> +	lua_pushnil(L);
> +	lua_pushinteger(L, 0);
> +	return 3;
> +}

<...>

> +
> +/*
> + * Creates a request object (userdata) and pushes it to Lua stack.
> + *
> + * Takes the following arguments:
> + * - requests: registry to register the new request with
> + * - id: id (sync) to assign to the new request
> + * - buffer: buffer (ibuf) to write the result to or nil
> + * - skip_header: whether to skip header when writing the result to the buffer
> + * - method: a value from the netbox_method enumeration
> + * - on_push: on_push trigger function
> + * - on_push_ctx: on_push trigger function argument
> + * - format: tuple format to use for decoding the body or nil
> + */
> +static int
> +netbox_new_request(struct lua_State *L)
> +{
> +	struct netbox_request *request = lua_newuserdata(L, sizeof(*request));

9. Does it help perf if the requests are allocated on mempool?

> +	netbox_request_create(request);

10. Below you override a lot of fields initialized in netbox_request_create().
Do you really need this create(), can you inline it without the unnecessary
assignments?
> +	luaL_getmetatable(L, netbox_request_typename);
> +	lua_setmetatable(L, -2);
> +	struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
> +	request->sync = luaL_touint64(L, 2);
> +	request->buffer = (struct ibuf *) lua_topointer(L, 3);
> +	lua_pushvalue(L, 3);
> +	request->buffer_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> +	request->skip_header = lua_toboolean(L, 4);
> +	request->method = lua_tointeger(L, 5);
> +	assert(request->method < netbox_method_MAX);
> +	lua_pushvalue(L, 6);
> +	request->on_push_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> +	lua_pushvalue(L, 7);
> +	request->on_push_ctx_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> +	if (!lua_isnil(L, 8))
> +		request->format = lbox_check_tuple_format(L, 8);
> +	else
> +		request->format = tuple_format_runtime;
> +	tuple_format_ref(request->format);
> +	if (netbox_request_register(request, registry) != 0)
> +		luaT_error(L);
> +	return 1;
> +}
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 0ad6cac022f2..4bc66940ea2a 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -468,20 +304,8 @@ local function create_transport(host, port, user, password, callback,
>          local id = next_request_id
>          encode_method(method, send_buf, id, ...)
>          next_request_id = next_id(id)
> -        -- Request in most cases has maximum 10 members:
> -        -- method, buffer, skip_header, id, cond, errno, response,
> -        -- on_push, on_push_ctx and format.
> -        local request = setmetatable(table_new(0, 10), request_mt)
> -        request.method = method
> -        request.buffer = buffer
> -        request.skip_header = skip_header
> -        request.id = id
> -        request.cond = fiber.cond()
> -        requests[id] = request
> -        request.on_push = on_push
> -        request.on_push_ctx = on_push_ctx
> -        request.format = format
> -        return request
> +        return internal.new_request(requests, id, buffer, skip_header, method,
> +                                    on_push, on_push_ctx, format)

11. You might want to try to cache internal.new_request globally, use it without '.'
operator, and re-check the benches. It might make no difference, but '.' is
definitely not free. The same for dispatch_response_iproto,
dispatch_response_console.

>      end
>
>      --
> @@ -502,76 +326,13 @@ local function create_transport(host, port, user, password, callback,
>
>      local function dispatch_response_iproto(hdr, body_rpos, body_end)
>          local id = hdr[IPROTO_SYNC_KEY]
> -        local request = requests[id]
> -        if request == nil then -- nobody is waiting for the response
> -            return
> -        end
>          local status = hdr[IPROTO_STATUS_KEY]
> -        local body_len = body_end - body_rpos
> -
> -        if status > IPROTO_CHUNK_KEY then
> -            -- Handle errors
> -            requests[id] = nil
> -            request.id = nil
> -            request.errno = band(status, IPROTO_ERRNO_MASK)
> -            request.response = decode_error(body_rpos, request.errno)
> -            request.cond:broadcast()
> -            return
> -        end
> -
> -        local buffer = request.buffer
> -        if buffer ~= nil then
> -            -- Copy xrow.body to user-provided buffer
> -            if request.skip_header then
> -                -- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
> -                local map_len, key
> -                map_len, body_rpos = decode_map_header(body_rpos, body_len)
> -                assert(map_len == 1)
> -                key, body_rpos = decode(body_rpos)
> -                assert(key == IPROTO_DATA_KEY)
> -                body_len = body_end - body_rpos
> -            end
> -            local wpos = buffer:alloc(body_len)
> -            ffi.copy(wpos, body_rpos, body_len)
> -            body_len = tonumber(body_len)
> -            if status == IPROTO_OK_KEY then
> -                request.response = body_len
> -                requests[id] = nil
> -                request.id = nil
> -            else
> -                request.on_push(request.on_push_ctx, body_len)
> -            end
> -            request.cond:broadcast()
> -            return
> -        end
> -
> -        local real_end
> -        -- Decode xrow.body[DATA] to Lua objects
> -        if status == IPROTO_OK_KEY then
> -            request.response, real_end = decode_method(request.method,
> -                                                       body_rpos, body_end,
> -                                                       request.format)
> -            assert(real_end == body_end, "invalid body length")
> -            requests[id] = nil
> -            request.id = nil
> -        else
> -            local msg
> -            msg, real_end, request.errno = decode_push(body_rpos, body_end)

12. decode_push is now unused.

> -            assert(real_end == body_end, "invalid body length")
> -            request.on_push(request.on_push_ctx, msg)
> -        end
> -        request.cond:broadcast()
> +        internal.dispatch_response_iproto(requests, id, status,
> +                                          body_rpos, body_end)
>      end
>
>      local function dispatch_response_console(rid, response)
> -        local request = requests[rid]
> -        if request == nil then -- nobody is waiting for the response
> -            return
> -        end
> -        request.id = nil
> -        requests[rid] = nil
> -        request.response = response
> -        request.cond:broadcast()
> +        internal.dispatch_response_console(requests, rid, response)
>      end
>
>      local function new_request_id()
>
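
To make comments 4 and 6 concrete, here is a minimal standalone sketch. It is
not code from the patch: the sketch_* names and the stripped-down structs are
hypothetical stand-ins for netbox_request and netbox_registry, and the only
thing taken from the quoted diff is the rule that a request is ready once its
registry pointer is NULL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for struct netbox_registry. */
struct sketch_registry {
	int dummy;
};

/* Hypothetical stand-in for struct netbox_request (one field only). */
struct sketch_request {
	/* The registry the request belongs to or NULL if completed. */
	struct sketch_registry *registry;
};

/* Read-only check: takes a const pointer and is explicitly inline. */
static inline bool
sketch_request_is_ready(const struct sketch_request *request)
{
	assert(request != NULL);
	return request->registry == NULL;
}

/* Completing a request detaches it from its registry. */
static inline void
sketch_request_complete(struct sketch_request *request)
{
	assert(request != NULL);
	request->registry = NULL;
}
```

A request that still points at a registry reports not ready, and reports ready
after sketch_request_complete(). Whether the explicit inline changes anything
measurable is something only the benches can tell.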