From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH v2 6/7] space: get rid of apply_initial_join_row method Date: Mon, 19 Aug 2019 19:53:19 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit To: tarantool-patches@freelists.org List-ID: There's no reason to use a special method instead of the generic space_execute_dml for applying rows received from a master during the initial join stage. Moreover, when the special method is used, the space.before_replace trigger is not run, which makes it impossible to, for example, change the space engine on a replica (see the on_schema_init test of the replication test suite). So this patch removes the special method altogether and makes the code that used it switch to space_execute_dml. Closes #4417 --- src/box/applier.cc | 32 ++++++----- src/box/blackhole.c | 1 - src/box/memtx_engine.c | 22 +++++--- src/box/memtx_space.c | 30 ----------- src/box/space.c | 9 ---- src/box/space.h | 16 ------ src/box/sysview.c | 1 - src/box/vinyl.c | 68 ------------------------ test/replication/on_schema_init.result | 6 +++ test/replication/on_schema_init.test.lua | 3 ++ 10 files changed, 43 insertions(+), 145 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index cf03ea16..4304ff05 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -202,23 +202,29 @@ applier_writer_f(va_list ap) static int apply_initial_join_row(struct xrow_header *row) { - struct txn *txn = txn_begin(); - if (txn == NULL) - return -1; - struct request request; - xrow_decode_dml(row, &request, dml_request_key_map(row->type)); - struct space *space = space_cache_find(request.space_id); - if (space == NULL) - goto rollback; - /* no access checks here - applier always works with admin privs */ - if (space_apply_initial_join_row(space, &request)) - goto rollback; int rc; + struct request request; + if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) + return -1; + 
struct space *space = space_cache_find(request.space_id); + if (space == NULL) + return -1; + struct txn *txn = txn_begin(); + if (txn == NULL) + return -1; + if (txn_begin_stmt(txn, space) != 0) + goto rollback; + /* no access checks here - applier always works with admin privs */ + struct tuple *unused; + if (space_execute_dml(space, txn, &request, &unused) != 0) + goto rollback_stmt; + if (txn_commit_stmt(txn, &request)) + goto rollback; rc = txn_commit(txn); - if (rc < 0) - return -1; fiber_gc(); return rc; +rollback_stmt: + txn_rollback_stmt(txn); rollback: txn_rollback(txn); fiber_gc(); diff --git a/src/box/blackhole.c b/src/box/blackhole.c index b69e543a..22ef324b 100644 --- a/src/box/blackhole.c +++ b/src/box/blackhole.c @@ -111,7 +111,6 @@ blackhole_space_create_index(struct space *space, struct index_def *def) static const struct space_vtab blackhole_space_vtab = { /* .destroy = */ blackhole_space_destroy, /* .bsize = */ generic_space_bsize, - /* .apply_initial_join_row = */ generic_space_apply_initial_join_row, /* .execute_replace = */ blackhole_space_execute_replace, /* .execute_delete = */ blackhole_space_execute_delete, /* .execute_update = */ blackhole_space_execute_update, diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index ea197cad..f6a33282 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -205,7 +205,7 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx, (uint32_t) row->type); return -1; } - + int rc; struct request request; if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) return -1; @@ -220,13 +220,15 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx, struct txn *txn = txn_begin(); if (txn == NULL) return -1; + if (txn_begin_stmt(txn, space) != 0) + goto rollback; /* no access checks here - applier always works with admin privs */ - if (space_apply_initial_join_row(space, &request) != 0) { - txn_rollback(txn); - fiber_gc(); - return -1; - } - int rc = 
txn_commit(txn); + struct tuple *unused; + if (space_execute_dml(space, txn, &request, &unused) != 0) + goto rollback_stmt; + if (txn_commit_stmt(txn, &request) != 0) + goto rollback; + rc = txn_commit(txn); /* * Don't let gc pool grow too much. Yet to * it before reading the next row, to make @@ -234,6 +236,12 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx, */ fiber_gc(); return rc; +rollback_stmt: + txn_rollback_stmt(txn); +rollback: + txn_rollback(txn); + fiber_gc(); + return -1; } /** Called at start to tell memtx to recover to a given LSN. */ diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c index cf29cf32..05efb45f 100644 --- a/src/box/memtx_space.c +++ b/src/box/memtx_space.c @@ -316,35 +316,6 @@ dup_replace_mode(uint32_t op) return op == IPROTO_INSERT ? DUP_INSERT : DUP_REPLACE_OR_INSERT; } -static int -memtx_space_apply_initial_join_row(struct space *space, struct request *request) -{ - struct memtx_space *memtx_space = (struct memtx_space *)space; - if (request->type != IPROTO_INSERT) { - diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, request->type); - return -1; - } - request->header->replica_id = 0; - struct txn *txn = in_txn(); - if (txn_begin_stmt(txn, space) != 0) - return -1; - struct txn_stmt *stmt = txn_current_stmt(txn); - stmt->new_tuple = memtx_tuple_new(space->format, request->tuple, - request->tuple_end); - if (stmt->new_tuple == NULL) - goto rollback; - tuple_ref(stmt->new_tuple); - if (memtx_space->replace(space, NULL, stmt->new_tuple, - DUP_INSERT, &stmt->old_tuple) != 0) - goto rollback; - return txn_commit_stmt(txn, request); - -rollback: - say_error("rollback: %s", diag_last_error(diag_get())->errmsg); - txn_rollback_stmt(txn); - return -1; -} - static int memtx_space_execute_replace(struct space *space, struct txn *txn, struct request *request, struct tuple **result) @@ -1168,7 +1139,6 @@ memtx_space_prepare_alter(struct space *old_space, struct space *new_space) static const struct space_vtab memtx_space_vtab 
= { /* .destroy = */ memtx_space_destroy, /* .bsize = */ memtx_space_bsize, - /* .apply_initial_join_row = */ memtx_space_apply_initial_join_row, /* .execute_replace = */ memtx_space_execute_replace, /* .execute_delete = */ memtx_space_execute_delete, /* .execute_update = */ memtx_space_execute_update, diff --git a/src/box/space.c b/src/box/space.c index 0d1ad3b3..226ac9c9 100644 --- a/src/box/space.c +++ b/src/box/space.c @@ -624,15 +624,6 @@ generic_space_bsize(struct space *space) return 0; } -int -generic_space_apply_initial_join_row(struct space *space, - struct request *request) -{ - (void)space; - (void)request; - return 0; -} - int generic_space_ephemeral_replace(struct space *space, const char *tuple, const char *tuple_end) diff --git a/src/box/space.h b/src/box/space.h index 8f593e93..7926aa65 100644 --- a/src/box/space.h +++ b/src/box/space.h @@ -59,8 +59,6 @@ struct space_vtab { /** Return binary size of a space. */ size_t (*bsize)(struct space *); - int (*apply_initial_join_row)(struct space *, struct request *); - int (*execute_replace)(struct space *, struct txn *, struct request *, struct tuple **result); int (*execute_delete)(struct space *, struct txn *, @@ -361,12 +359,6 @@ index_name_by_id(struct space *space, uint32_t id); int access_check_space(struct space *space, user_access_t access); -static inline int -space_apply_initial_join_row(struct space *space, struct request *request) -{ - return space->vtab->apply_initial_join_row(space, request); -} - /** * Execute a DML request on the given space. */ @@ -528,7 +520,6 @@ space_remove_ck_constraint(struct space *space, struct ck_constraint *ck); * Virtual method stubs. 
*/ size_t generic_space_bsize(struct space *); -int generic_space_apply_initial_join_row(struct space *, struct request *); int generic_space_ephemeral_replace(struct space *, const char *, const char *); int generic_space_ephemeral_delete(struct space *, const char *); int generic_space_ephemeral_rowid_next(struct space *, uint64_t *); @@ -598,13 +589,6 @@ index_find_system_xc(struct space *space, uint32_t index_id) return index_find_xc(space, index_id); } -static inline void -space_apply_initial_join_row_xc(struct space *space, struct request *request) -{ - if (space_apply_initial_join_row(space, request) != 0) - diag_raise(); -} - static inline void space_check_index_def_xc(struct space *space, struct index_def *index_def) { diff --git a/src/box/sysview.c b/src/box/sysview.c index 46cf1e13..1fbe3aa2 100644 --- a/src/box/sysview.c +++ b/src/box/sysview.c @@ -490,7 +490,6 @@ sysview_space_create_index(struct space *space, struct index_def *def) static const struct space_vtab sysview_space_vtab = { /* .destroy = */ sysview_space_destroy, /* .bsize = */ generic_space_bsize, - /* .apply_initial_join_row = */ generic_space_apply_initial_join_row, /* .execute_replace = */ sysview_space_execute_replace, /* .execute_delete = */ sysview_space_execute_delete, /* .execute_update = */ sysview_space_execute_update, diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 80ed00a1..de4a06c4 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -3199,73 +3199,6 @@ out: return rc; } -static int -vinyl_space_apply_initial_join_row(struct space *space, struct request *request) -{ - assert(request->header != NULL); - struct vy_env *env = vy_env(space->engine); - - struct vy_tx *tx = vy_tx_begin(env->xm); - if (tx == NULL) - return -1; - - struct txn_stmt stmt; - memset(&stmt, 0, sizeof(stmt)); - - int rc = -1; - switch (request->type) { - case IPROTO_INSERT: - rc = vy_insert(env, tx, &stmt, space, request); - break; - case IPROTO_REPLACE: - rc = vy_replace(env, tx, &stmt, space, 
request); - break; - case IPROTO_UPSERT: - rc = vy_upsert(env, tx, &stmt, space, request); - break; - case IPROTO_DELETE: - rc = vy_delete(env, tx, &stmt, space, request); - break; - default: - diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, request->type); - break; - } - if (rc != 0) { - vy_tx_rollback(tx); - return -1; - } - - /* - * Account memory quota, see vinyl_engine_prepare() - * and vinyl_engine_commit() for more details about - * quota accounting. - */ - size_t reserved = tx->write_size; - if (vy_quota_use(&env->quota, VY_QUOTA_CONSUMER_TX, - reserved, TIMEOUT_INFINITY) != 0) - unreachable(); - - size_t mem_used_before = lsregion_used(&env->mem_env.allocator); - - rc = vy_tx_prepare(tx); - if (rc == 0) - vy_tx_commit(tx, 0); - else - vy_tx_rollback(tx); - - if (stmt.old_tuple != NULL) - tuple_unref(stmt.old_tuple); - if (stmt.new_tuple != NULL) - tuple_unref(stmt.new_tuple); - - size_t mem_used_after = lsregion_used(&env->mem_env.allocator); - assert(mem_used_after >= mem_used_before); - size_t used = mem_used_after - mem_used_before; - vy_quota_adjust(&env->quota, VY_QUOTA_CONSUMER_TX, reserved, used); - vy_regulator_check_dump_watermark(&env->regulator); - return rc; -} - /* }}} Replication */ /* {{{ Garbage collection */ @@ -4671,7 +4604,6 @@ static const struct engine_vtab vinyl_engine_vtab = { static const struct space_vtab vinyl_space_vtab = { /* .destroy = */ vinyl_space_destroy, /* .bsize = */ vinyl_space_bsize, - /* .apply_initial_join_row = */ vinyl_space_apply_initial_join_row, /* .execute_replace = */ vinyl_space_execute_replace, /* .execute_delete = */ vinyl_space_execute_delete, /* .execute_update = */ vinyl_space_execute_update, diff --git a/test/replication/on_schema_init.result b/test/replication/on_schema_init.result index 3f7ee0bd..6c2857d1 100644 --- a/test/replication/on_schema_init.result +++ b/test/replication/on_schema_init.result @@ -15,6 +15,12 @@ test_run:cmd('create server replica with rpl_master=default, script="replication 
test_engine = box.schema.space.create('test_engine', {engine='memtx'}) --- ... +-- Make sure that space.before_replace trigger is invoked for rows +-- received during both initial and final join stages. +box.snapshot() +--- +- ok +... test_local = box.schema.space.create('test_local', {is_local=false}) --- ... diff --git a/test/replication/on_schema_init.test.lua b/test/replication/on_schema_init.test.lua index 9bb9e477..016a61c1 100644 --- a/test/replication/on_schema_init.test.lua +++ b/test/replication/on_schema_init.test.lua @@ -9,6 +9,9 @@ test_run = env.new() test_run:cmd('create server replica with rpl_master=default, script="replication/replica_on_schema_init.lua"') test_engine = box.schema.space.create('test_engine', {engine='memtx'}) +-- Make sure that space.before_replace trigger is invoked for rows +-- received during both initial and final join stages. +box.snapshot() test_local = box.schema.space.create('test_local', {is_local=false}) test_engine.engine test_local.is_local -- 2.20.1