From: Vladimir Davydov <vdavydov.dev@gmail.com> To: tarantool-patches@freelists.org Subject: [PATCH v2 6/7] space: get rid of apply_initial_join_row method Date: Mon, 19 Aug 2019 19:53:19 +0300 [thread overview] Message-ID: <f104e5fc496c87259649f046012fc06156656078.1566233187.git.vdavydov.dev@gmail.com> (raw) In-Reply-To: <cover.1566233187.git.vdavydov.dev@gmail.com> There's no reason to use a special method instead of the generic space_execute_dml for applying rows received from a master during the initial join stage. Moreover, using the special method results in not running space.before_replace trigger, which makes it impossible to, for example, update space engine on a replica, see the on_schema_init test of the replication test suite. So this patch removes the special method altogether and makes the code that used it switch to space_execute_dml. Closes #4417 --- src/box/applier.cc | 32 ++++++----- src/box/blackhole.c | 1 - src/box/memtx_engine.c | 22 +++++--- src/box/memtx_space.c | 30 ----------- src/box/space.c | 9 ---- src/box/space.h | 16 ------ src/box/sysview.c | 1 - src/box/vinyl.c | 68 ------------------------ test/replication/on_schema_init.result | 6 +++ test/replication/on_schema_init.test.lua | 3 ++ 10 files changed, 43 insertions(+), 145 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index cf03ea16..4304ff05 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -202,23 +202,29 @@ applier_writer_f(va_list ap) static int apply_initial_join_row(struct xrow_header *row) { - struct txn *txn = txn_begin(); - if (txn == NULL) - return -1; - struct request request; - xrow_decode_dml(row, &request, dml_request_key_map(row->type)); - struct space *space = space_cache_find(request.space_id); - if (space == NULL) - goto rollback; - /* no access checks here - applier always works with admin privs */ - if (space_apply_initial_join_row(space, &request)) - goto rollback; int rc; + struct request request; + if (xrow_decode_dml(row, 
&request, dml_request_key_map(row->type)) != 0) + return -1; + struct space *space = space_cache_find(request.space_id); + if (space == NULL) + return -1; + struct txn *txn = txn_begin(); + if (txn == NULL) + return -1; + if (txn_begin_stmt(txn, space) != 0) + goto rollback; + /* no access checks here - applier always works with admin privs */ + struct tuple *unused; + if (space_execute_dml(space, txn, &request, &unused) != 0) + goto rollback_stmt; + if (txn_commit_stmt(txn, &request)) + goto rollback; rc = txn_commit(txn); - if (rc < 0) - return -1; fiber_gc(); return rc; +rollback_stmt: + txn_rollback_stmt(txn); rollback: txn_rollback(txn); fiber_gc(); diff --git a/src/box/blackhole.c b/src/box/blackhole.c index b69e543a..22ef324b 100644 --- a/src/box/blackhole.c +++ b/src/box/blackhole.c @@ -111,7 +111,6 @@ blackhole_space_create_index(struct space *space, struct index_def *def) static const struct space_vtab blackhole_space_vtab = { /* .destroy = */ blackhole_space_destroy, /* .bsize = */ generic_space_bsize, - /* .apply_initial_join_row = */ generic_space_apply_initial_join_row, /* .execute_replace = */ blackhole_space_execute_replace, /* .execute_delete = */ blackhole_space_execute_delete, /* .execute_update = */ blackhole_space_execute_update, diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index ea197cad..f6a33282 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -205,7 +205,7 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx, (uint32_t) row->type); return -1; } - + int rc; struct request request; if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) return -1; @@ -220,13 +220,15 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx, struct txn *txn = txn_begin(); if (txn == NULL) return -1; + if (txn_begin_stmt(txn, space) != 0) + goto rollback; /* no access checks here - applier always works with admin privs */ - if (space_apply_initial_join_row(space, &request) != 0) { - 
txn_rollback(txn); - fiber_gc(); - return -1; - } - int rc = txn_commit(txn); + struct tuple *unused; + if (space_execute_dml(space, txn, &request, &unused) != 0) + goto rollback_stmt; + if (txn_commit_stmt(txn, &request) != 0) + goto rollback; + rc = txn_commit(txn); /* * Don't let gc pool grow too much. Yet to * it before reading the next row, to make @@ -234,6 +236,12 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx, */ fiber_gc(); return rc; +rollback_stmt: + txn_rollback_stmt(txn); +rollback: + txn_rollback(txn); + fiber_gc(); + return -1; } /** Called at start to tell memtx to recover to a given LSN. */ diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c index cf29cf32..05efb45f 100644 --- a/src/box/memtx_space.c +++ b/src/box/memtx_space.c @@ -316,35 +316,6 @@ dup_replace_mode(uint32_t op) return op == IPROTO_INSERT ? DUP_INSERT : DUP_REPLACE_OR_INSERT; } -static int -memtx_space_apply_initial_join_row(struct space *space, struct request *request) -{ - struct memtx_space *memtx_space = (struct memtx_space *)space; - if (request->type != IPROTO_INSERT) { - diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, request->type); - return -1; - } - request->header->replica_id = 0; - struct txn *txn = in_txn(); - if (txn_begin_stmt(txn, space) != 0) - return -1; - struct txn_stmt *stmt = txn_current_stmt(txn); - stmt->new_tuple = memtx_tuple_new(space->format, request->tuple, - request->tuple_end); - if (stmt->new_tuple == NULL) - goto rollback; - tuple_ref(stmt->new_tuple); - if (memtx_space->replace(space, NULL, stmt->new_tuple, - DUP_INSERT, &stmt->old_tuple) != 0) - goto rollback; - return txn_commit_stmt(txn, request); - -rollback: - say_error("rollback: %s", diag_last_error(diag_get())->errmsg); - txn_rollback_stmt(txn); - return -1; -} - static int memtx_space_execute_replace(struct space *space, struct txn *txn, struct request *request, struct tuple **result) @@ -1168,7 +1139,6 @@ memtx_space_prepare_alter(struct space *old_space, struct 
space *new_space) static const struct space_vtab memtx_space_vtab = { /* .destroy = */ memtx_space_destroy, /* .bsize = */ memtx_space_bsize, - /* .apply_initial_join_row = */ memtx_space_apply_initial_join_row, /* .execute_replace = */ memtx_space_execute_replace, /* .execute_delete = */ memtx_space_execute_delete, /* .execute_update = */ memtx_space_execute_update, diff --git a/src/box/space.c b/src/box/space.c index 0d1ad3b3..226ac9c9 100644 --- a/src/box/space.c +++ b/src/box/space.c @@ -624,15 +624,6 @@ generic_space_bsize(struct space *space) return 0; } -int -generic_space_apply_initial_join_row(struct space *space, - struct request *request) -{ - (void)space; - (void)request; - return 0; -} - int generic_space_ephemeral_replace(struct space *space, const char *tuple, const char *tuple_end) diff --git a/src/box/space.h b/src/box/space.h index 8f593e93..7926aa65 100644 --- a/src/box/space.h +++ b/src/box/space.h @@ -59,8 +59,6 @@ struct space_vtab { /** Return binary size of a space. */ size_t (*bsize)(struct space *); - int (*apply_initial_join_row)(struct space *, struct request *); - int (*execute_replace)(struct space *, struct txn *, struct request *, struct tuple **result); int (*execute_delete)(struct space *, struct txn *, @@ -361,12 +359,6 @@ index_name_by_id(struct space *space, uint32_t id); int access_check_space(struct space *space, user_access_t access); -static inline int -space_apply_initial_join_row(struct space *space, struct request *request) -{ - return space->vtab->apply_initial_join_row(space, request); -} - /** * Execute a DML request on the given space. */ @@ -528,7 +520,6 @@ space_remove_ck_constraint(struct space *space, struct ck_constraint *ck); * Virtual method stubs. 
*/ size_t generic_space_bsize(struct space *); -int generic_space_apply_initial_join_row(struct space *, struct request *); int generic_space_ephemeral_replace(struct space *, const char *, const char *); int generic_space_ephemeral_delete(struct space *, const char *); int generic_space_ephemeral_rowid_next(struct space *, uint64_t *); @@ -598,13 +589,6 @@ index_find_system_xc(struct space *space, uint32_t index_id) return index_find_xc(space, index_id); } -static inline void -space_apply_initial_join_row_xc(struct space *space, struct request *request) -{ - if (space_apply_initial_join_row(space, request) != 0) - diag_raise(); -} - static inline void space_check_index_def_xc(struct space *space, struct index_def *index_def) { diff --git a/src/box/sysview.c b/src/box/sysview.c index 46cf1e13..1fbe3aa2 100644 --- a/src/box/sysview.c +++ b/src/box/sysview.c @@ -490,7 +490,6 @@ sysview_space_create_index(struct space *space, struct index_def *def) static const struct space_vtab sysview_space_vtab = { /* .destroy = */ sysview_space_destroy, /* .bsize = */ generic_space_bsize, - /* .apply_initial_join_row = */ generic_space_apply_initial_join_row, /* .execute_replace = */ sysview_space_execute_replace, /* .execute_delete = */ sysview_space_execute_delete, /* .execute_update = */ sysview_space_execute_update, diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 80ed00a1..de4a06c4 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -3199,73 +3199,6 @@ out: return rc; } -static int -vinyl_space_apply_initial_join_row(struct space *space, struct request *request) -{ - assert(request->header != NULL); - struct vy_env *env = vy_env(space->engine); - - struct vy_tx *tx = vy_tx_begin(env->xm); - if (tx == NULL) - return -1; - - struct txn_stmt stmt; - memset(&stmt, 0, sizeof(stmt)); - - int rc = -1; - switch (request->type) { - case IPROTO_INSERT: - rc = vy_insert(env, tx, &stmt, space, request); - break; - case IPROTO_REPLACE: - rc = vy_replace(env, tx, &stmt, space, 
request); - break; - case IPROTO_UPSERT: - rc = vy_upsert(env, tx, &stmt, space, request); - break; - case IPROTO_DELETE: - rc = vy_delete(env, tx, &stmt, space, request); - break; - default: - diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, request->type); - break; - } - if (rc != 0) { - vy_tx_rollback(tx); - return -1; - } - - /* - * Account memory quota, see vinyl_engine_prepare() - * and vinyl_engine_commit() for more details about - * quota accounting. - */ - size_t reserved = tx->write_size; - if (vy_quota_use(&env->quota, VY_QUOTA_CONSUMER_TX, - reserved, TIMEOUT_INFINITY) != 0) - unreachable(); - - size_t mem_used_before = lsregion_used(&env->mem_env.allocator); - - rc = vy_tx_prepare(tx); - if (rc == 0) - vy_tx_commit(tx, 0); - else - vy_tx_rollback(tx); - - if (stmt.old_tuple != NULL) - tuple_unref(stmt.old_tuple); - if (stmt.new_tuple != NULL) - tuple_unref(stmt.new_tuple); - - size_t mem_used_after = lsregion_used(&env->mem_env.allocator); - assert(mem_used_after >= mem_used_before); - size_t used = mem_used_after - mem_used_before; - vy_quota_adjust(&env->quota, VY_QUOTA_CONSUMER_TX, reserved, used); - vy_regulator_check_dump_watermark(&env->regulator); - return rc; -} - /* }}} Replication */ /* {{{ Garbage collection */ @@ -4671,7 +4604,6 @@ static const struct engine_vtab vinyl_engine_vtab = { static const struct space_vtab vinyl_space_vtab = { /* .destroy = */ vinyl_space_destroy, /* .bsize = */ vinyl_space_bsize, - /* .apply_initial_join_row = */ vinyl_space_apply_initial_join_row, /* .execute_replace = */ vinyl_space_execute_replace, /* .execute_delete = */ vinyl_space_execute_delete, /* .execute_update = */ vinyl_space_execute_update, diff --git a/test/replication/on_schema_init.result b/test/replication/on_schema_init.result index 3f7ee0bd..6c2857d1 100644 --- a/test/replication/on_schema_init.result +++ b/test/replication/on_schema_init.result @@ -15,6 +15,12 @@ test_run:cmd('create server replica with rpl_master=default, script="replication 
test_engine = box.schema.space.create('test_engine', {engine='memtx'}) --- ... +-- Make sure that space.before_replace trigger is invoked for rows +-- received during both initial and final join stages. +box.snapshot() +--- +- ok +... test_local = box.schema.space.create('test_local', {is_local=false}) --- ... diff --git a/test/replication/on_schema_init.test.lua b/test/replication/on_schema_init.test.lua index 9bb9e477..016a61c1 100644 --- a/test/replication/on_schema_init.test.lua +++ b/test/replication/on_schema_init.test.lua @@ -9,6 +9,9 @@ test_run = env.new() test_run:cmd('create server replica with rpl_master=default, script="replication/replica_on_schema_init.lua"') test_engine = box.schema.space.create('test_engine', {engine='memtx'}) +-- Make sure that space.before_replace trigger is invoked for rows +-- received during both initial and final join stages. +box.snapshot() test_local = box.schema.space.create('test_local', {is_local=false}) test_engine.engine test_local.is_local -- 2.20.1
next prev parent reply other threads:[~2019-08-19 16:53 UTC|newest] Thread overview: 28+ messages / expand[flat|nested] mbox.gz Atom feed top 2019-08-19 16:53 [PATCH v2 0/7] Join replicas off the current read view Vladimir Davydov 2019-08-19 16:53 ` [PATCH v2 1/7] vinyl: don't pin index for iterator lifetime Vladimir Davydov 2019-08-19 20:35 ` [tarantool-patches] " Konstantin Osipov 2019-08-19 16:53 ` [PATCH v2 2/7] vinyl: don't exempt dropped indexes from dump and compaction Vladimir Davydov 2019-08-19 20:47 ` [tarantool-patches] " Konstantin Osipov 2019-08-20 8:12 ` Vladimir Davydov 2019-08-20 9:02 ` Vladimir Davydov 2019-08-20 11:52 ` Konstantin Osipov 2019-08-20 14:16 ` Vladimir Davydov 2019-08-19 16:53 ` [PATCH v2 3/7] vinyl: get rid of vy_env::join_lsn Vladimir Davydov 2019-08-19 20:49 ` [tarantool-patches] " Konstantin Osipov 2019-08-19 16:53 ` [PATCH v2 4/7] memtx: use ref counting to pin indexes for snapshot Vladimir Davydov 2019-08-19 20:50 ` [tarantool-patches] " Konstantin Osipov 2019-08-19 16:53 ` [PATCH v2 5/7] memtx: enter small delayed free mode from snapshot iterator Vladimir Davydov 2019-08-19 20:51 ` [tarantool-patches] " Konstantin Osipov 2019-08-19 16:53 ` Vladimir Davydov [this message] 2019-08-19 20:54 ` [tarantool-patches] Re: [PATCH v2 6/7] space: get rid of apply_initial_join_row method Konstantin Osipov 2019-08-19 16:53 ` [PATCH v2 7/7] relay: join new replicas off read view Vladimir Davydov 2019-08-19 20:57 ` [tarantool-patches] " Konstantin Osipov 2019-08-20 8:16 ` Vladimir Davydov 2019-08-20 11:53 ` Konstantin Osipov 2019-08-20 12:05 ` Vladimir Davydov 2019-08-20 13:50 ` Konstantin Osipov 2019-08-20 14:03 ` Vladimir Davydov 2019-08-21 22:08 ` Konstantin Osipov 2019-08-22 8:05 ` Vladimir Davydov 2019-08-19 16:54 ` [PATCH v2 0/7] Join replicas off the current " Vladimir Davydov 2019-08-20 8:53 ` Vladimir Davydov
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=f104e5fc496c87259649f046012fc06156656078.1566233187.git.vdavydov.dev@gmail.com \ --to=vdavydov.dev@gmail.com \ --cc=tarantool-patches@freelists.org \ --subject='Re: [PATCH v2 6/7] space: get rid of apply_initial_join_row method' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.