[PATCH 1/1] box: expose on_commit/rollback triggers for Lua

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu Aug 16 20:57:10 MSK 2018


On commit/rollback triggers are already implemented
within Tarantool internals. The patch just exposes
them to Lua. The API is described below and
deserves some attention.

Closes #857

@TarantoolBot document
Title: Document box.on_commit/on_rollback triggers
On commit/rollback triggers can be set similarly to
space:on_replace triggers:

    box.on_commit/rollback(new_trigger, old_trigger)

A trigger can be set only inside an active
transaction. When a trigger is called, it takes 1
parameter: an iterator over the transaction
statements.

    box.on_commit(function(iterator)
        for i, old_tuple, new_tuple, space_id in iterator() do
            -- Do something with tuples and space ...
        end
    end)

On each step the iterator returns 4 values: statement
number (grows from 1 to statement count), old tuple or
nil, new tuple or nil and space id. Old tuple is not
nil when the statement updated or deleted the existing
tuple. New tuple is not nil when the statement updated
or inserted the tuple.

The iterator can not be used outside of the trigger.
Otherwise it throws an error.

On_commit/rollback triggers shall not fail, otherwise
Tarantool exits with panic.
---
https://github.com/tarantool/tarantool/tree/gerold103/gh-857-lua-on_commit-rollback
https://github.com/tarantool/tarantool/issues/857

A few words about the real goal of the patch and why the issue became
so important. Vshard (https://github.com/tarantool/vshard/issues/73)
needs a way to track committed statements for each space.
Committed means already written to WAL. On_replace is not suitable
since it does not guarantee that the statement will be committed.

What is more, on_commit should provide a way to get all the
committed statements right inside the trigger. It is needed for
the same reason - if vshard collected statements via on_replace
and sent them from on_commit, not all of them might actually have
been committed. An example: vshard on_replace trigger saves a tuple,
then it is rolled back via rollback to savepoint. Or via the next
on_replace trigger. And vshard has no way to detect it. The
same holds for any application that wants to track commits.

 src/box/lua/init.c               | 122 ++++++++++++++++++
 test/box/misc.result             |   2 +
 test/engine/transaction.result   | 270 +++++++++++++++++++++++++++++++++++++++
 test/engine/transaction.test.lua | 126 ++++++++++++++++++
 4 files changed, 520 insertions(+)

diff --git a/src/box/lua/init.c b/src/box/lua/init.c
index 02b4b56a2..303ed2e97 100644
--- a/src/box/lua/init.c
+++ b/src/box/lua/init.c
@@ -35,6 +35,7 @@
 #include <lualib.h>
 
 #include "lua/utils.h" /* luaT_error() */
+#include "lua/trigger.h"
 
 #include "box/box.h"
 #include "box/txn.h"
@@ -56,6 +57,7 @@
 #include "box/lua/cfg.h"
 #include "box/lua/xlog.h"
 #include "box/lua/console.h"
+#include "box/lua/tuple.h"
 
 extern char session_lua[],
 	tuple_lua[],
@@ -99,6 +101,124 @@ lbox_rollback(lua_State *L)
 	return 0;
 }
 
+/**
+ * Get the next txn statement from the current transaction. This
+ * is a C closure with 2 upvalues: the first is a transaction id,
+ * the second is the statement to start from on this call. The
+ * function fails unless that transaction is the current one.
+ * Statements with no row are skipped.
+ * It takes two parameters according to Lua 'for' semantics: the
+ * first is the state (that here is nil and unused), the second
+ * is the key of iteration - the index of the previously returned
+ * statement, which is incremented by 1 here.
+ * It returns values with respect to Lua 'for' as well: the first
+ * is the next key (previous + 1), the 2nd - 4th are statement
+ * attributes: old tuple or nil, new tuple or nil, space id.
+ */
+static int
+lbox_txn_iterator_next(struct lua_State *L)
+{
+	struct txn *txn = in_txn();
+	int64_t txn_id = luaL_toint64(L, lua_upvalueindex(1));
+	if (txn == NULL || txn->id != txn_id) {
+		diag_set(ClientError, ER_CURSOR_NO_TRANSACTION);
+		return luaT_error(L);
+	}
+	struct txn_stmt *stmt =
+		(struct txn_stmt *) lua_topointer(L, lua_upvalueindex(2));
+	if (stmt == NULL)
+		return 0;
+	while (stmt->row == NULL) {
+		stmt = stailq_next_entry(stmt, next);
+		if (stmt == NULL) {
+			lua_pushnil(L);
+			lua_replace(L, lua_upvalueindex(2));
+			return 0;
+		}
+	}
+	lua_pushinteger(L, lua_tointeger(L, 2) + 1);
+	if (stmt->old_tuple != NULL)
+		luaT_pushtuple(L, stmt->old_tuple);
+	else
+		lua_pushnil(L);
+	if (stmt->new_tuple != NULL)
+		luaT_pushtuple(L, stmt->new_tuple);
+	else
+		lua_pushnil(L);
+	lua_pushinteger(L, space_id(stmt->space));
+	/* Save the following statement in upvalue 2 for the next call. */
+	stmt = stailq_next_entry(stmt, next);
+	lua_pushlightuserdata(L, stmt);
+	lua_replace(L, lua_upvalueindex(2));
+	return 4;
+}
+
+/**
+ * Open an iterator over the transaction statements. This is a C
+ * closure with 1 upvalue - id of the transaction to iterate
+ * over. It raises ER_CURSOR_NO_TRANSACTION if that transaction
+ * is not the current one. It returns 3 values usable in Lua
+ * 'for': the lbox_txn_iterator_next closure, nil and key 0.
+ */
+static int
+lbox_txn_pairs(struct lua_State *L)
+{
+	int64_t txn_id = luaL_toint64(L, lua_upvalueindex(1));
+	struct txn *txn = in_txn();
+	if (txn == NULL || txn->id != txn_id) {
+		diag_set(ClientError, ER_CURSOR_NO_TRANSACTION);
+		return luaT_error(L);
+	}
+	luaL_pushint64(L, txn_id);
+	lua_pushlightuserdata(L, stailq_first_entry(&txn->stmts,
+						    struct txn_stmt, next));
+	lua_pushcclosure(L, lbox_txn_iterator_next, 2);
+	lua_pushnil(L);
+	lua_pushinteger(L, 0);
+	return 3;
+}
+
+/**
+ * Push an argument for an on_commit/on_rollback Lua trigger: a
+ * closure that opens an iterator over the transaction statements.
+ */
+static int
+lbox_push_txn(struct lua_State *L, void *event)
+{
+	struct txn *txn = (struct txn *) event;
+	luaL_pushint64(L, txn->id);
+	lua_pushcclosure(L, lbox_txn_pairs, 1);
+	return 1;
+}
+
+/**
+ * Update the transaction on_commit/rollback triggers. Expands to
+ * a function body: uses 'L' and returns. @sa lbox_trigger_reset.
+ */
+#define lbox_on_txn_end(name) do {                                             \
+	struct txn *txn = in_txn();                                            \
+	int top = lua_gettop(L);                                               \
+	if (top > 2 || txn == NULL) {                                          \
+		return luaL_error(L, "Usage inside a transaction: "            \
+				  "box.on_" #name "([function | nil, "         \
+				  "[function | nil]])");                       \
+	}                                                                      \
+	txn_init_triggers(txn);                                                \
+	return lbox_trigger_reset(L, 2, &txn->on_##name, lbox_push_txn, NULL); \
+} while (0)
+
+static int
+lbox_on_commit(struct lua_State *L)
+{
+	lbox_on_txn_end(commit);
+}
+
+static int
+lbox_on_rollback(struct lua_State *L)
+{
+	lbox_on_txn_end(rollback);
+}
+
 static int
 lbox_snapshot(struct lua_State *L)
 {
@@ -157,6 +277,8 @@ lbox_backup_stop(struct lua_State *L)
 static const struct luaL_Reg boxlib[] = {
 	{"commit", lbox_commit},
 	{"rollback", lbox_rollback},
+	{"on_commit", lbox_on_commit},
+	{"on_rollback", lbox_on_rollback},
 	{"snapshot", lbox_snapshot},
 	{NULL, NULL}
 };
diff --git a/test/box/misc.result b/test/box/misc.result
index 8934e0c87..62376754e 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -67,6 +67,8 @@ t
   - index
   - info
   - internal
+  - on_commit
+  - on_rollback
   - once
   - priv
   - rollback
diff --git a/test/engine/transaction.result b/test/engine/transaction.result
index cc6345da0..b18b025cc 100644
--- a/test/engine/transaction.result
+++ b/test/engine/transaction.result
@@ -257,3 +257,273 @@ inspector:cmd("setopt delimiter ''");
 space:drop()
 ---
 ...
+--
+-- gh-857: on_commit/rollback triggers in Lua.
+--
+s = box.schema.create_space('test')
+---
+...
+_ = s:create_index('pk')
+---
+...
+call_list = {}
+---
+...
+on_commit = {}
+---
+...
+inspector:cmd("setopt delimiter ';'")
+---
+- true
+...
+for i = 1, 3 do
+    table.insert(on_commit, function()
+        table.insert(call_list, 'on_commit_'..tostring(i))
+    end)
+end;
+---
+...
+function on_rollback()
+    table.insert(call_list, 'on_rollback')
+end;
+---
+...
+inspector:cmd("setopt delimiter ''");
+---
+- true
+...
+-- A basic test: when a transaction is committed, the appropriate
+-- trigger is called, in opposite to its on_rollback vis-a-vis.
+box.begin() s:replace{1} box.on_commit(on_commit[1]) box.on_rollback(on_rollback) box.commit()
+---
+...
+call_list
+---
+- - on_commit_1
+...
+call_list = {}
+---
+...
+-- The same vice versa.
+box.begin() s:replace{2} box.on_commit(on_commit[1]) box.on_rollback(on_rollback) box.rollback()
+---
+...
+call_list
+---
+- - on_rollback
+...
+call_list = {}
+---
+...
+-- Incorrect usage outside of a transaction.
+box.on_commit()
+---
+- error: 'Usage inside a transaction: box.on_commit([function | nil, [function | nil]])'
+...
+-- Incorrect usage inside a transaction.
+box.begin() s:replace{3} box.on_rollback(on_rollback) box.on_commit(100)
+---
+- error: '[string "box.begin() s:replace{3} box.on_rollback(on_r..."]:1: trigger reset:
+    incorrect arguments'
+...
+box.rollback()
+---
+...
+call_list
+---
+- - on_rollback
+...
+call_list = {}
+---
+...
+-- Multiple triggers.
+funcs = {}
+---
+...
+inspector:cmd("setopt delimiter ';'")
+---
+- true
+...
+box.begin()
+table.insert(funcs, box.on_commit())
+box.on_commit(on_commit[1])
+table.insert(funcs, box.on_commit())
+box.on_commit(on_commit[2])
+table.insert(funcs, box.on_commit())
+box.on_commit(on_commit[3])
+box.on_commit(on_commit[3])
+table.insert(funcs, box.on_commit())
+box.on_commit(on_commit[3], on_commit[2])
+table.insert(funcs, box.on_commit())
+box.commit();
+---
+...
+for _, list in pairs(funcs) do
+    for i, f1 in pairs(list) do
+        for j, f2 in pairs(on_commit) do
+            if f1 == f2 then
+                list[i] = 'on_commit_'..tostring(j)
+                break
+            end
+        end
+    end
+end;
+---
+...
+inspector:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Yes, the order is reversed. Like all other Lua triggers.
+call_list
+---
+- - on_commit_3
+  - on_commit_3
+  - on_commit_3
+  - on_commit_1
+...
+funcs
+---
+- - []
+  - - on_commit_1
+  - - on_commit_1
+    - on_commit_2
+  - - on_commit_1
+    - on_commit_2
+    - on_commit_3
+    - on_commit_3
+  - - on_commit_1
+    - on_commit_3
+    - on_commit_3
+    - on_commit_3
+...
+--
+-- Test on_commit/rollback txn iterators.
+--
+iter = nil
+---
+...
+function steal_iterator(iter_) iter = iter_ end
+---
+...
+box.begin() s:replace{1, 1} box.on_commit(steal_iterator) box.commit()
+---
+...
+-- Fail. No transaction.
+iter()
+---
+- error: The transaction the cursor belongs to has ended
+...
+-- Fail. Not the same transaction.
+box.begin() iter()
+---
+- error: The transaction the cursor belongs to has ended
+...
+box.rollback()
+---
+...
+-- Test the same for the iterator generator.
+a = nil
+---
+...
+b = nil
+---
+...
+c = nil
+---
+...
+function steal_iterator(iter) a, b, c = iter() end
+---
+...
+box.begin() s:replace{1, 1} box.on_commit(steal_iterator) box.commit()
+---
+...
+a(b, c)
+---
+- error: The transaction the cursor belongs to has ended
+...
+box.begin() a(b, c)
+---
+- error: The transaction the cursor belongs to has ended
+...
+box.rollback()
+---
+...
+-- Test appropriate usage.
+for i = 1, 5 do s:replace{i} end
+---
+...
+stmts = nil
+---
+...
+inspector:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_commit(iter)
+    stmts = {}
+    for i, old, new, space in iter() do
+        stmts[i] = {old, new, space == s.id or space}
+    end
+end;
+---
+...
+-- Test read and rolled back statements.
+-- Test save point rollbacks.
+box.begin()
+s:replace{1, 1}
+s:replace{2, 2}
+s:delete{3}
+s:replace{6, 6}
+s:replace{0, 0}
+s:get{3}
+s:get{1}
+pcall(s.insert, s, {1})
+box.on_commit(on_commit)
+s:replace{7, 7}
+sv = box.savepoint()
+s:replace{8, 8}
+box.rollback_to_savepoint(sv)
+s:replace{9, 9}
+box.commit();
+---
+...
+inspector:cmd("setopt delimiter ''");
+---
+- true
+...
+stmts
+---
+- - - [1]
+    - [1, 1]
+    - true
+  - - [2]
+    - [2, 2]
+    - true
+  - - [3]
+    - null
+    - true
+  - - null
+    - [6, 6]
+    - true
+  - - null
+    - [0, 0]
+    - true
+  - - null
+    - [7, 7]
+    - true
+  - - null
+    - [9, 9]
+    - true
+...
+-- Check empty transaction iteration.
+box.begin() box.on_commit(on_commit) box.commit()
+---
+...
+stmts
+---
+- []
+...
+s:drop()
+---
+...
diff --git a/test/engine/transaction.test.lua b/test/engine/transaction.test.lua
index b781a785a..51756f933 100644
--- a/test/engine/transaction.test.lua
+++ b/test/engine/transaction.test.lua
@@ -102,3 +102,129 @@ space:get({0})
 box.rollback();
 inspector:cmd("setopt delimiter ''");
 space:drop()
+
+--
+-- gh-857: on_commit/rollback triggers in Lua.
+--
+s = box.schema.create_space('test')
+_ = s:create_index('pk')
+call_list = {}
+on_commit = {}
+inspector:cmd("setopt delimiter ';'")
+for i = 1, 3 do
+    table.insert(on_commit, function()
+        table.insert(call_list, 'on_commit_'..tostring(i))
+    end)
+end;
+function on_rollback()
+    table.insert(call_list, 'on_rollback')
+end;
+inspector:cmd("setopt delimiter ''");
+
+-- A basic test: when a transaction is committed, the appropriate
+-- trigger is called, in opposite to its on_rollback vis-a-vis.
+box.begin() s:replace{1} box.on_commit(on_commit[1]) box.on_rollback(on_rollback) box.commit()
+call_list
+call_list = {}
+
+-- The same vice versa.
+box.begin() s:replace{2} box.on_commit(on_commit[1]) box.on_rollback(on_rollback) box.rollback()
+call_list
+call_list = {}
+
+-- Incorrect usage outside of a transaction.
+box.on_commit()
+
+-- Incorrect usage inside a transaction.
+box.begin() s:replace{3} box.on_rollback(on_rollback) box.on_commit(100)
+box.rollback()
+call_list
+call_list = {}
+
+-- Multiple triggers.
+funcs = {}
+inspector:cmd("setopt delimiter ';'")
+box.begin()
+table.insert(funcs, box.on_commit())
+box.on_commit(on_commit[1])
+table.insert(funcs, box.on_commit())
+box.on_commit(on_commit[2])
+table.insert(funcs, box.on_commit())
+box.on_commit(on_commit[3])
+box.on_commit(on_commit[3])
+table.insert(funcs, box.on_commit())
+box.on_commit(on_commit[3], on_commit[2])
+table.insert(funcs, box.on_commit())
+box.commit();
+
+for _, list in pairs(funcs) do
+    for i, f1 in pairs(list) do
+        for j, f2 in pairs(on_commit) do
+            if f1 == f2 then
+                list[i] = 'on_commit_'..tostring(j)
+                break
+            end
+        end
+    end
+end;
+inspector:cmd("setopt delimiter ''");
+-- Yes, the order is reversed. Like all other Lua triggers.
+call_list
+funcs
+--
+-- Test on_commit/rollback txn iterators.
+--
+iter = nil
+function steal_iterator(iter_) iter = iter_ end
+box.begin() s:replace{1, 1} box.on_commit(steal_iterator) box.commit()
+-- Fail. No transaction.
+iter()
+-- Fail. Not the same transaction.
+box.begin() iter()
+box.rollback()
+-- Test the same for the iterator generator.
+a = nil
+b = nil
+c = nil
+function steal_iterator(iter) a, b, c = iter() end
+box.begin() s:replace{1, 1} box.on_commit(steal_iterator) box.commit()
+a(b, c)
+box.begin() a(b, c)
+box.rollback()
+-- Test appropriate usage.
+for i = 1, 5 do s:replace{i} end
+stmts = nil
+inspector:cmd("setopt delimiter ';'")
+function on_commit(iter)
+    stmts = {}
+    for i, old, new, space in iter() do
+        stmts[i] = {old, new, space == s.id or space}
+    end
+end;
+
+box.begin()
+s:replace{1, 1}
+s:replace{2, 2}
+s:delete{3}
+s:replace{6, 6}
+s:replace{0, 0}
+-- Test read and rolled back statements.
+s:get{3}
+s:get{1}
+pcall(s.insert, s, {1})
+box.on_commit(on_commit)
+s:replace{7, 7}
+-- Test save point rollbacks.
+sv = box.savepoint()
+s:replace{8, 8}
+box.rollback_to_savepoint(sv)
+s:replace{9, 9}
+box.commit();
+inspector:cmd("setopt delimiter ''");
+stmts
+
+-- Check empty transaction iteration.
+box.begin() box.on_commit(on_commit) box.commit()
+stmts
+
+s:drop()
-- 
2.15.2 (Apple Git-101.1)




More information about the Tarantool-patches mailing list