[PATCH v2 6/7] space: get rid of apply_initial_join_row method
Vladimir Davydov
vdavydov.dev at gmail.com
Mon Aug 19 19:53:19 MSK 2019
There's no reason to use a special method instead of the generic
space_execute_dml for applying rows received from a master during the
initial join stage. Moreover, when the special method is used, the
space.before_replace trigger is not run, which makes it impossible to,
for example, change the space engine on a replica (see the
on_schema_init test of the replication test suite).
So this patch removes the special method altogether and makes the code
that used it switch to space_execute_dml.
Closes #4417
---
src/box/applier.cc | 32 ++++++-----
src/box/blackhole.c | 1 -
src/box/memtx_engine.c | 22 +++++---
src/box/memtx_space.c | 30 -----------
src/box/space.c | 9 ----
src/box/space.h | 16 ------
src/box/sysview.c | 1 -
src/box/vinyl.c | 68 ------------------------
test/replication/on_schema_init.result | 6 +++
test/replication/on_schema_init.test.lua | 3 ++
10 files changed, 43 insertions(+), 145 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index cf03ea16..4304ff05 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -202,23 +202,29 @@ applier_writer_f(va_list ap)
static int
apply_initial_join_row(struct xrow_header *row)
{
- struct txn *txn = txn_begin();
- if (txn == NULL)
- return -1;
- struct request request;
- xrow_decode_dml(row, &request, dml_request_key_map(row->type));
- struct space *space = space_cache_find(request.space_id);
- if (space == NULL)
- goto rollback;
- /* no access checks here - applier always works with admin privs */
- if (space_apply_initial_join_row(space, &request))
- goto rollback;
int rc;
+ struct request request;
+ if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
+ return -1;
+ struct space *space = space_cache_find(request.space_id);
+ if (space == NULL)
+ return -1;
+ struct txn *txn = txn_begin();
+ if (txn == NULL)
+ return -1;
+ if (txn_begin_stmt(txn, space) != 0)
+ goto rollback;
+ /* no access checks here - applier always works with admin privs */
+ struct tuple *unused;
+ if (space_execute_dml(space, txn, &request, &unused) != 0)
+ goto rollback_stmt;
+ if (txn_commit_stmt(txn, &request))
+ goto rollback;
rc = txn_commit(txn);
- if (rc < 0)
- return -1;
fiber_gc();
return rc;
+rollback_stmt:
+ txn_rollback_stmt(txn);
rollback:
txn_rollback(txn);
fiber_gc();
diff --git a/src/box/blackhole.c b/src/box/blackhole.c
index b69e543a..22ef324b 100644
--- a/src/box/blackhole.c
+++ b/src/box/blackhole.c
@@ -111,7 +111,6 @@ blackhole_space_create_index(struct space *space, struct index_def *def)
static const struct space_vtab blackhole_space_vtab = {
/* .destroy = */ blackhole_space_destroy,
/* .bsize = */ generic_space_bsize,
- /* .apply_initial_join_row = */ generic_space_apply_initial_join_row,
/* .execute_replace = */ blackhole_space_execute_replace,
/* .execute_delete = */ blackhole_space_execute_delete,
/* .execute_update = */ blackhole_space_execute_update,
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index ea197cad..f6a33282 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -205,7 +205,7 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
(uint32_t) row->type);
return -1;
}
-
+ int rc;
struct request request;
if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
return -1;
@@ -220,13 +220,15 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
struct txn *txn = txn_begin();
if (txn == NULL)
return -1;
+ if (txn_begin_stmt(txn, space) != 0)
+ goto rollback;
/* no access checks here - applier always works with admin privs */
- if (space_apply_initial_join_row(space, &request) != 0) {
- txn_rollback(txn);
- fiber_gc();
- return -1;
- }
- int rc = txn_commit(txn);
+ struct tuple *unused;
+ if (space_execute_dml(space, txn, &request, &unused) != 0)
+ goto rollback_stmt;
+ if (txn_commit_stmt(txn, &request) != 0)
+ goto rollback;
+ rc = txn_commit(txn);
/*
* Don't let gc pool grow too much. Yet to
* it before reading the next row, to make
@@ -234,6 +236,12 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
*/
fiber_gc();
return rc;
+rollback_stmt:
+ txn_rollback_stmt(txn);
+rollback:
+ txn_rollback(txn);
+ fiber_gc();
+ return -1;
}
/** Called at start to tell memtx to recover to a given LSN. */
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index cf29cf32..05efb45f 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -316,35 +316,6 @@ dup_replace_mode(uint32_t op)
return op == IPROTO_INSERT ? DUP_INSERT : DUP_REPLACE_OR_INSERT;
}
-static int
-memtx_space_apply_initial_join_row(struct space *space, struct request *request)
-{
- struct memtx_space *memtx_space = (struct memtx_space *)space;
- if (request->type != IPROTO_INSERT) {
- diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, request->type);
- return -1;
- }
- request->header->replica_id = 0;
- struct txn *txn = in_txn();
- if (txn_begin_stmt(txn, space) != 0)
- return -1;
- struct txn_stmt *stmt = txn_current_stmt(txn);
- stmt->new_tuple = memtx_tuple_new(space->format, request->tuple,
- request->tuple_end);
- if (stmt->new_tuple == NULL)
- goto rollback;
- tuple_ref(stmt->new_tuple);
- if (memtx_space->replace(space, NULL, stmt->new_tuple,
- DUP_INSERT, &stmt->old_tuple) != 0)
- goto rollback;
- return txn_commit_stmt(txn, request);
-
-rollback:
- say_error("rollback: %s", diag_last_error(diag_get())->errmsg);
- txn_rollback_stmt(txn);
- return -1;
-}
-
static int
memtx_space_execute_replace(struct space *space, struct txn *txn,
struct request *request, struct tuple **result)
@@ -1168,7 +1139,6 @@ memtx_space_prepare_alter(struct space *old_space, struct space *new_space)
static const struct space_vtab memtx_space_vtab = {
/* .destroy = */ memtx_space_destroy,
/* .bsize = */ memtx_space_bsize,
- /* .apply_initial_join_row = */ memtx_space_apply_initial_join_row,
/* .execute_replace = */ memtx_space_execute_replace,
/* .execute_delete = */ memtx_space_execute_delete,
/* .execute_update = */ memtx_space_execute_update,
diff --git a/src/box/space.c b/src/box/space.c
index 0d1ad3b3..226ac9c9 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -624,15 +624,6 @@ generic_space_bsize(struct space *space)
return 0;
}
-int
-generic_space_apply_initial_join_row(struct space *space,
- struct request *request)
-{
- (void)space;
- (void)request;
- return 0;
-}
-
int
generic_space_ephemeral_replace(struct space *space, const char *tuple,
const char *tuple_end)
diff --git a/src/box/space.h b/src/box/space.h
index 8f593e93..7926aa65 100644
--- a/src/box/space.h
+++ b/src/box/space.h
@@ -59,8 +59,6 @@ struct space_vtab {
/** Return binary size of a space. */
size_t (*bsize)(struct space *);
- int (*apply_initial_join_row)(struct space *, struct request *);
-
int (*execute_replace)(struct space *, struct txn *,
struct request *, struct tuple **result);
int (*execute_delete)(struct space *, struct txn *,
@@ -361,12 +359,6 @@ index_name_by_id(struct space *space, uint32_t id);
int
access_check_space(struct space *space, user_access_t access);
-static inline int
-space_apply_initial_join_row(struct space *space, struct request *request)
-{
- return space->vtab->apply_initial_join_row(space, request);
-}
-
/**
* Execute a DML request on the given space.
*/
@@ -528,7 +520,6 @@ space_remove_ck_constraint(struct space *space, struct ck_constraint *ck);
* Virtual method stubs.
*/
size_t generic_space_bsize(struct space *);
-int generic_space_apply_initial_join_row(struct space *, struct request *);
int generic_space_ephemeral_replace(struct space *, const char *, const char *);
int generic_space_ephemeral_delete(struct space *, const char *);
int generic_space_ephemeral_rowid_next(struct space *, uint64_t *);
@@ -598,13 +589,6 @@ index_find_system_xc(struct space *space, uint32_t index_id)
return index_find_xc(space, index_id);
}
-static inline void
-space_apply_initial_join_row_xc(struct space *space, struct request *request)
-{
- if (space_apply_initial_join_row(space, request) != 0)
- diag_raise();
-}
-
static inline void
space_check_index_def_xc(struct space *space, struct index_def *index_def)
{
diff --git a/src/box/sysview.c b/src/box/sysview.c
index 46cf1e13..1fbe3aa2 100644
--- a/src/box/sysview.c
+++ b/src/box/sysview.c
@@ -490,7 +490,6 @@ sysview_space_create_index(struct space *space, struct index_def *def)
static const struct space_vtab sysview_space_vtab = {
/* .destroy = */ sysview_space_destroy,
/* .bsize = */ generic_space_bsize,
- /* .apply_initial_join_row = */ generic_space_apply_initial_join_row,
/* .execute_replace = */ sysview_space_execute_replace,
/* .execute_delete = */ sysview_space_execute_delete,
/* .execute_update = */ sysview_space_execute_update,
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 80ed00a1..de4a06c4 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -3199,73 +3199,6 @@ out:
return rc;
}
-static int
-vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
-{
- assert(request->header != NULL);
- struct vy_env *env = vy_env(space->engine);
-
- struct vy_tx *tx = vy_tx_begin(env->xm);
- if (tx == NULL)
- return -1;
-
- struct txn_stmt stmt;
- memset(&stmt, 0, sizeof(stmt));
-
- int rc = -1;
- switch (request->type) {
- case IPROTO_INSERT:
- rc = vy_insert(env, tx, &stmt, space, request);
- break;
- case IPROTO_REPLACE:
- rc = vy_replace(env, tx, &stmt, space, request);
- break;
- case IPROTO_UPSERT:
- rc = vy_upsert(env, tx, &stmt, space, request);
- break;
- case IPROTO_DELETE:
- rc = vy_delete(env, tx, &stmt, space, request);
- break;
- default:
- diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, request->type);
- break;
- }
- if (rc != 0) {
- vy_tx_rollback(tx);
- return -1;
- }
-
- /*
- * Account memory quota, see vinyl_engine_prepare()
- * and vinyl_engine_commit() for more details about
- * quota accounting.
- */
- size_t reserved = tx->write_size;
- if (vy_quota_use(&env->quota, VY_QUOTA_CONSUMER_TX,
- reserved, TIMEOUT_INFINITY) != 0)
- unreachable();
-
- size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
-
- rc = vy_tx_prepare(tx);
- if (rc == 0)
- vy_tx_commit(tx, 0);
- else
- vy_tx_rollback(tx);
-
- if (stmt.old_tuple != NULL)
- tuple_unref(stmt.old_tuple);
- if (stmt.new_tuple != NULL)
- tuple_unref(stmt.new_tuple);
-
- size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
- assert(mem_used_after >= mem_used_before);
- size_t used = mem_used_after - mem_used_before;
- vy_quota_adjust(&env->quota, VY_QUOTA_CONSUMER_TX, reserved, used);
- vy_regulator_check_dump_watermark(&env->regulator);
- return rc;
-}
-
/* }}} Replication */
/* {{{ Garbage collection */
@@ -4671,7 +4604,6 @@ static const struct engine_vtab vinyl_engine_vtab = {
static const struct space_vtab vinyl_space_vtab = {
/* .destroy = */ vinyl_space_destroy,
/* .bsize = */ vinyl_space_bsize,
- /* .apply_initial_join_row = */ vinyl_space_apply_initial_join_row,
/* .execute_replace = */ vinyl_space_execute_replace,
/* .execute_delete = */ vinyl_space_execute_delete,
/* .execute_update = */ vinyl_space_execute_update,
diff --git a/test/replication/on_schema_init.result b/test/replication/on_schema_init.result
index 3f7ee0bd..6c2857d1 100644
--- a/test/replication/on_schema_init.result
+++ b/test/replication/on_schema_init.result
@@ -15,6 +15,12 @@ test_run:cmd('create server replica with rpl_master=default, script="replication
test_engine = box.schema.space.create('test_engine', {engine='memtx'})
---
...
+-- Make sure that space.before_replace trigger is invoked for rows
+-- received during both initial and final join stages.
+box.snapshot()
+---
+- ok
+...
test_local = box.schema.space.create('test_local', {is_local=false})
---
...
diff --git a/test/replication/on_schema_init.test.lua b/test/replication/on_schema_init.test.lua
index 9bb9e477..016a61c1 100644
--- a/test/replication/on_schema_init.test.lua
+++ b/test/replication/on_schema_init.test.lua
@@ -9,6 +9,9 @@ test_run = env.new()
test_run:cmd('create server replica with rpl_master=default, script="replication/replica_on_schema_init.lua"')
test_engine = box.schema.space.create('test_engine', {engine='memtx'})
+-- Make sure that space.before_replace trigger is invoked for rows
+-- received during both initial and final join stages.
+box.snapshot()
test_local = box.schema.space.create('test_local', {is_local=false})
test_engine.engine
test_local.is_local
--
2.20.1
More information about the Tarantool-patches
mailing list