[server 4/4] Introduce BEFORE trigger
Vladimir Davydov
vdavydov.dev at gmail.com
Tue Jan 23 19:28:58 MSK 2018
To register a BEFORE trigger for a space, call the space:before_replace()
function. Similarly to space:on_replace(), this function takes a new
trigger callback as the first argument and, as an optional second
argument, a function to remove from the registered trigger list.
Trigger callbacks are executed from space_execute_dml(), right before
passing down a request to the engine implementation, but after resolving
the space sequence. Just like on_replace, a before_replace callback is
passed old and new tuples, but it can also return a tuple or nil, which
will affect the current statement as follows:
- If a callback function returns the old tuple, the statement is
ignored and IPROTO_NOP is written to xlog to bump LSN.
- If a callback function returns the new tuple or doesn't return
anything, the statement is executed as is.
- If a callback function returns nil, the statement is turned into
DELETE.
- If a callback function returns any other tuple, the statement is
turned into REPLACE for this tuple.
Other return values result in ER_BEFORE_REPLACE_RET error.
Note, the trigger must not change the primary key of the old tuple,
because that would require splitting the resulting statement into two -
DELETE and REPLACE.
The new trigger can be used to resolve asynchronous replication
conflicts as illustrated by replication/before_replace test.
Closes #2993
---
src/box/alter.cc | 1 +
src/box/errcode.h | 2 +-
src/box/lua/sequence.c | 2 +-
src/box/lua/session.c | 8 +-
src/box/lua/space.cc | 56 ++++-
src/box/request.c | 212 +++++++++++++++++++
src/box/request.h | 13 ++
src/box/space.c | 29 ++-
src/box/space.h | 2 +
src/box/tuple.c | 6 +-
src/box/tuple.h | 6 +
src/lua/trigger.c | 18 +-
src/lua/trigger.h | 15 +-
test/box/before_replace.result | 350 +++++++++++++++++++++++++++++++
test/box/before_replace.test.lua | 113 ++++++++++
test/box/misc.result | 57 ++---
test/replication/before_replace.result | 168 +++++++++++++++
test/replication/before_replace.test.lua | 63 ++++++
test/replication/suite.ini | 2 +-
19 files changed, 1072 insertions(+), 51 deletions(-)
create mode 100644 test/box/before_replace.result
create mode 100644 test/box/before_replace.test.lua
create mode 100644 test/replication/before_replace.result
create mode 100644 test/replication/before_replace.test.lua
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 680a051f..fc341d38 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -531,6 +531,7 @@ space_def_new_from_tuple(struct tuple *tuple, uint32_t errcode,
static void
space_swap_triggers(struct space *new_space, struct space *old_space)
{
+ rlist_swap(&new_space->before_replace, &old_space->before_replace);
rlist_swap(&new_space->on_replace, &old_space->on_replace);
rlist_swap(&new_space->on_stmt_begin, &old_space->on_stmt_begin);
}
diff --git a/src/box/errcode.h b/src/box/errcode.h
index 20d11cd1..cdb286ff 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -105,7 +105,7 @@ struct errcode_record {
/* 50 */_(ER_CREATE_FUNCTION, "Failed to create function '%s': %s") \
/* 51 */_(ER_NO_SUCH_FUNCTION, "Function '%s' does not exist") \
/* 52 */_(ER_FUNCTION_EXISTS, "Function '%s' already exists") \
- /* 53 */_(ER_UNUSED3, "") \
+ /* 53 */_(ER_BEFORE_REPLACE_RET, "Invalid return value of space:before_replace trigger: expected tuple or nil") \
/* 54 */_(ER_FUNCTION_MAX, "A limit on the total number of functions has been reached: %u") \
/* 55 */_(ER_UNUSED4, "") \
/* 56 */_(ER_USER_MAX, "A limit on the total number of users has been reached: %u") \
diff --git a/src/box/lua/sequence.c b/src/box/lua/sequence.c
index 9cf35901..2fead2eb 100644
--- a/src/box/lua/sequence.c
+++ b/src/box/lua/sequence.c
@@ -89,7 +89,7 @@ static int
lbox_sequence_on_alter(struct lua_State *L)
{
return lbox_trigger_reset(L, 2, &on_alter_sequence,
- lbox_sequence_push_on_alter_event);
+ lbox_sequence_push_on_alter_event, NULL);
}
void
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index 0bdfb8c7..ad1c6cc2 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -292,7 +292,7 @@ static int
lbox_session_on_connect(struct lua_State *L)
{
return lbox_trigger_reset(L, 2, &session_on_connect,
- lbox_push_on_connect_event);
+ lbox_push_on_connect_event, NULL);
}
static int
@@ -308,7 +308,7 @@ static int
lbox_session_on_disconnect(struct lua_State *L)
{
return lbox_trigger_reset(L, 2, &session_on_disconnect,
- lbox_push_on_connect_event);
+ lbox_push_on_connect_event, NULL);
}
static int
@@ -324,7 +324,7 @@ static int
lbox_session_on_auth(struct lua_State *L)
{
return lbox_trigger_reset(L, 2, &session_on_auth,
- lbox_push_on_auth_event);
+ lbox_push_on_auth_event, NULL);
}
static int
@@ -363,7 +363,7 @@ static int
lbox_session_on_access_denied(struct lua_State *L)
{
return lbox_trigger_reset(L, 2, &on_access_denied,
- lbox_push_on_access_denied_event);
+ lbox_push_on_access_denied_event, NULL);
}
void
diff --git a/src/box/lua/space.cc b/src/box/lua/space.cc
index 3a4fe5b3..e27fe18a 100644
--- a/src/box/lua/space.cc
+++ b/src/box/lua/space.cc
@@ -51,7 +51,7 @@ extern "C" {
* Trigger function for all spaces
*/
static int
-lbox_push_on_replace_event(struct lua_State *L, void *event)
+lbox_push_txn_stmt(struct lua_State *L, void *event)
{
struct txn_stmt *stmt = txn_current_stmt((struct txn *) event);
@@ -70,6 +70,32 @@ lbox_push_on_replace_event(struct lua_State *L, void *event)
return 3;
}
+/**
+ * Parse the value returned by a trigger callback and store it
+ * as the new tuple of the current statement of the transaction
+ * passed in 'event'.
+ *
+ * Returns 0 on success. If the first returned value is neither
+ * a tuple nor nil, sets ER_BEFORE_REPLACE_RET and returns -1.
+ */
+static int
+lbox_pop_txn_stmt(struct lua_State *L, void *event)
+{
+ struct txn_stmt *stmt = txn_current_stmt((struct txn *) event);
+
+ if (lua_gettop(L) < 1) {
+ /* No return value - nothing to do. */
+ return 0;
+ }
+
+ struct tuple *result = luaT_istuple(L, 1);
+ if (result == NULL && !lua_isnil(L, 1)) {
+ /* Invalid return value - set an error and fail. */
+ diag_set(ClientError, ER_BEFORE_REPLACE_RET);
+ return -1;
+ }
+
+ /*
+ * Update the new tuple: nil becomes NULL. The result is
+ * referenced before the previous new tuple is unreferenced,
+ * presumably so that it survives if both are the same tuple.
+ */
+ if (result != NULL)
+ tuple_ref(result);
+ if (stmt->new_tuple != NULL)
+ tuple_unref(stmt->new_tuple);
+ stmt->new_tuple = result;
+ return 0;
+}
+}
+
/**
* Set/Reset/Get space.on_replace trigger
*/
@@ -88,7 +114,28 @@ lbox_space_on_replace(struct lua_State *L)
lua_pop(L, 1);
return lbox_trigger_reset(L, 3, &space->on_replace,
- lbox_push_on_replace_event);
+ lbox_push_txn_stmt, NULL);
+}
+
+/**
+ * Set/Reset/Get space.before_replace trigger.
+ *
+ * Lua usage: space:before_replace(function | nil, [function | nil]).
+ * The first argument on the Lua stack must be the space table;
+ * its "id" field is used to look the space up. The trigger list
+ * itself is managed by lbox_trigger_reset(), with
+ * lbox_pop_txn_stmt() registered to process the value returned
+ * by the trigger callback.
+ */
+static int
+lbox_space_before_replace(struct lua_State *L)
+{
+ int top = lua_gettop(L);
+
+ if (top < 1 || !lua_istable(L, 1)) {
+ /* Not invoked on a space table - report the usage. */
+ luaL_error(L,
+ "usage: space:before_replace(function | nil, [function | nil])");
+ }
+ lua_getfield(L, 1, "id"); /* Get space id. */
+ uint32_t id = lua_tonumber(L, lua_gettop(L));
+ struct space *space = space_cache_find_xc(id);
+ lua_pop(L, 1); /* Drop the id pushed by lua_getfield(). */
+
+ return lbox_trigger_reset(L, 3, &space->before_replace,
+ lbox_push_txn_stmt, lbox_pop_txn_stmt);
}
/**
@@ -136,6 +183,11 @@ lbox_fillspace(struct lua_State *L, struct space *space, int i)
lua_pushcfunction(L, lbox_space_on_replace);
lua_settable(L, i);
+ /* space:before_replace */
+ lua_pushstring(L, "before_replace");
+ lua_pushcfunction(L, lbox_space_before_replace);
+ lua_settable(L, i);
+
lua_getfield(L, i, "index");
if (lua_isnil(L, -1)) {
lua_pop(L, 1);
diff --git a/src/box/request.c b/src/box/request.c
index 9118dcc7..f3eac5dc 100644
--- a/src/box/request.c
+++ b/src/box/request.c
@@ -30,6 +30,7 @@
*/
#include "request.h"
+#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
@@ -42,6 +43,9 @@
#include "sequence.h"
#include "key_def.h"
#include "tuple.h"
+#include "tuple_compare.h"
+#include "tuple_update.h"
+#include "txn.h"
#include "xrow.h"
#include "iproto_constants.h"
@@ -151,3 +155,211 @@ request_handle_sequence(struct request *request, struct space *space)
}
return 0;
}
+
+/**
+ * Given old and new tuples, initialize the corresponding
+ * request to be written to WAL.
+ *
+ * - old_tuple == new_tuple: the request becomes IPROTO_NOP;
+ * - new_tuple == NULL: the request becomes IPROTO_DELETE by the
+ *   key of index 0 extracted from old_tuple;
+ * - otherwise: the request becomes IPROTO_REPLACE carrying a
+ *   copy of the new tuple data allocated on the fiber region.
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+static int
+request_create_from_tuple(struct request *request, struct space *space,
+			  struct tuple *old_tuple, struct tuple *new_tuple)
+{
+	memset(request, 0, sizeof(*request));
+	request->space_id = space->def->id;
+	if (old_tuple == new_tuple) {
+		/*
+		 * Old and new tuples are the same,
+		 * turn this request into no-op.
+		 */
+		request->type = IPROTO_NOP;
+		return 0;
+	}
+	if (new_tuple == NULL) {
+		/*
+		 * old_tuple can't be NULL here, otherwise the
+		 * equality check above would have matched.
+		 */
+		uint32_t size, key_size;
+		const char *data = tuple_data_range(old_tuple, &size);
+		request->key = tuple_extract_key_raw(data, data + size,
+				space->index[0]->def->key_def, &key_size);
+		if (request->key == NULL)
+			return -1;
+		request->key_end = request->key + key_size;
+		request->type = IPROTO_DELETE;
+	} else {
+		uint32_t size;
+		const char *data = tuple_data_range(new_tuple, &size);
+		char *buf = region_alloc(&fiber()->gc, size);
+		if (buf == NULL) {
+			/*
+			 * Don't fail silently: leave a diagnostic
+			 * for the caller before returning an error.
+			 */
+			diag_set(OutOfMemory, size, "region_alloc", "buf");
+			return -1;
+		}
+		memcpy(buf, data, size);
+		request->tuple = buf;
+		request->tuple_end = buf + size;
+		request->type = IPROTO_REPLACE;
+	}
+	return 0;
+}
+
+/**
+ * Run the BEFORE triggers of a space for a request and, if a
+ * trigger substituted the resulting tuple, rewrite the request
+ * to match.
+ *
+ * The function looks up the old tuple, computes the tuple the
+ * request would produce, exposes both to the triggers via the
+ * current statement of txn, and then inspects the new tuple the
+ * triggers left there.
+ *
+ * @param request - request to run triggers for; may be rewritten
+ * @param space - space the request is for
+ * @param txn - transaction whose current statement carries the
+ *              old and new tuples while the triggers run
+ *
+ * @retval 0 on success, including the cases when there is
+ *           nothing to do
+ * @retval -1 on error
+ */
+int
+request_before_replace(struct request *request, struct space *space,
+		       struct txn *txn)
+{
+	if (space->index_count == 0) {
+		/* Empty space, nothing to do. */
+		return 0;
+	}
+
+	struct region *gc = &fiber()->gc;
+	enum iproto_type type = request->type;
+	struct index *pk = space->index[0];
+
+	const char *key;
+	uint32_t part_count;
+	struct index *index;
+
+	/*
+	 * Lookup the old tuple.
+	 */
+	if (type == IPROTO_UPDATE || type == IPROTO_DELETE) {
+		/* The key comes with the request. */
+		index = index_find_unique(space, request->index_id);
+		if (index == NULL)
+			return -1;
+		key = request->key;
+		part_count = mp_decode_array(&key);
+		if (exact_key_validate(index->def->key_def,
+				       key, part_count) != 0)
+			return -1;
+	} else if (type == IPROTO_INSERT || type == IPROTO_REPLACE ||
+		   type == IPROTO_UPSERT) {
+		/* The key has to be extracted from the tuple. */
+		index = pk;
+		key = tuple_extract_key_raw(request->tuple, request->tuple_end,
+					    index->def->key_def, NULL);
+		if (key == NULL)
+			return -1;
+		part_count = mp_decode_array(&key);
+	} else {
+		/* Unknown request type, nothing to do. */
+		return 0;
+	}
+
+	/*
+	 * NOTE(review): old_tuple is not referenced here while the
+	 * triggers run - confirm that the tuple returned by
+	 * index_get() stays valid across trigger callbacks.
+	 */
+	struct tuple *old_tuple;
+	if (index_get(index, key, part_count, &old_tuple) != 0)
+		return -1;
+
+	/*
+	 * Create the new tuple.
+	 */
+	uint32_t new_size, old_size;
+	const char *new_data, *new_data_end;
+	const char *old_data, *old_data_end;
+
+	switch (request->type) {
+	case IPROTO_INSERT:
+	case IPROTO_REPLACE:
+		new_data = request->tuple;
+		new_data_end = request->tuple_end;
+		break;
+	case IPROTO_UPDATE:
+		if (old_tuple == NULL) {
+			/* Nothing to update. */
+			return 0;
+		}
+		old_data = tuple_data_range(old_tuple, &old_size);
+		old_data_end = old_data + old_size;
+		new_data = tuple_update_execute(region_aligned_alloc_cb, gc,
+					request->tuple, request->tuple_end,
+					old_data, old_data_end, &new_size,
+					request->index_base, NULL);
+		if (new_data == NULL)
+			return -1;
+		new_data_end = new_data + new_size;
+		break;
+	case IPROTO_DELETE:
+		if (old_tuple == NULL) {
+			/* Nothing to delete. */
+			return 0;
+		}
+		new_data = new_data_end = NULL;
+		break;
+	case IPROTO_UPSERT:
+		if (old_tuple == NULL) {
+			/*
+			 * Turn UPSERT into INSERT, but still check
+			 * provided operations.
+			 */
+			new_data = request->tuple;
+			new_data_end = request->tuple_end;
+			if (tuple_update_check_ops(region_aligned_alloc_cb, gc,
+					request->ops, request->ops_end,
+					request->index_base) != 0)
+				return -1;
+			break;
+		}
+		old_data = tuple_data_range(old_tuple, &old_size);
+		old_data_end = old_data + old_size;
+		new_data = tuple_upsert_execute(region_aligned_alloc_cb, gc,
+					request->ops, request->ops_end,
+					old_data, old_data_end, &new_size,
+					request->index_base, false, NULL);
+		/*
+		 * Fail on error, like the UPDATE branch does. Without
+		 * this check a NULL result (presumably returned on
+		 * failure, as by tuple_update_execute()) would be
+		 * mistaken below for "no new tuple", i.e. a DELETE.
+		 */
+		if (new_data == NULL)
+			return -1;
+		new_data_end = new_data + new_size;
+		break;
+	default:
+		unreachable();
+	}
+
+	/* NULL new_data means the statement deletes the old tuple. */
+	struct tuple *new_tuple = NULL;
+	if (new_data != NULL) {
+		new_tuple = tuple_new(tuple_format_runtime,
+				      new_data, new_data_end);
+		if (new_tuple == NULL)
+			return -1;
+		tuple_ref(new_tuple);
+	}
+
+	assert(old_tuple != NULL || new_tuple != NULL);
+
+	/*
+	 * Execute all registered BEFORE triggers.
+	 */
+	struct txn_stmt *stmt = txn_current_stmt(txn);
+	assert(stmt->old_tuple == NULL && stmt->new_tuple == NULL);
+	stmt->old_tuple = old_tuple;
+	stmt->new_tuple = new_tuple;
+
+	int rc = trigger_run(&space->before_replace, txn);
+
+	/*
+	 * The trigger can't change the old tuple,
+	 * but it may replace the new tuple.
+	 */
+	bool request_changed = (stmt->new_tuple != new_tuple);
+	new_tuple = stmt->new_tuple;
+	assert(stmt->old_tuple == old_tuple);
+	/* Leave the statement as it was found: with no tuples. */
+	stmt->old_tuple = NULL;
+	stmt->new_tuple = NULL;
+
+	if (rc != 0)
+		goto out;
+
+	/*
+	 * We don't allow to change the value of the primary key
+	 * in the same statement.
+	 */
+	if (request_changed && old_tuple != NULL && new_tuple != NULL &&
+	    tuple_compare(old_tuple, new_tuple, pk->def->key_def) != 0) {
+		diag_set(ClientError, ER_CANT_UPDATE_PRIMARY_KEY,
+			 pk->def->name, space->def->name);
+		rc = -1;
+		goto out;
+	}
+
+	/*
+	 * The trigger changed the resulting tuple.
+	 * Fix the request to conform.
+	 */
+	if (request_changed) {
+		rc = request_create_from_tuple(request, space,
+					       old_tuple, new_tuple);
+	}
+out:
+	/* Drop the reference held on whichever new tuple is left. */
+	if (new_tuple != NULL)
+		tuple_unref(new_tuple);
+	return rc;
+}
diff --git a/src/box/request.h b/src/box/request.h
index ff1d97e0..aca56ea7 100644
--- a/src/box/request.h
+++ b/src/box/request.h
@@ -37,6 +37,7 @@ extern "C" {
struct request;
struct space;
+struct txn;
struct tuple;
/**
@@ -62,6 +63,18 @@ request_rebind_to_primary_key(struct request *request, struct space *space,
int
request_handle_sequence(struct request *request, struct space *space);
+/**
+ * Run BEFORE triggers registered for a space. If a trigger
+ * changes the current statement, this function updates the
+ * request accordingly.
+ *
+ * @param request - request to fix
+ * @param space - space corresponding to request
+ * @param txn - transaction whose current statement is exposed
+ *              to the triggers
+ *
+ * @retval 0 on success
+ * @retval -1 on error
+ */
+int
+request_before_replace(struct request *request, struct space *space,
+ struct txn *txn);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/space.c b/src/box/space.c
index d3692094..b59d0984 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -120,6 +120,7 @@ space_create(struct space *space, struct engine *engine,
space->engine = engine;
space->index_count = index_count;
space->index_id_max = index_id_max;
+ rlist_create(&space->before_replace);
rlist_create(&space->on_replace);
rlist_create(&space->on_stmt_begin);
space->run_triggers = true;
@@ -185,6 +186,7 @@ space_delete(struct space *space)
free(space->index_map);
if (space->format != NULL)
tuple_format_unref(space->format);
+ trigger_destroy(&space->before_replace);
trigger_destroy(&space->on_replace);
trigger_destroy(&space->on_stmt_begin);
space_def_delete(space->def);
@@ -312,12 +314,33 @@ int
space_execute_dml(struct space *space, struct txn *txn,
struct request *request, struct tuple **result)
{
+ if (unlikely(space->sequence != NULL) &&
+ (request->type == IPROTO_INSERT ||
+ request->type == IPROTO_REPLACE)) {
+ /*
+ * The space has a sequence associated with it.
+ * If the tuple has 'nil' for the primary key,
+ * we should replace it with the next sequence
+ * value.
+ */
+ if (request_handle_sequence(request, space) != 0)
+ return -1;
+ }
+
+ if (unlikely(!rlist_empty(&space->before_replace) &&
+ space->run_triggers)) {
+ /*
+ * Call BEFORE triggers if any before dispatching
+ * the request. Note, it may change the request
+ * type and arguments.
+ */
+ if (request_before_replace(request, space, txn) != 0)
+ return -1;
+ }
+
switch (request->type) {
case IPROTO_INSERT:
case IPROTO_REPLACE:
- if (space->sequence != NULL &&
- request_handle_sequence(request, space) != 0)
- return -1;
if (space->vtab->execute_replace(space, txn,
request, result) != 0)
return -1;
diff --git a/src/box/space.h b/src/box/space.h
index 69dea2a2..8918dbd8 100644
--- a/src/box/space.h
+++ b/src/box/space.h
@@ -143,6 +143,8 @@ struct space {
struct access access[BOX_USER_MAX];
/** Engine used by this space. */
struct engine *engine;
+ /** Triggers fired before executing a request. */
+ struct rlist before_replace;
/** Triggers fired after space_replace() -- see txn_commit_stmt(). */
struct rlist on_replace;
/** Triggers fired before space statement */
diff --git a/src/box/tuple.c b/src/box/tuple.c
index 54325af4..b41f0039 100644
--- a/src/box/tuple.c
+++ b/src/box/tuple.c
@@ -56,11 +56,7 @@ static const double ALLOC_FACTOR = 1.05;
*/
struct tuple *box_tuple_last;
-/**
- * A format for standalone tuples allocated on runtime arena.
- * \sa tuple_new().
- */
-static struct tuple_format *tuple_format_runtime;
+struct tuple_format *tuple_format_runtime;
static void
runtime_tuple_delete(struct tuple_format *format, struct tuple *tuple);
diff --git a/src/box/tuple.h b/src/box/tuple.h
index e51591ff..27c47505 100644
--- a/src/box/tuple.h
+++ b/src/box/tuple.h
@@ -44,6 +44,12 @@ extern "C" {
struct slab_arena;
struct quota;
+/**
+ * A format for standalone tuples allocated on runtime arena.
+ * \sa tuple_new().
+ */
+extern struct tuple_format *tuple_format_runtime;
+
/** Initialize tuple library */
int
tuple_init(field_name_hash_f hash);
diff --git a/src/lua/trigger.c b/src/lua/trigger.c
index 49e8d26b..c758e47e 100644
--- a/src/lua/trigger.c
+++ b/src/lua/trigger.c
@@ -44,6 +44,12 @@ struct lbox_trigger
* Lua trigger.
*/
lbox_push_event_f push_event;
+ /**
+ * A pointer to a C function which is called
+ * upon successful execution of the trigger
+ * callback.
+ */
+ lbox_pop_event_f pop_event;
};
static void
@@ -78,7 +84,12 @@ lbox_trigger_run(struct trigger *ptr, void *event)
int coro_ref = luaL_ref(tarantool_L, LUA_REGISTRYINDEX);
lua_rawgeti(L, LUA_REGISTRYINDEX, trigger->ref);
int top = trigger->push_event(L, event);
- if (luaT_call(L, top, 0)) {
+ if (luaT_call(L, top, LUA_MULTRET)) {
+ luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref);
+ diag_raise();
+ }
+ if (trigger->pop_event != NULL &&
+ trigger->pop_event(L, event) != 0) {
luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref);
diag_raise();
}
@@ -142,8 +153,8 @@ lbox_trigger_check_input(struct lua_State *L, int top)
}
int
-lbox_trigger_reset(struct lua_State *L, int top,
- struct rlist *list, lbox_push_event_f push_event)
+lbox_trigger_reset(struct lua_State *L, int top, struct rlist *list,
+ lbox_push_event_f push_event, lbox_pop_event_f pop_event)
{
/**
* If the stack is empty, pushes nils for optional
@@ -176,6 +187,7 @@ lbox_trigger_reset(struct lua_State *L, int top,
trg->base.destroy = lbox_trigger_destroy;
trg->ref = LUA_NOREF;
trg->push_event = push_event;
+ trg->pop_event = pop_event;
trigger_add(list, &trg->base);
}
/*
diff --git a/src/lua/trigger.h b/src/lua/trigger.h
index ef44e917..8901f5de 100644
--- a/src/lua/trigger.h
+++ b/src/lua/trigger.h
@@ -41,11 +41,20 @@ struct lua_State;
* The job of lbox_push_event_f is to push trigger arguments
* to Lua stack.
*/
-
typedef int
(*lbox_push_event_f)(struct lua_State *L, void *event);
/**
+ * If not NULL, lbox_pop_event_f will be called after successful
+ * execution of the trigger callback. It can be used to parse the
+ * return value of the trigger callback and update the 'event'
+ * accordingly. If this function returns a non-zero value, an
+ * error will be raised for the caller.
+ */
+typedef int
+(*lbox_pop_event_f)(struct lua_State *L, void *event);
+
+/**
* Create a Lua trigger, replace an existing one,
* or delete a trigger.
*
@@ -76,8 +85,8 @@ typedef int
* optional).
*/
int
-lbox_trigger_reset(struct lua_State *L, int top,
- struct rlist *list, lbox_push_event_f push_f);
+lbox_trigger_reset(struct lua_State *L, int top, struct rlist *list,
+ lbox_push_event_f push_f, lbox_pop_event_f pop_f);
#if defined(__cplusplus)
} /* extern "C" */
diff --git a/test/box/before_replace.result b/test/box/before_replace.result
new file mode 100644
index 00000000..a46e3b8b
--- /dev/null
+++ b/test/box/before_replace.result
@@ -0,0 +1,350 @@
+s = box.schema.space.create('test')
+---
+...
+_ = s:create_index('primary')
+---
+...
+_ = s:create_index('secondary', {unique = false, parts = {2, 'unsigned'}})
+---
+...
+function fail(old, new) error('fail') end
+---
+...
+function save(old, new) old_tuple = old new_tuple = new end
+---
+...
+function nop(old, new) return new end
+---
+...
+function ignore(old, new) return old end
+---
+...
+function delete(old, new) return nil end
+---
+...
+function update(old, new) return box.tuple.update(new, {{'+', 3, 1}}) end
+---
+...
+function bad_ret(old, new) return 'test' end
+---
+...
+function bad_update(old, new) return box.tuple.update(new, {{'+', 1, 1}}) end
+---
+...
+-- Exception in trigger.
+type(s:before_replace(fail))
+---
+- function
+...
+s:insert{1, 1}
+---
+- error: '[string "function fail(old, new) error(''fail'') end "]:1: fail'
+...
+s:select()
+---
+- []
+...
+s:before_replace(nil, fail)
+---
+...
+-- Check 'old' and 'new' trigger arguments.
+old_tuple = nil
+---
+...
+new_tuple = nil
+---
+...
+type(s:before_replace(save))
+---
+- function
+...
+s:insert{1, 1}
+---
+- [1, 1]
+...
+old_tuple, new_tuple
+---
+- null
+- [1, 1]
+...
+s:replace{1, 2}
+---
+- [1, 2]
+...
+old_tuple, new_tuple
+---
+- [1, 1]
+- [1, 2]
+...
+s:update(1, {{'+', 2, 1}})
+---
+- [1, 3]
+...
+old_tuple, new_tuple
+---
+- [1, 2]
+- [1, 3]
+...
+s:upsert({1, 1}, {{'=', 2, 1}})
+---
+...
+old_tuple, new_tuple
+---
+- [1, 3]
+- [1, 1]
+...
+s:upsert({2, 2}, {{'=', 2, 2}})
+---
+...
+old_tuple, new_tuple
+---
+- null
+- [2, 2]
+...
+s:select()
+---
+- - [1, 1]
+ - [2, 2]
+...
+s:delete(1)
+---
+- [1, 1]
+...
+old_tuple, new_tuple
+---
+- [1, 1]
+- null
+...
+s:delete(2)
+---
+- [2, 2]
+...
+old_tuple, new_tuple
+---
+- [2, 2]
+- null
+...
+s:select()
+---
+- []
+...
+s:before_replace(nil, save)
+---
+...
+-- Returning 'new' from trigger doesn't affect statement.
+type(s:before_replace(nop))
+---
+- function
+...
+s:insert{1, 1}
+---
+- [1, 1]
+...
+s:update(1, {{'+', 2, 1}})
+---
+- [1, 2]
+...
+s:select()
+---
+- - [1, 2]
+...
+s:delete(1)
+---
+- [1, 2]
+...
+s:select()
+---
+- []
+...
+s:before_replace(nil, nop)
+---
+...
+-- Returning 'old' from trigger skips statement.
+s:insert{1, 1}
+---
+- [1, 1]
+...
+type(s:before_replace(ignore))
+---
+- function
+...
+s:insert{2, 2}
+---
+...
+s:update(1, {{'+', 2, 1}})
+---
+...
+s:delete(1)
+---
+...
+s:select()
+---
+- - [1, 1]
+...
+s:before_replace(nil, ignore)
+---
+...
+s:delete(1)
+---
+- [1, 1]
+...
+-- Returning nil from trigger turns statement into DELETE.
+s:insert{1, 1}
+---
+- [1, 1]
+...
+type(s:before_replace(delete))
+---
+- function
+...
+s:replace{1, 2}
+---
+- [1, 1]
+...
+s:select()
+---
+- []
+...
+s:before_replace(nil, delete)
+---
+...
+-- Update statement from trigger.
+type(s:before_replace(update))
+---
+- function
+...
+s:insert{1, 1, 1}
+---
+- [1, 1, 2]
+...
+s:update(1, {{'+', 2, 1}})
+---
+- [1, 2, 3]
+...
+s:select()
+---
+- - [1, 2, 3]
+...
+s:before_replace(nil, update)
+---
+...
+s:delete(1)
+---
+- [1, 2, 3]
+...
+-- Invalid return value.
+type(s:before_replace(bad_ret))
+---
+- function
+...
+s:insert{1, 1} -- error
+---
+- error: 'Invalid return value of space:before_replace trigger: expected tuple or
+ nil'
+...
+s:select()
+---
+- []
+...
+s:before_replace(nil, bad_ret)
+---
+...
+-- Update of the primary key from trigger is forbidden.
+s:insert{1, 1}
+---
+- [1, 1]
+...
+type(s:before_replace(bad_update))
+---
+- function
+...
+s:replace{1, 2}
+---
+- error: Attempt to modify a tuple field which is part of index 'primary' in space
+ 'test'
+...
+s:before_replace(nil, bad_update)
+---
+...
+s:delete(1)
+---
+- [1, 1]
+...
+-- Stacking triggers.
+old_tuple = nil
+---
+...
+new_tuple = nil
+---
+...
+type(s:before_replace(save))
+---
+- function
+...
+type(s:before_replace(update))
+---
+- function
+...
+s:insert{1, 1, 1}
+---
+- [1, 1, 2]
+...
+old_tuple, new_tuple
+---
+- null
+- [1, 1, 2]
+...
+s:before_replace(nil, save)
+---
+...
+s:before_replace(nil, update)
+---
+...
+s:delete{1}
+---
+- [1, 1, 2]
+...
+-- Issue DML from trigger.
+s2 = box.schema.space.create('test2')
+---
+...
+_ = s2:create_index('pk')
+---
+...
+cb = function(old, new) s2:auto_increment{old, new} end
+---
+...
+type(s:before_replace(cb))
+---
+- function
+...
+s:insert{1, 1}
+---
+- [1, 1]
+...
+s:replace{1, 2}
+---
+- [1, 2]
+...
+s:select()
+---
+- - [1, 2]
+...
+s2:select()
+---
+- - [1, null, [1, 1]]
+ - [2, [1, 1], [1, 2]]
+...
+s:before_replace(nil, cb)
+---
+...
+s:delete{1}
+---
+- [1, 2]
+...
+s2:drop()
+---
+...
+s:drop()
+---
+...
diff --git a/test/box/before_replace.test.lua b/test/box/before_replace.test.lua
new file mode 100644
index 00000000..afcabf51
--- /dev/null
+++ b/test/box/before_replace.test.lua
@@ -0,0 +1,113 @@
+s = box.schema.space.create('test')
+_ = s:create_index('primary')
+_ = s:create_index('secondary', {unique = false, parts = {2, 'unsigned'}})
+
+function fail(old, new) error('fail') end
+function save(old, new) old_tuple = old new_tuple = new end
+function nop(old, new) return new end
+function ignore(old, new) return old end
+function delete(old, new) return nil end
+function update(old, new) return box.tuple.update(new, {{'+', 3, 1}}) end
+function bad_ret(old, new) return 'test' end
+function bad_update(old, new) return box.tuple.update(new, {{'+', 1, 1}}) end
+
+-- Exception in trigger.
+type(s:before_replace(fail))
+s:insert{1, 1}
+s:select()
+s:before_replace(nil, fail)
+
+-- Check 'old' and 'new' trigger arguments.
+old_tuple = nil
+new_tuple = nil
+type(s:before_replace(save))
+s:insert{1, 1}
+old_tuple, new_tuple
+s:replace{1, 2}
+old_tuple, new_tuple
+s:update(1, {{'+', 2, 1}})
+old_tuple, new_tuple
+s:upsert({1, 1}, {{'=', 2, 1}})
+old_tuple, new_tuple
+s:upsert({2, 2}, {{'=', 2, 2}})
+old_tuple, new_tuple
+s:select()
+s:delete(1)
+old_tuple, new_tuple
+s:delete(2)
+old_tuple, new_tuple
+s:select()
+s:before_replace(nil, save)
+
+-- Returning 'new' from trigger doesn't affect statement.
+type(s:before_replace(nop))
+s:insert{1, 1}
+s:update(1, {{'+', 2, 1}})
+s:select()
+s:delete(1)
+s:select()
+s:before_replace(nil, nop)
+
+-- Returning 'old' from trigger skips statement.
+s:insert{1, 1}
+type(s:before_replace(ignore))
+s:insert{2, 2}
+s:update(1, {{'+', 2, 1}})
+s:delete(1)
+s:select()
+s:before_replace(nil, ignore)
+s:delete(1)
+
+-- Returning nil from trigger turns statement into DELETE.
+s:insert{1, 1}
+type(s:before_replace(delete))
+s:replace{1, 2}
+s:select()
+s:before_replace(nil, delete)
+
+-- Update statement from trigger.
+type(s:before_replace(update))
+s:insert{1, 1, 1}
+s:update(1, {{'+', 2, 1}})
+s:select()
+s:before_replace(nil, update)
+s:delete(1)
+
+-- Invalid return value.
+type(s:before_replace(bad_ret))
+s:insert{1, 1} -- error
+s:select()
+s:before_replace(nil, bad_ret)
+
+-- Update of the primary key from trigger is forbidden.
+s:insert{1, 1}
+type(s:before_replace(bad_update))
+s:replace{1, 2}
+s:before_replace(nil, bad_update)
+s:delete(1)
+
+-- Stacking triggers.
+old_tuple = nil
+new_tuple = nil
+type(s:before_replace(save))
+type(s:before_replace(update))
+s:insert{1, 1, 1}
+old_tuple, new_tuple
+s:before_replace(nil, save)
+s:before_replace(nil, update)
+s:delete{1}
+
+-- Issue DML from trigger.
+s2 = box.schema.space.create('test2')
+_ = s2:create_index('pk')
+cb = function(old, new) s2:auto_increment{old, new} end
+type(s:before_replace(cb))
+s:insert{1, 1}
+s:replace{1, 2}
+s:select()
+s2:select()
+s:before_replace(nil, cb)
+s:delete{1}
+s2:drop()
+
+s:drop()
diff --git a/test/box/misc.result b/test/box/misc.result
index cd2f76f5..a2b77eb5 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -332,8 +332,8 @@ t;
- 'box.error.ACCESS_DENIED : 42'
- 'box.error.CANT_CREATE_COLLATION : 150'
- 'box.error.USER_EXISTS : 46'
- - 'box.error.TUPLE_FORMAT_LIMIT : 16'
- - 'box.error.RTREE_RECT : 101'
+ - 'box.error.WAL_IO : 40'
+ - 'box.error.PROC_RET : 21'
- 'box.error.PRIV_GRANTED : 89'
- 'box.error.CREATE_SPACE : 9'
- 'box.error.GRANT : 88'
@@ -344,7 +344,7 @@ t;
- 'box.error.VINYL_MAX_TUPLE_SIZE : 139'
- 'box.error.LOAD_FUNCTION : 99'
- 'box.error.INVALID_XLOG : 74'
- - 'box.error.READ_VIEW_ABORTED : 130'
+ - 'box.error.PRIV_NOT_GRANTED : 91'
- 'box.error.TRANSACTION_CONFLICT : 97'
- 'box.error.GUEST_USER_PASSWORD : 96'
- 'box.error.PROC_C : 102'
@@ -355,7 +355,7 @@ t;
- 'box.error.CFG : 59'
- 'box.error.NO_SUCH_FIELD : 37'
- 'box.error.CONNECTION_TO_SELF : 117'
- - 'box.error.PROC_LUA : 32'
+ - 'box.error.FUNCTION_MAX : 54'
- 'box.error.ILLEGAL_PARAMS : 1'
- 'box.error.PARTIAL_KEY : 136'
- 'box.error.SAVEPOINT_NO_TRANSACTION : 114'
@@ -382,81 +382,82 @@ t;
- 'box.error.FUNCTION_EXISTS : 52'
- 'box.error.UPDATE_ARG_TYPE : 26'
- 'box.error.CROSS_ENGINE_TRANSACTION : 81'
- - 'box.error.IDENTIFIER : 70'
+ - 'box.error.FORMAT_MISMATCH_INDEX_PART : 27'
- 'box.error.FUNCTION_TX_ACTIVE : 30'
- 'box.error.NO_SUCH_ENGINE : 57'
- 'box.error.COMMIT_IN_SUB_STMT : 122'
- 'box.error.injection : table: <address>
- - 'box.error.LAST_DROP : 15'
- 'box.error.NULLABLE_MISMATCH : 153'
+ - 'box.error.LAST_DROP : 15'
+ - 'box.error.NO_SUCH_ROLE : 82'
- 'box.error.DECOMPRESSION : 124'
- 'box.error.CREATE_SEQUENCE : 142'
- 'box.error.CREATE_USER : 43'
- - 'box.error.RELOAD_CFG : 58'
- - 'box.error.INSTANCE_UUID_MISMATCH : 66'
- 'box.error.SPACE_FIELD_IS_DUPLICATE : 149'
+ - 'box.error.INSTANCE_UUID_MISMATCH : 66'
+ - 'box.error.SEQUENCE_OVERFLOW : 147'
- 'box.error.SYSTEM : 115'
- 'box.error.KEY_PART_IS_TOO_LONG : 118'
- - 'box.error.SEQUENCE_OVERFLOW : 147'
- - 'box.error.FUNCTION_MAX : 54'
+ - 'box.error.TUPLE_FORMAT_LIMIT : 16'
+ - 'box.error.BEFORE_REPLACE_RET : 53'
- 'box.error.NO_SUCH_SAVEPOINT : 61'
- - 'box.error.INVALID_MSGPACK : 20'
- 'box.error.TRUNCATE_SYSTEM_SPACE : 137'
+ - 'box.error.VY_QUOTA_TIMEOUT : 135'
- 'box.error.WRONG_INDEX_OPTIONS : 108'
- 'box.error.INVALID_VYLOG_FILE : 133'
- 'box.error.INDEX_FIELD_COUNT_LIMIT : 127'
- - 'box.error.VY_QUOTA_TIMEOUT : 135'
+ - 'box.error.READ_VIEW_ABORTED : 130'
- 'box.error.USER_MAX : 56'
- - 'box.error.PRIV_NOT_GRANTED : 91'
+ - 'box.error.PROTOCOL : 104'
- 'box.error.TUPLE_NOT_ARRAY : 22'
- 'box.error.KEY_PART_COUNT : 31'
- 'box.error.ALTER_SPACE : 12'
- 'box.error.ACTIVE_TRANSACTION : 79'
- 'box.error.EXACT_FIELD_COUNT : 38'
- 'box.error.DROP_SEQUENCE : 144'
- - 'box.error.ITERATOR_TYPE : 72'
- - 'box.error.PROC_RET : 21'
- - 'box.error.UPSERT_UNIQUE_SECONDARY_KEY : 105'
+ - 'box.error.INVALID_MSGPACK : 20'
+ - 'box.error.MORE_THAN_ONE_TUPLE : 41'
+ - 'box.error.RTREE_RECT : 101'
- 'box.error.SUB_STMT_MAX : 121'
- 'box.error.UNKNOWN_REQUEST_TYPE : 48'
- 'box.error.SPACE_EXISTS : 10'
- - 'box.error.FORMAT_MISMATCH_INDEX_PART : 27'
+ - 'box.error.PROC_LUA : 32'
- 'box.error.ROLE_NOT_GRANTED : 92'
- 'box.error.NO_SUCH_SPACE : 36'
- 'box.error.WRONG_INDEX_PARTS : 107'
- - 'box.error.UPDATE_INTEGER_OVERFLOW : 95'
+ - 'box.error.DROP_SPACE : 11'
- 'box.error.MIN_FIELD_COUNT : 39'
- 'box.error.REPLICASET_UUID_MISMATCH : 63'
- 'box.error.UPDATE_FIELD : 29'
- 'box.error.COMPRESSION : 119'
- 'box.error.INVALID_ORDER : 68'
- 'box.error.INDEX_EXISTS : 85'
+ - 'box.error.SPLICE : 25'
- 'box.error.UNKNOWN : 0'
- - 'box.error.MORE_THAN_ONE_TUPLE : 41'
- 'box.error.DROP_PRIMARY_KEY : 17'
- 'box.error.NULLABLE_PRIMARY : 152'
- 'box.error.NO_SUCH_SEQUENCE : 145'
- - 'box.error.DROP_SPACE : 11'
+ - 'box.error.RELOAD_CFG : 58'
- 'box.error.INVALID_UUID : 64'
- 'box.error.INJECTION : 8'
- 'box.error.TIMEOUT : 78'
+ - 'box.error.IDENTIFIER : 70'
+ - 'box.error.ITERATOR_TYPE : 72'
- 'box.error.REPLICA_MAX : 73'
- - 'box.error.SPLICE : 25'
- - 'box.error.UNSUPPORTED : 5'
- 'box.error.MISSING_REQUEST_FIELD : 69'
- 'box.error.MISSING_SNAPSHOT : 93'
- 'box.error.WRONG_SPACE_OPTIONS : 111'
- 'box.error.READONLY : 7'
- - 'box.error.WAL_IO : 40'
- - 'box.error.NO_SUCH_ROLE : 82'
+ - 'box.error.UNSUPPORTED : 5'
+ - 'box.error.UPDATE_INTEGER_OVERFLOW : 95'
- 'box.error.NO_CONNECTION : 77'
- 'box.error.INVALID_XLOG_ORDER : 76'
- - 'box.error.WRONG_SCHEMA_VERSION : 109'
+ - 'box.error.UPSERT_UNIQUE_SECONDARY_KEY : 105'
- 'box.error.ROLLBACK_IN_SUB_STMT : 123'
- - 'box.error.PROTOCOL : 104'
- - 'box.error.INVALID_XLOG_TYPE : 125'
- - 'box.error.INDEX_PART_TYPE_MISMATCH : 24'
+ - 'box.error.WRONG_SCHEMA_VERSION : 109'
- 'box.error.UNSUPPORTED_INDEX_FEATURE : 112'
+ - 'box.error.INDEX_PART_TYPE_MISMATCH : 24'
+ - 'box.error.INVALID_XLOG_TYPE : 125'
...
test_run:cmd("setopt delimiter ''");
---
diff --git a/test/replication/before_replace.result b/test/replication/before_replace.result
new file mode 100644
index 00000000..6865775f
--- /dev/null
+++ b/test/replication/before_replace.result
@@ -0,0 +1,168 @@
+--
+-- Using space:before_replace to resolve replication conflicts.
+--
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+SERVERS = { 'autobootstrap1', 'autobootstrap2', 'autobootstrap3' }
+---
+...
+-- Deploy a cluster.
+test_run:create_cluster(SERVERS)
+---
+...
+test_run:wait_fullmesh(SERVERS)
+---
+...
+-- Setup space:before_replace trigger on all replicas.
+-- The trigger favors tuples with a greater value.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+test_run:cmd("switch autobootstrap1");
+---
+- true
+...
+_ = box.space.test:before_replace(function(old, new)
+ if old ~= nil and new ~= nil then
+ return new[2] > old[2] and new or old
+ end
+end);
+---
+...
+test_run:cmd("switch autobootstrap2");
+---
+- true
+...
+_ = box.space.test:before_replace(function(old, new)
+ if old ~= nil and new ~= nil then
+ return new[2] > old[2] and new or old
+ end
+end);
+---
+...
+test_run:cmd("switch autobootstrap3");
+---
+- true
+...
+_ = box.space.test:before_replace(function(old, new)
+ if old ~= nil and new ~= nil then
+ return new[2] > old[2] and new or old
+ end
+end);
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Stall replication and generate incompatible data
+-- on the replicas.
+test_run:cmd("switch autobootstrap1")
+---
+- true
+...
+box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.01)
+---
+- ok
+...
+for i = 1, 10 do box.space.test:replace{i, i % 3 == 1 and i * 10 or i} end
+---
+...
+test_run:cmd("switch autobootstrap2")
+---
+- true
+...
+box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.01)
+---
+- ok
+...
+for i = 1, 10 do box.space.test:replace{i, i % 3 == 2 and i * 10 or i} end
+---
+...
+test_run:cmd("switch autobootstrap3")
+---
+- true
+...
+box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.01)
+---
+- ok
+...
+for i = 1, 10 do box.space.test:replace{i, i % 3 == 0 and i * 10 or i} end
+---
+...
+-- Synchronize.
+test_run:cmd("switch default")
+---
+- true
+...
+vclock = test_run:get_cluster_vclock(SERVERS)
+---
+...
+vclock2 = test_run:wait_cluster_vclock(SERVERS, vclock)
+---
+...
+-- Check that all replicas converged to the same data.
+test_run:cmd("switch autobootstrap1")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 10]
+ - [2, 20]
+ - [3, 30]
+ - [4, 40]
+ - [5, 50]
+ - [6, 60]
+ - [7, 70]
+ - [8, 80]
+ - [9, 90]
+ - [10, 100]
+...
+test_run:cmd("switch autobootstrap2")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 10]
+ - [2, 20]
+ - [3, 30]
+ - [4, 40]
+ - [5, 50]
+ - [6, 60]
+ - [7, 70]
+ - [8, 80]
+ - [9, 90]
+ - [10, 100]
+...
+test_run:cmd("switch autobootstrap3")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 10]
+ - [2, 20]
+ - [3, 30]
+ - [4, 40]
+ - [5, 50]
+ - [6, 60]
+ - [7, 70]
+ - [8, 80]
+ - [9, 90]
+ - [10, 100]
+...
+-- Cleanup.
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:drop_cluster(SERVERS)
+---
+...
diff --git a/test/replication/before_replace.test.lua b/test/replication/before_replace.test.lua
new file mode 100644
index 00000000..0e5832b5
--- /dev/null
+++ b/test/replication/before_replace.test.lua
@@ -0,0 +1,63 @@
+--
+-- Using space:before_replace to resolve replication conflicts.
+--
+env = require('test_run')
+test_run = env.new()
+
+SERVERS = { 'autobootstrap1', 'autobootstrap2', 'autobootstrap3' }
+
+-- Deploy a cluster.
+test_run:create_cluster(SERVERS)
+test_run:wait_fullmesh(SERVERS)
+
+-- Setup space:before_replace trigger on all replicas.
+-- The trigger favors tuples with a greater value.
+test_run:cmd("setopt delimiter ';'")
+test_run:cmd("switch autobootstrap1");
+_ = box.space.test:before_replace(function(old, new)
+ if old ~= nil and new ~= nil then
+ return new[2] > old[2] and new or old
+ end
+end);
+test_run:cmd("switch autobootstrap2");
+_ = box.space.test:before_replace(function(old, new)
+ if old ~= nil and new ~= nil then
+ return new[2] > old[2] and new or old
+ end
+end);
+test_run:cmd("switch autobootstrap3");
+_ = box.space.test:before_replace(function(old, new)
+ if old ~= nil and new ~= nil then
+ return new[2] > old[2] and new or old
+ end
+end);
+test_run:cmd("setopt delimiter ''");
+
+-- Stall replication and generate incompatible data
+-- on the replicas.
+test_run:cmd("switch autobootstrap1")
+box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.01)
+for i = 1, 10 do box.space.test:replace{i, i % 3 == 1 and i * 10 or i} end
+test_run:cmd("switch autobootstrap2")
+box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.01)
+for i = 1, 10 do box.space.test:replace{i, i % 3 == 2 and i * 10 or i} end
+test_run:cmd("switch autobootstrap3")
+box.error.injection.set('ERRINJ_RELAY_TIMEOUT', 0.01)
+for i = 1, 10 do box.space.test:replace{i, i % 3 == 0 and i * 10 or i} end
+
+-- Synchronize.
+test_run:cmd("switch default")
+vclock = test_run:get_cluster_vclock(SERVERS)
+vclock2 = test_run:wait_cluster_vclock(SERVERS, vclock)
+
+-- Check that all replicas converged to the same data.
+test_run:cmd("switch autobootstrap1")
+box.space.test:select()
+test_run:cmd("switch autobootstrap2")
+box.space.test:select()
+test_run:cmd("switch autobootstrap3")
+box.space.test:select()
+
+-- Cleanup.
+test_run:cmd("switch default")
+test_run:drop_cluster(SERVERS)
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index b8933346..76043d66 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
script = master.lua
description = tarantool/box, replication
disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua before_replace.test.lua
config = suite.cfg
lua_libs = lua/fast_replica.lua
long_run = prune.test.lua
--
2.11.0
More information about the Tarantool-patches mailing list