[Tarantool-patches] [PATCH 2/3] merger: fix NULL dereference when called via iproto

Alexander Turenko alexander.turenko at tarantool.org
Mon Jun 1 21:10:12 MSK 2020


The merge source API is designed to be quite abstract: the base structure
and virtual methods do not depend on Lua in any way. Each source should
implement the next() and destroy() virtual methods, which may be called
from C without a Lua state. This design allows any source to be used
from C as well as from Lua. The Lua API is built on top of the C API and
supports any source. Even the merger itself is implemented in pure C on
top of the merge source API, and so it may be used from Lua too.
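The shape of such an API can be sketched in plain C. This is a minimal sketch, not Tarantool's actual code. The names `merge_source`, `merge_source_vtab`, `array_source` and `merge_source_sum` are simplified, hypothetical stand-ins. Integers replace tuples, and the real next() also takes a tuple format. The point is that generic code drives any source through the vtab only, with no Lua state involved.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

struct merge_source;

/* Virtual methods; a hypothetical, simplified merge_source_vtab. */
struct merge_source_vtab {
	/* Set *out to the next item or NULL at the end; return 0 or -1. */
	int (*next)(struct merge_source *base, const int **out);
	void (*destroy)(struct merge_source *base);
};

/* The base structure knows nothing about Lua. */
struct merge_source {
	const struct merge_source_vtab *vtab;
};

/* A concrete source over a C array (hypothetical). */
struct array_source {
	struct merge_source base; /* first member, so casts are valid */
	const int *items;
	size_t size;
	size_t pos;
};

static int
array_source_next(struct merge_source *base, const int **out)
{
	struct array_source *source = (struct array_source *)base;
	if (source->pos < source->size)
		*out = &source->items[source->pos++];
	else
		*out = NULL;
	return 0;
}

static void
array_source_destroy(struct merge_source *base)
{
	free((struct array_source *)base);
}

static const struct merge_source_vtab array_source_vtab = {
	.next = array_source_next,
	.destroy = array_source_destroy,
};

static struct merge_source *
array_source_new(const int *items, size_t size)
{
	assert(items != NULL || size == 0);
	struct array_source *source = malloc(sizeof(*source));
	if (source == NULL)
		return NULL;
	source->base.vtab = &array_source_vtab;
	source->items = items;
	source->size = size;
	source->pos = 0;
	return &source->base;
}

/* Generic C code: drives any source through the vtab only. */
static int
merge_source_sum(struct merge_source *source, long *sum)
{
	const int *item;
	*sum = 0;
	for (;;) {
		if (source->vtab->next(source, &item) != 0)
			return -1;
		if (item == NULL)
			return 0;
		*sum += *item;
	}
}
```

For example, a source over `{1, 2, 3}` yields a sum of 6, and `source->vtab->destroy(source)` frees it. Neither call needs a Lua state.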

A particular source implementation may use a Lua state internally, but
this is not part of the API and should be hidden under the hood. In
fact, all sources we have now (except the merger itself) store some
references in LUA_REGISTRYINDEX and need a temporary Lua stack to work
with them in the next() virtual method.

Before this patch, the sources ('buffer', 'table', 'tuple') assumed
that a Lua state always exists in the fiber storage of the fiber where
next() is called. This looks valid at first glance, because next() may
be called either from Lua code or from the merger, which in turn is
called from Lua code. However, background fibers (which serve binary
protocol requests) do not store a Lua state in the fiber storage, even
for Lua call / eval requests. So next() obtained a NULL Lua state there
and dereferenced it.

A possible solution would be to always store a Lua state in the fiber
storage. There are two reasons why it is not implemented here:

1. There should be a decision about the right balance between speed and
   memory footprint, and maybe some eviction strategy for cached Lua
   states. I'm not sure we can just always store a state in each
   background fiber. It would be wasteful for instances that serve box
   DQL / DML, SQL and/or C procedure calls.
2. Technically, the contract of the next() method would then require a
   Lua state to exist in the fiber storage. Such a requirement looks
   quite unnatural for a C API and also looks fragile: what if we
   implement some less wasteful Lua state caching strategy and the
   assumption about the presence of the Lua state gets broken?

Obviously, next() will spend extra time creating a temporary state when
it is called from a background fiber. We should reuse the existing Lua
state at least when a Lua call is performed via the binary protocol. I
consider this an optimization and will address it in the next commit.
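The acquire/release pattern that the patch adds (`luaT_temp_luastate()` and `luaT_release_temp_luastate()`) can be modelled with hypothetical stand-ins. In this sketch, `struct state` plays the role of a Lua state and `struct fiber` holds the optional per-fiber state. A small array plays the role of LUA_REGISTRYINDEX, and `REF_NIL` stands for LUA_REFNIL. This is a sketch under those assumptions, not the real Lua C API. It shows why the release call can be unconditional. A borrowed state gets the REF_NIL reference, and unref ignores negative references, as luaL_unref() does.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#define REF_NIL (-1) /* hypothetical stand-in for LUA_REFNIL */
#define REGISTRY_SIZE 8

/* Hypothetical stand-ins for a Lua state and a fiber. */
struct state {
	int top; /* pretend stack top */
};

struct fiber {
	struct state *lua_stack; /* NULL in iproto background fibers */
};

/* Hypothetical stand-in for LUA_REGISTRYINDEX. */
static struct state *registry[REGISTRY_SIZE];

/* Keep a value alive and return its key, or REF_NIL if full. */
static int
registry_ref(struct state *value)
{
	for (int i = 0; i < REGISTRY_SIZE; i++) {
		if (registry[i] == NULL) {
			registry[i] = value;
			return i;
		}
	}
	return REF_NIL;
}

/*
 * Drop a reference. Negative refs are ignored, as in luaL_unref().
 * Freeing here models the garbage collector reclaiming the state.
 */
static void
registry_unref(int ref)
{
	if (ref < 0 || ref >= REGISTRY_SIZE)
		return;
	assert(registry[ref] != NULL); /* catch a double unref */
	free(registry[ref]);
	registry[ref] = NULL;
}

/* Borrow the fiber's state if any, otherwise create a temporary one. */
static struct state *
temp_state(struct fiber *f, int *ref)
{
	if (f->lua_stack != NULL) {
		*ref = REF_NIL; /* borrowed: nothing to release */
		return f->lua_stack;
	}
	struct state *L = calloc(1, sizeof(*L));
	if (L == NULL)
		return NULL;
	*ref = registry_ref(L);
	if (*ref == REF_NIL) {
		free(L);
		return NULL;
	}
	return L;
}

static void
release_temp_state(int ref)
{
	registry_unref(ref); /* a no-op for a borrowed state */
}

/* The part that needs a state, like the *_impl() helpers. */
static int
source_next_impl(struct state *L, int *out)
{
	L->top++; /* "push" a value */
	*out = L->top;
	L->top--; /* "pop" it, leaving the stack balanced */
	return 0;
}

/* The next()-like wrapper: acquire, call the helper, release. */
static int
source_next(struct fiber *f, int *out)
{
	int ref = REF_NIL;
	struct state *L = temp_state(f, &ref);
	if (L == NULL)
		return -1;
	int rc = source_next_impl(L, out);
	release_temp_state(ref);
	return rc;
}
```

A fiber that already owns a state gets it back unchanged. A background fiber with a NULL state gets a fresh one, and that state is released right after the call.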

A few words about the implementation. I have added three wrapper
functions, which acquire a temporary Lua state, call an implementation
function and release the state. They could be squashed into one
function that would accept a function pointer and a variable number of
arguments. However, GCC does not devirtualize such calls at the -O2
level, so it seems better to avoid this. It may be possible to write
some weird macro that would technically reduce code duplication, but I
prefer to write in C, not in some macro-based meta-language.
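For comparison, the squashed variant mentioned above might look like the following sketch. Everything here is hypothetical: `call_with_state()`, `add_impl()`, `add_wrapper()` and the stubs `acquire_state()` / `release_state()`. The sketch only contrasts the two shapes. The patch's stated concern is that GCC does not devirtualize such calls at -O2. The sketch also shows a second cost: the variadic helper makes each implementation unpack its own arguments with va_arg(), which the compiler cannot type-check.

```c
#include <assert.h>
#include <stdarg.h>
#include <stddef.h>

/* Hypothetical minimal stand-ins so that the sketch compiles. */
struct state {
	int calls;
};

static struct state shared_state;

static struct state *
acquire_state(int *ref)
{
	*ref = -1; /* pretend the state is borrowed */
	return &shared_state;
}

static void
release_state(int ref)
{
	(void)ref; /* nothing to release for a borrowed state */
}

/*
 * The "squashed" variant: one helper takes an implementation
 * pointer plus variadic arguments, and the call goes through
 * that pointer.
 */
typedef int (*impl_f)(struct state *L, va_list ap);

static int
call_with_state(impl_f impl, ...)
{
	int ref;
	struct state *L = acquire_state(&ref);
	if (L == NULL)
		return -1;
	va_list ap;
	va_start(ap, impl);
	int rc = impl(L, ap);
	va_end(ap);
	release_state(ref);
	return rc;
}

/* Arguments are unpacked by hand; types are not checked. */
static int
add_impl(struct state *L, va_list ap)
{
	int a = va_arg(ap, int);
	int *out = va_arg(ap, int *);
	L->calls++;
	*out = a + 1;
	return 0;
}

/* The shape chosen in the patch: an explicit wrapper per helper. */
static int
add_impl_direct(struct state *L, int a, int *out)
{
	L->calls++;
	*out = a + 1;
	return 0;
}

static int
add_wrapper(int a, int *out)
{
	int ref;
	struct state *L = acquire_state(&ref);
	if (L == NULL)
		return -1;
	int rc = add_impl_direct(L, a, out);
	release_state(ref);
	return rc;
}
```

Both calls compute the same result. In the explicit wrapper, however, the callee is known at compile time and its arguments are type-checked.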

Fixes #4954
---
 src/box/lua/merger.c                          | 153 +++++++++++++++---
 .../gh-4954-merger-via-net-box.test.lua       | 129 +++++++++++++++
 2 files changed, 261 insertions(+), 21 deletions(-)
 create mode 100755 test/box-tap/gh-4954-merger-via-net-box.test.lua

diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
index 16814c041..25df18442 100644
--- a/src/box/lua/merger.c
+++ b/src/box/lua/merger.c
@@ -149,6 +149,68 @@ luaT_gettuple(struct lua_State *L, int idx, struct tuple_format *format)
 	return tuple;
 }
 
+/**
+ * Get a temporary Lua state.
+ *
+ * Use case: a function does not accept a Lua state as an argument,
+ * so that it can be called from C code, but uses a Lua value that
+ * is referenced in LUA_REGISTRYINDEX. A temporary Lua stack is
+ * needed to get and process the value.
+ *
+ * The returned state shares LUA_REGISTRYINDEX with `tarantool_L`.
+ *
+ * This Lua state should be used only from one fiber: otherwise
+ * one fiber may change the stack, and another one will access a
+ * wrong stack slot when it is scheduled for execution after a
+ * yield.
+ *
+ * Return a Lua state on success and set @a coro_ref. This
+ * reference should be passed to `luaT_release_temp_luastate()`
+ * when the state is not needed anymore.
+ *
+ * Return NULL and set a diag on failure.
+ */
+static struct lua_State *
+luaT_temp_luastate(int *coro_ref)
+{
+	if (fiber()->storage.lua.stack != NULL) {
+		*coro_ref = LUA_REFNIL;
+		return fiber()->storage.lua.stack;
+	}
+
+	/*
+	 * luaT_newthread() pops the new Lua state from
+	 * tarantool_L, and it is the right thing to do: if we
+	 * push something to it and yield, then another fiber will
+	 * not know that the stack top has changed and may operate
+	 * on a wrong slot.
+	 *
+	 * Second, many requests that push a value to tarantool_L
+	 * and yield may exhaust available slots on the stack.
+	 */
+	struct lua_State *L = luaT_newthread(tarantool_L);
+	if (L == NULL)
+		return NULL;
+	/*
+	 * The new state is not referenced from anywhere (reasons
+	 * are above), so we should keep a reference to it in the
+	 * registry while it is in use.
+	 */
+	*coro_ref = luaL_ref(tarantool_L, LUA_REGISTRYINDEX);
+	return L;
+}
+
+/**
+ * Release a temporary Lua state.
+ *
+ * It is the other half of `luaT_temp_luastate()`.
+ */
+static void
+luaT_release_temp_luastate(int coro_ref)
+{
+	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref);
+}
+
 /* }}} */
 
 /* {{{ Create, destroy structures from Lua */
@@ -396,16 +458,12 @@ luaL_merge_source_buffer_new(struct lua_State *L)
 }
 
 /**
- * Call a user provided function to get a next data chunk (a
- * buffer).
- *
- * Return 1 when a new buffer is received, 0 when a buffers
- * iterator ends and -1 at error and set a diag.
+ * Helper for `merge_source_buffer_fetch()`.
  */
 static int
-merge_source_buffer_fetch(struct merge_source_buffer *source)
+luaL_merge_source_buffer_fetch_impl(struct lua_State *L,
+				    struct merge_source_buffer *source)
 {
-	struct lua_State *L = fiber()->storage.lua.stack;
 	int nresult = luaL_iterator_next(L, source->fetch_it);
 
 	/* Handle a Lua error in a gen function. */
@@ -446,6 +504,25 @@ merge_source_buffer_fetch(struct merge_source_buffer *source)
 	return 1;
 }
 
+/**
+ * Call a user-provided function to get the next data chunk (a
+ * buffer).
+ *
+ * Return 1 when a new buffer is received, 0 when the iterator
+ * over buffers ends, and -1 on error (a diag is set).
+ */
+static int
+merge_source_buffer_fetch(struct merge_source_buffer *source)
+{
+	int coro_ref = LUA_REFNIL;
+	struct lua_State *L = luaT_temp_luastate(&coro_ref);
+	if (L == NULL)
+		return -1;
+	int rc = luaL_merge_source_buffer_fetch_impl(L, source);
+	luaT_release_temp_luastate(coro_ref);
+	return rc;
+}
+
 /* Virtual methods */
 
 /**
@@ -655,16 +732,14 @@ merge_source_table_destroy(struct merge_source *base)
 }
 
 /**
- * next() virtual method implementation for a table source.
- *
- * @see struct merge_source_vtab
+ * Helper for `merge_source_table_next()`.
  */
 static int
-merge_source_table_next(struct merge_source *base,
-			struct tuple_format *format,
-			struct tuple **out)
+luaL_merge_source_table_next_impl(struct lua_State *L,
+				  struct merge_source *base,
+				  struct tuple_format *format,
+				  struct tuple **out)
 {
-	struct lua_State *L = fiber()->storage.lua.stack;
 	struct merge_source_table *source = container_of(base,
 		struct merge_source_table, base);
 
@@ -707,6 +782,25 @@ merge_source_table_next(struct merge_source *base,
 	return 0;
 }
 
+/**
+ * next() virtual method implementation for a table source.
+ *
+ * @see struct merge_source_vtab
+ */
+static int
+merge_source_table_next(struct merge_source *base,
+			struct tuple_format *format,
+			struct tuple **out)
+{
+	int coro_ref = LUA_REFNIL;
+	struct lua_State *L = luaT_temp_luastate(&coro_ref);
+	if (L == NULL)
+		return -1;
+	int rc = luaL_merge_source_table_next_impl(L, base, format, out);
+	luaT_release_temp_luastate(coro_ref);
+	return rc;
+}
+
 /* Lua functions */
 
 /**
@@ -830,16 +924,14 @@ merge_source_tuple_destroy(struct merge_source *base)
 }
 
 /**
- * next() virtual method implementation for a tuple source.
- *
- * @see struct merge_source_vtab
+ * Helper for `merge_source_tuple_next()`.
  */
 static int
-merge_source_tuple_next(struct merge_source *base,
-			struct tuple_format *format,
-			struct tuple **out)
+luaL_merge_source_tuple_next_impl(struct lua_State *L,
+				  struct merge_source *base,
+				  struct tuple_format *format,
+				  struct tuple **out)
 {
-	struct lua_State *L = fiber()->storage.lua.stack;
 	struct merge_source_tuple *source = container_of(base,
 		struct merge_source_tuple, base);
 
@@ -864,6 +956,25 @@ merge_source_tuple_next(struct merge_source *base,
 	return 0;
 }
 
+/**
+ * next() virtual method implementation for a tuple source.
+ *
+ * @see struct merge_source_vtab
+ */
+static int
+merge_source_tuple_next(struct merge_source *base,
+			struct tuple_format *format,
+			struct tuple **out)
+{
+	int coro_ref = LUA_REFNIL;
+	struct lua_State *L = luaT_temp_luastate(&coro_ref);
+	if (L == NULL)
+		return -1;
+	int rc = luaL_merge_source_tuple_next_impl(L, base, format, out);
+	luaT_release_temp_luastate(coro_ref);
+	return rc;
+}
+
 /* Lua functions */
 
 /**
diff --git a/test/box-tap/gh-4954-merger-via-net-box.test.lua b/test/box-tap/gh-4954-merger-via-net-box.test.lua
new file mode 100755
index 000000000..e2bd6f8b9
--- /dev/null
+++ b/test/box-tap/gh-4954-merger-via-net-box.test.lua
@@ -0,0 +1,129 @@
+#!/usr/bin/env tarantool
+
+local merger_lib = require('merger')
+local buffer = require('buffer')
+local msgpack = require('msgpack')
+local net_box = require('net.box')
+local fiber = require('fiber')
+local tap = require('tap')
+
+
+-- {{{ Helpers
+
+-- Lua iterator generator function to iterate over an array.
+local function array_next(arr, idx)
+    idx = idx or 1
+    local item = arr[idx]
+    if item == nil then
+        return
+    end
+    return idx + 1, item
+end
+
+-- Lua iterator generator to iterate over an array with yields.
+local function array_yield_next(arr, idx)
+    fiber.sleep(0)
+    return array_next(arr, idx)
+end
+
+-- }}}
+
+-- {{{ Code that is run in a background fiber (via net.box)
+
+local function use_table_source(tuples)
+    local source = merger_lib.new_source_fromtable(tuples)
+    return source:select()
+end
+_G.use_table_source = use_table_source
+
+local function use_buffer_source(tuples)
+    local buf = buffer.ibuf()
+    msgpack.encode(tuples, buf)
+    local source = merger_lib.new_source_frombuffer(buf)
+    return source:select()
+end
+_G.use_buffer_source = use_buffer_source
+
+local function use_tuple_source(tuples)
+    local source = merger_lib.new_tuple_source(array_next, tuples)
+    return source:select()
+end
+_G.use_tuple_source = use_tuple_source
+
+local function use_table_source_yield(tuples)
+    local chunks = {}
+    for i, t in ipairs(tuples) do
+        chunks[i] = {t}
+    end
+    local source = merger_lib.new_table_source(array_yield_next, chunks)
+    return source:select()
+end
+_G.use_table_source_yield = use_table_source_yield
+
+local function use_buffer_source_yield(tuples)
+    local buffers = {}
+    for i, t in ipairs(tuples) do
+        buffers[i] = buffer.ibuf()
+        msgpack.encode({t}, buffers[i])
+    end
+    local source = merger_lib.new_buffer_source(array_yield_next, buffers)
+    return source:select()
+end
+_G.use_buffer_source_yield = use_buffer_source_yield
+
+local function use_tuple_source_yield(tuples)
+    local source = merger_lib.new_tuple_source(array_yield_next, tuples)
+    return source:select()
+end
+_G.use_tuple_source_yield = use_tuple_source_yield
+
+-- }}}
+
+box.cfg({
+    listen = os.getenv('LISTEN') or 'localhost:3301'
+})
+box.schema.user.grant('guest', 'execute', 'universe', nil,
+                      {if_not_exists = true})
+
+local test = tap.test('gh-4954-merger-via-net-box.test.lua')
+test:plan(6)
+
+local tuples = {
+    {1},
+    {2},
+    {3},
+}
+
+local connection = net_box.connect(box.cfg.listen)
+
+local res = connection:call('use_table_source', {tuples})
+test:is_deeply(res, tuples, 'verify table source')
+res = connection:call('use_buffer_source', {tuples})
+test:is_deeply(res, tuples, 'verify buffer source')
+res = connection:call('use_tuple_source', {tuples})
+test:is_deeply(res, tuples, 'verify tuple source')
+
+local function test_verify_source_async(test, func_name, request_count)
+    test:plan(request_count)
+
+    local futures = {}
+    for _ = 1, request_count do
+        local future = connection:call(func_name, {tuples}, {is_async = true})
+        table.insert(futures, future)
+    end
+    for i = 1, request_count do
+        local res = unpack(futures[i]:wait_result())
+        test:is_deeply(res, tuples, ('verify request %d'):format(i))
+    end
+end
+
+test:test('verify table source, which yields', test_verify_source_async,
+          'use_table_source_yield', 100)
+test:test('verify buffer source, which yields', test_verify_source_async,
+          'use_buffer_source_yield', 100)
+test:test('verify tuple source, which yields', test_verify_source_async,
+          'use_tuple_source_yield', 100)
+
+box.schema.user.revoke('guest', 'execute', 'universe')
+
+os.exit(test:check() and 0 or 1)
-- 
2.25.0


