From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id AF7926EC56; Fri, 23 Jul 2021 14:14:03 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org AF7926EC56 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1627038843; bh=D7MZbI7VlcCfQOey9gGibshs8qAoEY8PmUsC19RiEyU=; h=To:Date:In-Reply-To:References:Subject:List-Id:List-Unsubscribe: List-Archive:List-Post:List-Help:List-Subscribe:From:Reply-To: From; b=awxfjxBw9OAuVde8cfZA1eUjmGcISfTnTbLUpMJuL84i4D0TU9d6a8X65Qcj8yGaA NXS4BlZ1333mVV866/pP6NF7KfNAk66gRvu5JTA99B336ZrPRtZedpdAvKqxLUhcO6 /t1S4+VBNVf9UEkRIPbs8rTo56MwlPlpL71uXeo4= Received: from smtpng1.i.mail.ru (smtpng1.i.mail.ru [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id D5E306F3D0 for ; Fri, 23 Jul 2021 14:07:48 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org D5E306F3D0 Received: by smtpng1.m.smailru.net with esmtpa (envelope-from ) id 1m6t1w-0004dl-BP; Fri, 23 Jul 2021 14:07:48 +0300 To: tarantool-patches@dev.tarantool.org Date: Fri, 23 Jul 2021 14:07:23 +0300 Message-Id: <5bc68e715031c04ebb022c70cf4ff27c0939598c.1627024646.git.vdavydov@tarantool.org> X-Mailer: git-send-email 2.25.1 In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit X-174C08C4: 5188C02AEC42908C481ED7ADC579193296BBA28369E3F2D2713F3D5F7D406D31BCF678C7329BA986 X-7564579A: 646B95376F6C166E X-77F55803: 4F1203BC0FB41BD941C43E597735A9C3FDAB68B812060C77E621B90589399EB5182A05F53808504015A8B35AEB750EF0B2A289B76062241694E67BB6024CE55770BB1854DA8DBB6C X-7FA49CB5: FF5795518A3D127A4AD6D5ED66289B5278DA827A17800CE752E71F0C64B7C834EA1F7E6F0F101C67BD4B6F7A4D31EC0BCC500DACC3FED6E28638F802B75D45FF8AA50765F79006374D78D7F7271F09E9EA1F7E6F0F101C6723150C8DA25C47586E58E00D9D99D84E1BDDB23E98D2D38BBCA57AF85F7723F2F9F8B63DB0CAC0E7C992E0E9592BE066CC7F00164DA146DAFE8445B8C89999728AA50765F7900637F6B57BC7E64490618DEB871D839B7333395957E7521B51C2DFABB839C843B9C08941B15DA834481F8AA50765F7900637F6B57BC7E6449061A352F6E88A58FB86F5D81C698A659EA7E827F84554CEF5019E625A9149C048EE9ECD01F8117BC8BEE2021AF6380DFAD18AA50765F790063735872C767BF85DA227C277FBC8AE2E8B041BD12FB6B4799375ECD9A6C639B01B4E70A05D1297E1BBCB5012B2E24CD356 X-C1DE0DAB: 8BD88D57C5CADBC8B2710865C386751094C72BDDC9A8ED5CA3B1A56EE2B804F6B226C914C9968946695E9D90444CEC264DCC8C77FBA9901322D2CEDE4E95CF1BDBE8DEE28BC9005C095FFBCAB1CFE8AABCA57AF85F7723F2F9F8B63DB0CAC0E7C992E0E9592BE066589120F7DAE46353205367B2BCC23E5BF27146890FA8CFC4BDAD6C7F3747799A X-C8649E89: 4E36BF7865823D7055A7F0CF078B5EC49A30900B95165D341C998A3771F04153ACD13D706691926A4C00DA21DF80377E576073BA0B8F34169D16B6651E6E14611D7E09C32AA3244C8812FC867045AF24CC5E63C18C28A6EF51E887DA02A9F7BF729B2BEF169E0186 X-D57D3AED: 3ZO7eAau8CL7WIMRKs4sN3D3tLDjz0dLbV79QFUyzQ2Ujvy7cMT6pYYqY16iZVKkSc3dCLJ7zSJH7+u4VD18S7Vl4ZUrpaVfd2+vE6kuoey4m4VkSEu530nj6fImhcD4MUrOEAnl0W826KZ9Q+tr5ycPtXkTV4k65bRjmOUUP8cvGozZ33TWg5HZplvhhXbhDGzqmQDTd6OAevLeAnq3Ra9uf7zvY2zzsIhlcp/Y7m53TZgf2aB4JOg4gkr2biojbL9S8ysBdXiEX0g4jkpDtUKbBN4/54KW X-Mailru-Sender: 689FA8AB762F7393C37E3C1AEC41BA5DCDC4F0FD3CAFB35570E7C69B04076BEE274CEFED1673C562683ABF942079399BFB559BB5D741EB966A65DFF43FF7BE03240331F90058701C67EA787935ED9F1B X-Mras: Ok Subject: [Tarantool-patches] [PATCH 13/20] net.box: rewrite send_and_recv_{iproto, console} in C X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , From: Vladimir Davydov via Tarantool-patches Reply-To: Vladimir Davydov Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" Basically, this patch converts the above function line-by-line from Lua to C. This is a step towards rewriting performance-critical parts of net.box in C. Note, Lua code expects send_and_recv functions return errno, errmsg on error and nil, ... on success while it's more convenient to return an error object from C code so we do the conversion in Lua wrappers. --- src/box/lua/net_box.c | 155 ++++++++++++++++++++++++++++------------ src/box/lua/net_box.lua | 46 +++--------- 2 files changed, 121 insertions(+), 80 deletions(-) diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index e88db6323afa..12d82738a050 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -451,9 +451,9 @@ netbox_decode_greeting(lua_State *L) } /** - * communicate(fd, send_buf, recv_buf, limit_or_boundary, timeout) - * -> errno, error - * -> nil, limit/boundary_pos + * Reads data from the given socket until the limit or boundary is reached. + * Returns 0 and sets *limit_or_boundary_pos to limit/boundary_pos on success. + * On error returns -1 and sets diag. * * The need for this function arises from not wanting to * have more than one watcher for a single fd, and thus issue @@ -465,62 +465,45 @@ netbox_decode_greeting(lua_State *L) * interaction. */ static int -netbox_communicate(lua_State *L) +netbox_communicate(int fd, struct ibuf *send_buf, struct ibuf *recv_buf, + size_t limit, const void *boundary, size_t boundary_len, + double timeout, size_t *limit_or_boundary_pos) { - uint32_t fd = lua_tonumber(L, 1); const int NETBOX_READAHEAD = 16320; - struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2); - struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3); - - /* limit or boundary */ - size_t limit = SIZE_MAX; - const void *boundary = NULL; - size_t boundary_len; - - if (lua_type(L, 4) == LUA_TSTRING) - boundary = lua_tolstring(L, 4, &boundary_len); - else - limit = lua_tonumber(L, 4); - - /* timeout */ - ev_tstamp timeout = TIMEOUT_INFINITY; - if (lua_type(L, 5) == LUA_TNUMBER) - timeout = lua_tonumber(L, 5); if (timeout < 0) { - lua_pushinteger(L, ER_TIMEOUT); - lua_pushstring(L, "Timeout exceeded"); - return 2; + diag_set(ClientError, ER_TIMEOUT); + return -1; } int revents = COIO_READ; while (true) { /* reader serviced first */ check_limit: if (ibuf_used(recv_buf) >= limit) { - lua_pushnil(L); - lua_pushinteger(L, (lua_Integer)limit); - return 2; + *limit_or_boundary_pos = limit; + return 0; } const char *p; if (boundary != NULL && (p = memmem( recv_buf->rpos, ibuf_used(recv_buf), boundary, boundary_len)) != NULL) { - lua_pushnil(L); - lua_pushinteger(L, (lua_Integer)( - p - recv_buf->rpos)); - return 2; + *limit_or_boundary_pos = p - recv_buf->rpos; + return 0; } while (revents & COIO_READ) { void *p = ibuf_reserve(recv_buf, NETBOX_READAHEAD); - if (p == NULL) - luaL_error(L, "out of memory"); + if (p == NULL) { + diag_set(OutOfMemory, NETBOX_READAHEAD, + "ibuf", "recv_buf"); + return -1; + } ssize_t rc = recv( fd, recv_buf->wpos, ibuf_unused(recv_buf), 0); if (rc == 0) { - lua_pushinteger(L, ER_NO_CONNECTION); - lua_pushstring(L, "Peer closed"); - return 2; + box_error_raise(ER_NO_CONNECTION, + "Peer closed"); + return -1; } if (rc > 0) { recv_buf->wpos += rc; goto check_limit; @@ -545,19 +528,100 @@ check_limit: ERROR_INJECT_YIELD(ERRINJ_NETBOX_IO_DELAY); revents = coio_wait(fd, EV_READ | (ibuf_used(send_buf) != 0 ? EV_WRITE : 0), timeout); - luaL_testcancel(L); + if (fiber_is_cancelled()) { + diag_set(FiberIsCancelled); + return -1; + } timeout = deadline - ev_monotonic_now(loop()); timeout = MAX(0.0, timeout); if (revents == 0 && timeout == 0.0) { - lua_pushinteger(L, ER_TIMEOUT); - lua_pushstring(L, "Timeout exceeded"); - return 2; + diag_set(ClientError, ER_TIMEOUT); + return -1; } } handle_error: - lua_pushinteger(L, ER_NO_CONNECTION); - lua_pushstring(L, strerror(errno)); - return 2; + box_error_raise(ER_NO_CONNECTION, "%s", strerror(errno)); + return -1; +} + +/* + * Sends and receives data over an iproto connection. + * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout. + * On success returns header (table), body_rpos (char *), body_end (char *). + * On error returns nil, error. + */ +static int +netbox_send_and_recv_iproto(lua_State *L) +{ + int fd = lua_tointeger(L, 1); + struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2); + struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3); + double timeout = (!lua_isnoneornil(L, 4) ? + lua_tonumber(L, 4) : TIMEOUT_INFINITY); + while (true) { + size_t required; + size_t data_len = ibuf_used(recv_buf); + size_t fixheader_size = mp_sizeof_uint(UINT32_MAX); + if (data_len < fixheader_size) { + required = fixheader_size; + } else { + /* PWN! insufficient input validation */ + const char *bufpos = recv_buf->rpos; + const char *rpos = bufpos; + size_t len = mp_decode_uint(&rpos); + required = (rpos - bufpos) + len; + if (data_len >= required) { + const char *body_end = rpos + len; + const char *body_rpos = rpos; + luamp_decode(L, cfg, &body_rpos); + *(const char **)luaL_pushcdata( + L, CTID_CONST_CHAR_PTR) = body_rpos; + *(const char **)luaL_pushcdata( + L, CTID_CONST_CHAR_PTR) = body_end; + recv_buf->rpos = (char *)body_end; + return 3; + } + } + size_t unused; + double deadline = fiber_clock() + timeout; + if (netbox_communicate(fd, send_buf, recv_buf, + /*limit=*/required, + /*boundary=*/NULL, + /*boundary_len=*/0, + timeout, &unused) != 0) { + luaL_testcancel(L); + return luaT_push_nil_and_error(L); + } + timeout = deadline - fiber_clock(); + } +} + +/* + * Sends and receives data over a console connection. + * Takes socket fd, send_buf (ibuf), recv_buf (ibuf), timeout. + * On success returns response (string). + * On error returns nil, error. + */ +static int +netbox_send_and_recv_console(lua_State *L) +{ + int fd = lua_tointeger(L, 1); + struct ibuf *send_buf = (struct ibuf *) lua_topointer(L, 2); + struct ibuf *recv_buf = (struct ibuf *) lua_topointer(L, 3); + double timeout = (!lua_isnoneornil(L, 4) ? + lua_tonumber(L, 4) : TIMEOUT_INFINITY); + const char delim[] = "\n...\n"; + size_t delim_len = sizeof(delim) - 1; + size_t delim_pos; + if (netbox_communicate(fd, send_buf, recv_buf, /*limit=*/SIZE_MAX, + delim, delim_len, timeout, &delim_pos) != 0) { + + luaL_testcancel(L); + return luaT_push_nil_and_error(L); + } + lua_pushlstring(L, recv_buf->rpos, delim_pos + delim_len); + recv_buf->rpos += delim_pos + delim_len; + return 1; } static void @@ -1150,7 +1214,8 @@ luaopen_net_box(struct lua_State *L) { "decode_greeting",netbox_decode_greeting }, { "decode_method", netbox_decode_method }, { "decode_error", netbox_decode_error }, - { "communicate", netbox_communicate }, + { "send_and_recv_iproto", netbox_send_and_recv_iproto }, + { "send_and_recv_console", netbox_send_and_recv_console }, { NULL, NULL} }; /* luaL_register_module polutes _G */ diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index d7394b088752..0ad6cac022f2 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -22,7 +22,6 @@ local check_index_arg = box.internal.check_index_arg local check_space_arg = box.internal.check_space_arg local check_primary_index = box.internal.check_primary_index -local communicate = internal.communicate local encode_auth = internal.encode_auth local encode_method = internal.encode_method local decode_greeting = internal.decode_greeting @@ -582,46 +581,23 @@ local function create_transport(host, port, user, password, callback, end -- IO (WORKER FIBER) -- - local function send_and_recv(limit_or_boundary, timeout) - return communicate(connection:fd(), send_buf, recv_buf, - limit_or_boundary, timeout) - end - local function send_and_recv_iproto(timeout) - local data_len = recv_buf.wpos - recv_buf.rpos - local required - if data_len < 5 then - required = 5 - else - -- PWN! insufficient input validation - local bufpos = recv_buf.rpos - local len, rpos = decode(bufpos) - required = (rpos - bufpos) + len - if data_len >= required then - local body_end = rpos + len - local hdr, body_rpos = decode(rpos) - recv_buf.rpos = body_end - return nil, hdr, body_rpos, body_end - end + local hdr, body_rpos, body_end = internal.send_and_recv_iproto( + connection:fd(), send_buf, recv_buf, timeout) + if not hdr then + local err = body_rpos + return err.code, err.message end - local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY) - local err, extra = send_and_recv(required, timeout) - if err then - return err, extra - end - return send_and_recv_iproto(max(0, deadline - fiber_clock())) + return nil, hdr, body_rpos, body_end end local function send_and_recv_console(timeout) - local delim = '\n...\n' - local err, delim_pos = send_and_recv(delim, timeout) - if err then - return err, delim_pos - else - local response = ffi.string(recv_buf.rpos, delim_pos + #delim) - recv_buf.rpos = recv_buf.rpos + delim_pos + #delim - return nil, response + local response, err = internal.send_and_recv_console( + connection:fd(), send_buf, recv_buf, timeout) + if not response then + return err.code, err.message end + return nil, response end -- PROTOCOL STATE MACHINE (WORKER FIBER) -- -- 2.25.1