[Tarantool-patches] [PATCH 15/20] net.box: rewrite request implementation in C

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Aug 3 00:54:43 MSK 2021


Thanks for the patch!

In an early version I saw on the branch, you tried to keep the
discarded requests in the hash. That is no longer the case, correct?
Now discarded requests are GCed even if their response hasn't arrived
yet? (It should be so, but I decided to clarify just in case.)

See 12 comments below.

> diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
> index 122d69e9219e..044f7d337ca7 100644
> --- a/src/box/lua/net_box.c
> +++ b/src/box/lua/net_box.c
> @@ -76,6 +78,253 @@ enum netbox_method {
>  	netbox_method_MAX
>  };
>  
> +struct netbox_registry {
> +	/* sync -> netbox_request */

1. For comments outside of functions we usually use /**. This includes
comments for structs and their members.

> +	struct mh_i64ptr_t *requests;
> +};
> +
> +struct netbox_request {
> +	enum netbox_method method;
> +	/*
> +	 * Unique identifier needed for matching the request with its response.
> +	 * Used as a key in the registry.
> +	 */
> +	uint64_t sync;
> +	/*
> +	 * The registry this request belongs to or NULL if the request has been
> +	 * completed.
> +	 */
> +	struct netbox_registry *registry;
> +	/* Format used for decoding the response (ref incremented). */
> +	struct tuple_format *format;
> +	/* Signaled when the response is received. */
> +	struct fiber_cond cond;
> +	/*
> +	 * A user-provided buffer to which the response body should be copied.
> +	 * If NULL, the response will be decoded to Lua stack.
> +	 */
> +	struct ibuf *buffer;
> +	/*
> +	 * Lua reference to the buffer. Used to prevent garbage collection in
> +	 * case the user discards the request.
> +	 */
> +	int buffer_ref;
> +	/*
> +	 * Whether to skip MessagePack map header and IPROTO_DATA key when
> +	 * copying the response body to a user-provided buffer. Ignored if
> +	 * buffer is not set.
> +	 */
> +	bool skip_header;
> +	/* Lua references to on_push trigger and its context. */
> +	int on_push_ref;
> +	int on_push_ctx_ref;
> +	/*
> +	 * Reference to the request result or LUA_NOREF if the response hasn't
> +	 * been received yet. If the response was decoded to a user-provided
> +	 * buffer, the result stores the length of the decoded data.

2. Do you mean result_ref stores the length? I can't find that in the code.
From what I see, it always stores a real Lua ref.

> +	 */
> +	int result_ref;
> +	/*
> +	 * Error if the request failed (ref incremented). NULL on success or if
> +	 * the response hasn't been received yet.
> +	 */
> +	struct error *error;
> +

3. Extra empty line.

> +};
> +
> +static const char netbox_registry_typename[] = "net.box.registry";
> +static const char netbox_request_typename[] = "net.box.request";
> +
> +static void
> +netbox_request_create(struct netbox_request *request)
> +{
> +	request->method = netbox_method_MAX;
> +	request->sync = -1;
> +	request->registry = NULL;
> +	request->format = NULL;
> +	fiber_cond_create(&request->cond);
> +	request->buffer = NULL;
> +	request->buffer_ref = LUA_REFNIL;
> +	request->skip_header = false;
> +	request->on_push_ref = LUA_REFNIL;
> +	request->on_push_ctx_ref = LUA_REFNIL;
> +	request->result_ref = LUA_NOREF;
> +	request->error = NULL;
> +}
> +
> +static void
> +netbox_request_destroy(struct netbox_request *request)
> +{
> +	assert(request->registry == NULL);
> +	if (request->format != NULL)
> +		tuple_format_unref(request->format);
> +	fiber_cond_destroy(&request->cond);
> +	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->buffer_ref);
> +	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->on_push_ref);
> +	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->on_push_ctx_ref);
> +	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, request->result_ref);
> +	if (request->error != NULL)
> +		error_unref(request->error);
> +}
> +
> +/*
> + * Adds a request to a registry. There must not be a request with the same id
> + * (sync) in the registry. Returns -1 if out of memory.
> + */
> +static int
> +netbox_request_register(struct netbox_request *request,
> +			struct netbox_registry *registry)
> +{
> +	assert(request->registry == NULL);
> +	struct mh_i64ptr_t *h = registry->requests;
> +	struct mh_i64ptr_node_t node = { request->sync, request };
> +	struct mh_i64ptr_node_t *old_node = NULL;
> +	if (mh_i64ptr_put(h, &node, &old_node, NULL) == mh_end(h)) {
> +		diag_set(OutOfMemory, 0, "mhash", "netbox_registry");
> +		return -1;
> +	}
> +	assert(old_node == NULL);
> +	request->registry = registry;
> +	return 0;
> +}
> +
> +/*
> + * Unregisters a previously registered request. Does nothing if the request has
> + * already been unregistered or has never been registered.
> + */
> +static void
> +netbox_request_unregister(struct netbox_request *request)
> +{
> +	struct netbox_registry *registry = request->registry;
> +	if (registry == NULL)
> +		return;
> +	request->registry = NULL;
> +	struct mh_i64ptr_t *h = registry->requests;
> +	mh_int_t k = mh_i64ptr_find(h, request->sync, NULL);
> +	assert(k != mh_end(h));
> +	assert(mh_i64ptr_node(h, k)->val == request);
> +	mh_i64ptr_del(h, k, NULL);
> +}
> +
> +static bool
> +netbox_request_is_ready(struct netbox_request *request)

4. The request can be made 'const'.

> +{
> +	return request->registry == NULL;
> +}
> +
> +static void
> +netbox_request_signal(struct netbox_request *request)
> +{
> +	fiber_cond_broadcast(&request->cond);
> +}
> +
> +static void
> +netbox_request_complete(struct netbox_request *request)
> +{
> +	netbox_request_unregister(request);
> +	netbox_request_signal(request);
> +}
> +
> +/*
> + * Waits on netbox_request::cond. Subtracts the wait time from the timeout.
> + * Returns false on timeout or if the fiber was cancelled.
> + */
> +static bool
> +netbox_request_wait(struct netbox_request *request, double *timeout)
> +{> +	double ts = ev_monotonic_now(loop());

5. fiber_clock() would be a little shorter (it does the same thing). The
same applies below, although fiber_clock() is not inline. Up to you. Also
see a related comment below.

> +	int rc = fiber_cond_wait_timeout(&request->cond, *timeout);
> +	*timeout -= ev_monotonic_now(loop()) - ts;
> +	return rc == 0;
> +}
> +
> +static void

6. Have you tried making these 1-5 line functions explicitly inline?
I remember that with Mergen we saw an actual perf difference in the SQL
code when we did so, although at that time the functions were in a
header rather than in a .c file.

I see you used inline for some functions in this commit. Why not for
ones like these?

<...>

> +
> +/*
> + * Waits until the response is received for the given request and obtains the
> + * result. Takes an optional timeout argument.
> + *
> + * See the comment to request.result() for the return value format.
> + */
> +static int
> +luaT_netbox_request_wait_result(struct lua_State *L)
> +{
> +	struct netbox_request *request = luaT_check_netbox_request(L, 1);
> +	double timeout = TIMEOUT_INFINITY;
> +	if (!lua_isnoneornil(L, 2)) {
> +		if (lua_type(L, 2) != LUA_TNUMBER ||
> +		    (timeout = lua_tonumber(L, 2)) < 0)
> +			luaL_error(L, "Usage: future:wait_result(timeout)");
> +	}
> +	while (!netbox_request_is_ready(request)) {
> +		if (!netbox_request_wait(request, &timeout)) {
> +			luaL_testcancel(L);
> +			diag_set(ClientError, ER_TIMEOUT);

7. In some places you use box_error_raise(), in others the
explicit diag_set(ClientError). Why? For instance:

	box_error_raise(ER_NO_CONNECTION, "%s", strerror(errno));
	box_error_raise(ER_NO_CONNECTION, "Peer closed");

In other places the raise is justified, because the error code and
message are not known at compile time there. For these two, am I
missing something?

> +			return luaT_push_nil_and_error(L);
> +		}
> +	}
> +	return netbox_request_push_result(request, L);
> +}

<...>

> +
> +static int
> +luaT_netbox_request_pairs(struct lua_State *L)
> +{
> +	if (!lua_isnoneornil(L, 2)) {
> +		if (lua_type(L, 2) != LUA_TNUMBER || lua_tonumber(L, 2) < 0)
> +			luaL_error(L, "Usage: future:pairs(timeout)");
> +	} else {
> +		if (lua_isnil(L, 2))
> +			lua_pop(L, 1);
> +		lua_pushnumber(L, TIMEOUT_INFINITY);
> +	}
> +	lua_pushcclosure(L, luaT_netbox_request_iterator_next, 2);

8. Pushing C functions, especially closures, on a regular basis might
be expensive. Could you please try making it a non-closure function
and caching its reference, as I showed in the proposal about request __gc?

> +	lua_pushnil(L);
> +	lua_pushinteger(L, 0);
> +	return 3;
> +}

<...>

> +
> +/*
> + * Creates a request object (userdata) and pushes it to Lua stack.
> + *
> + * Takes the following arguments:
> + *  - requests: registry to register the new request with
> + *  - id: id (sync) to assign to the new request
> + *  - buffer: buffer (ibuf) to write the result to or nil
> + *  - skip_header: whether to skip header when writing the result to the buffer
> + *  - method: a value from the netbox_method enumeration
> + *  - on_push: on_push trigger function
> + *  - on_push_ctx: on_push trigger function argument
> + *  - format: tuple format to use for decoding the body or nil
> + */
> +static int
> +netbox_new_request(struct lua_State *L)
> +{
> +	struct netbox_request *request = lua_newuserdata(L, sizeof(*request));

9. Does it help perf if the requests are allocated on a mempool?

> +	netbox_request_create(request);

10. Below, you override many of the fields initialized in
netbox_request_create(). Do you really need this create()? Could you
inline it without the unnecessary assignments?

> +	luaL_getmetatable(L, netbox_request_typename);
> +	lua_setmetatable(L, -2);
> +	struct netbox_registry *registry = luaT_check_netbox_registry(L, 1);
> +	request->sync = luaL_touint64(L, 2);
> +	request->buffer = (struct ibuf *) lua_topointer(L, 3);
> +	lua_pushvalue(L, 3);
> +	request->buffer_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> +	request->skip_header = lua_toboolean(L, 4);
> +	request->method = lua_tointeger(L, 5);
> +	assert(request->method < netbox_method_MAX);
> +	lua_pushvalue(L, 6);
> +	request->on_push_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> +	lua_pushvalue(L, 7);
> +	request->on_push_ctx_ref = luaL_ref(L, LUA_REGISTRYINDEX);
> +	if (!lua_isnil(L, 8))
> +		request->format = lbox_check_tuple_format(L, 8);
> +	else
> +		request->format = tuple_format_runtime;
> +	tuple_format_ref(request->format);
> +	if (netbox_request_register(request, registry) != 0)
> +		luaT_error(L);
> +	return 1;
> +}
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 0ad6cac022f2..4bc66940ea2a 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -468,20 +304,8 @@ local function create_transport(host, port, user, password, callback,
>          local id = next_request_id
>          encode_method(method, send_buf, id, ...)
>          next_request_id = next_id(id)
> -        -- Request in most cases has maximum 10 members:
> -        -- method, buffer, skip_header, id, cond, errno, response,
> -        -- on_push, on_push_ctx and format.
> -        local request = setmetatable(table_new(0, 10), request_mt)
> -        request.method = method
> -        request.buffer = buffer
> -        request.skip_header = skip_header
> -        request.id = id
> -        request.cond = fiber.cond()
> -        requests[id] = request
> -        request.on_push = on_push
> -        request.on_push_ctx = on_push_ctx
> -        request.format = format
> -        return request
> +        return internal.new_request(requests, id, buffer, skip_header, method,
> +                                    on_push, on_push_ctx, format)

11. You might want to try caching internal.new_request globally (in a
module-level variable), calling it without the '.' operator, and
re-checking the benchmarks. It might make no difference, but '.' is
definitely not free. The same applies to dispatch_response_iproto and
dispatch_response_console.

>      end
>  
>      --
> @@ -502,76 +326,13 @@ local function create_transport(host, port, user, password, callback,
>  
>      local function dispatch_response_iproto(hdr, body_rpos, body_end)
>          local id = hdr[IPROTO_SYNC_KEY]
> -        local request = requests[id]
> -        if request == nil then -- nobody is waiting for the response
> -            return
> -        end
>          local status = hdr[IPROTO_STATUS_KEY]
> -        local body_len = body_end - body_rpos
> -
> -        if status > IPROTO_CHUNK_KEY then
> -            -- Handle errors
> -            requests[id] = nil
> -            request.id = nil
> -            request.errno = band(status, IPROTO_ERRNO_MASK)
> -            request.response = decode_error(body_rpos, request.errno)
> -            request.cond:broadcast()
> -            return
> -        end
> -
> -        local buffer = request.buffer
> -        if buffer ~= nil then
> -            -- Copy xrow.body to user-provided buffer
> -            if request.skip_header then
> -                -- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
> -                local map_len, key
> -                map_len, body_rpos = decode_map_header(body_rpos, body_len)
> -                assert(map_len == 1)
> -                key, body_rpos = decode(body_rpos)
> -                assert(key == IPROTO_DATA_KEY)
> -                body_len = body_end - body_rpos
> -            end
> -            local wpos = buffer:alloc(body_len)
> -            ffi.copy(wpos, body_rpos, body_len)
> -            body_len = tonumber(body_len)
> -            if status == IPROTO_OK_KEY then
> -                request.response = body_len
> -                requests[id] = nil
> -                request.id = nil
> -            else
> -                request.on_push(request.on_push_ctx, body_len)
> -            end
> -            request.cond:broadcast()
> -            return
> -        end
> -
> -        local real_end
> -        -- Decode xrow.body[DATA] to Lua objects
> -        if status == IPROTO_OK_KEY then
> -            request.response, real_end = decode_method(request.method,
> -                                                       body_rpos, body_end,
> -                                                       request.format)
> -            assert(real_end == body_end, "invalid body length")
> -            requests[id] = nil
> -            request.id = nil
> -        else
> -            local msg
> -            msg, real_end, request.errno = decode_push(body_rpos, body_end)

12. decode_push is now unused and can be removed.

> -            assert(real_end == body_end, "invalid body length")
> -            request.on_push(request.on_push_ctx, msg)
> -        end
> -        request.cond:broadcast()
> +        internal.dispatch_response_iproto(requests, id, status,
> +                                          body_rpos, body_end)
>      end
>  
>      local function dispatch_response_console(rid, response)
> -        local request = requests[rid]
> -        if request == nil then -- nobody is waiting for the response
> -            return
> -        end
> -        request.id = nil
> -        requests[rid] = nil
> -        request.response = response
> -        request.cond:broadcast()
> +        internal.dispatch_response_console(requests, rid, response)
>      end
>  
>      local function new_request_id()
> 


More information about the Tarantool-patches mailing list