[PATCH v2 6/7] space: get rid of apply_initial_join_row method

Vladimir Davydov vdavydov.dev at gmail.com
Mon Aug 19 19:53:19 MSK 2019


There's no reason to use a special method instead of the generic
space_execute_dml for applying rows received from a master during the
initial join stage. Moreover, with the special method the
space.before_replace trigger is not run, which makes it impossible to,
for example, change the space engine on a replica (see the
on_schema_init test of the replication test suite).

So this patch removes the special method altogether and makes the code
that used it switch to space_execute_dml.

Closes #4417
---
 src/box/applier.cc                       | 32 ++++++-----
 src/box/blackhole.c                      |  1 -
 src/box/memtx_engine.c                   | 22 +++++---
 src/box/memtx_space.c                    | 30 -----------
 src/box/space.c                          |  9 ----
 src/box/space.h                          | 16 ------
 src/box/sysview.c                        |  1 -
 src/box/vinyl.c                          | 68 ------------------------
 test/replication/on_schema_init.result   |  6 +++
 test/replication/on_schema_init.test.lua |  3 ++
 10 files changed, 43 insertions(+), 145 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index cf03ea16..4304ff05 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -202,23 +202,29 @@ applier_writer_f(va_list ap)
 static int
 apply_initial_join_row(struct xrow_header *row)
 {
-	struct txn *txn = txn_begin();
-	if (txn == NULL)
-		return -1;
-	struct request request;
-	xrow_decode_dml(row, &request, dml_request_key_map(row->type));
-	struct space *space = space_cache_find(request.space_id);
-	if (space == NULL)
-		goto rollback;
-	/* no access checks here - applier always works with admin privs */
-	if (space_apply_initial_join_row(space, &request))
-		goto rollback;
 	int rc;
+	struct request request;
+	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
+		return -1;
+	struct space *space = space_cache_find(request.space_id);
+	if (space == NULL)
+		return -1;
+	struct txn *txn = txn_begin();
+	if (txn == NULL)
+		return -1;
+	if (txn_begin_stmt(txn, space) != 0)
+		goto rollback;
+	/* no access checks here - applier always works with admin privs */
+	struct tuple *unused;
+	if (space_execute_dml(space, txn, &request, &unused) != 0)
+		goto rollback_stmt;
+	if (txn_commit_stmt(txn, &request))
+		goto rollback;
 	rc = txn_commit(txn);
-	if (rc < 0)
-		return -1;
 	fiber_gc();
 	return rc;
+rollback_stmt:
+	txn_rollback_stmt(txn);
 rollback:
 	txn_rollback(txn);
 	fiber_gc();
diff --git a/src/box/blackhole.c b/src/box/blackhole.c
index b69e543a..22ef324b 100644
--- a/src/box/blackhole.c
+++ b/src/box/blackhole.c
@@ -111,7 +111,6 @@ blackhole_space_create_index(struct space *space, struct index_def *def)
 static const struct space_vtab blackhole_space_vtab = {
 	/* .destroy = */ blackhole_space_destroy,
 	/* .bsize = */ generic_space_bsize,
-	/* .apply_initial_join_row = */ generic_space_apply_initial_join_row,
 	/* .execute_replace = */ blackhole_space_execute_replace,
 	/* .execute_delete = */ blackhole_space_execute_delete,
 	/* .execute_update = */ blackhole_space_execute_update,
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index ea197cad..f6a33282 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -205,7 +205,7 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
 			 (uint32_t) row->type);
 		return -1;
 	}
-
+	int rc;
 	struct request request;
 	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
 		return -1;
@@ -220,13 +220,15 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
 		return -1;
+	if (txn_begin_stmt(txn, space) != 0)
+		goto rollback;
 	/* no access checks here - applier always works with admin privs */
-	if (space_apply_initial_join_row(space, &request) != 0) {
-		txn_rollback(txn);
-		fiber_gc();
-		return -1;
-	}
-	int rc = txn_commit(txn);
+	struct tuple *unused;
+	if (space_execute_dml(space, txn, &request, &unused) != 0)
+		goto rollback_stmt;
+	if (txn_commit_stmt(txn, &request) != 0)
+		goto rollback;
+	rc = txn_commit(txn);
 	/*
 	 * Don't let gc pool grow too much. Yet to
 	 * it before reading the next row, to make
@@ -234,6 +236,12 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
 	 */
 	fiber_gc();
 	return rc;
+rollback_stmt:
+	txn_rollback_stmt(txn);
+rollback:
+	txn_rollback(txn);
+	fiber_gc();
+	return -1;
 }
 
 /** Called at start to tell memtx to recover to a given LSN. */
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index cf29cf32..05efb45f 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -316,35 +316,6 @@ dup_replace_mode(uint32_t op)
 	return op == IPROTO_INSERT ? DUP_INSERT : DUP_REPLACE_OR_INSERT;
 }
 
-static int
-memtx_space_apply_initial_join_row(struct space *space, struct request *request)
-{
-	struct memtx_space *memtx_space = (struct memtx_space *)space;
-	if (request->type != IPROTO_INSERT) {
-		diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, request->type);
-		return -1;
-	}
-	request->header->replica_id = 0;
-	struct txn *txn = in_txn();
-	if (txn_begin_stmt(txn, space) != 0)
-		return -1;
-	struct txn_stmt *stmt = txn_current_stmt(txn);
-	stmt->new_tuple = memtx_tuple_new(space->format, request->tuple,
-					  request->tuple_end);
-	if (stmt->new_tuple == NULL)
-		goto rollback;
-	tuple_ref(stmt->new_tuple);
-	if (memtx_space->replace(space, NULL, stmt->new_tuple,
-				 DUP_INSERT, &stmt->old_tuple) != 0)
-		goto rollback;
-	return txn_commit_stmt(txn, request);
-
-rollback:
-	say_error("rollback: %s", diag_last_error(diag_get())->errmsg);
-	txn_rollback_stmt(txn);
-	return -1;
-}
-
 static int
 memtx_space_execute_replace(struct space *space, struct txn *txn,
 			    struct request *request, struct tuple **result)
@@ -1168,7 +1139,6 @@ memtx_space_prepare_alter(struct space *old_space, struct space *new_space)
 static const struct space_vtab memtx_space_vtab = {
 	/* .destroy = */ memtx_space_destroy,
 	/* .bsize = */ memtx_space_bsize,
-	/* .apply_initial_join_row = */ memtx_space_apply_initial_join_row,
 	/* .execute_replace = */ memtx_space_execute_replace,
 	/* .execute_delete = */ memtx_space_execute_delete,
 	/* .execute_update = */ memtx_space_execute_update,
diff --git a/src/box/space.c b/src/box/space.c
index 0d1ad3b3..226ac9c9 100644
--- a/src/box/space.c
+++ b/src/box/space.c
@@ -624,15 +624,6 @@ generic_space_bsize(struct space *space)
 	return 0;
 }
 
-int
-generic_space_apply_initial_join_row(struct space *space,
-				     struct request *request)
-{
-	(void)space;
-	(void)request;
-	return 0;
-}
-
 int
 generic_space_ephemeral_replace(struct space *space, const char *tuple,
 				const char *tuple_end)
diff --git a/src/box/space.h b/src/box/space.h
index 8f593e93..7926aa65 100644
--- a/src/box/space.h
+++ b/src/box/space.h
@@ -59,8 +59,6 @@ struct space_vtab {
 	/** Return binary size of a space. */
 	size_t (*bsize)(struct space *);
 
-	int (*apply_initial_join_row)(struct space *, struct request *);
-
 	int (*execute_replace)(struct space *, struct txn *,
 			       struct request *, struct tuple **result);
 	int (*execute_delete)(struct space *, struct txn *,
@@ -361,12 +359,6 @@ index_name_by_id(struct space *space, uint32_t id);
 int
 access_check_space(struct space *space, user_access_t access);
 
-static inline int
-space_apply_initial_join_row(struct space *space, struct request *request)
-{
-	return space->vtab->apply_initial_join_row(space, request);
-}
-
 /**
  * Execute a DML request on the given space.
  */
@@ -528,7 +520,6 @@ space_remove_ck_constraint(struct space *space, struct ck_constraint *ck);
  * Virtual method stubs.
  */
 size_t generic_space_bsize(struct space *);
-int generic_space_apply_initial_join_row(struct space *, struct request *);
 int generic_space_ephemeral_replace(struct space *, const char *, const char *);
 int generic_space_ephemeral_delete(struct space *, const char *);
 int generic_space_ephemeral_rowid_next(struct space *, uint64_t *);
@@ -598,13 +589,6 @@ index_find_system_xc(struct space *space, uint32_t index_id)
 	return index_find_xc(space, index_id);
 }
 
-static inline void
-space_apply_initial_join_row_xc(struct space *space, struct request *request)
-{
-	if (space_apply_initial_join_row(space, request) != 0)
-		diag_raise();
-}
-
 static inline void
 space_check_index_def_xc(struct space *space, struct index_def *index_def)
 {
diff --git a/src/box/sysview.c b/src/box/sysview.c
index 46cf1e13..1fbe3aa2 100644
--- a/src/box/sysview.c
+++ b/src/box/sysview.c
@@ -490,7 +490,6 @@ sysview_space_create_index(struct space *space, struct index_def *def)
 static const struct space_vtab sysview_space_vtab = {
 	/* .destroy = */ sysview_space_destroy,
 	/* .bsize = */ generic_space_bsize,
-	/* .apply_initial_join_row = */ generic_space_apply_initial_join_row,
 	/* .execute_replace = */ sysview_space_execute_replace,
 	/* .execute_delete = */ sysview_space_execute_delete,
 	/* .execute_update = */ sysview_space_execute_update,
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 80ed00a1..de4a06c4 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -3199,73 +3199,6 @@ out:
 	return rc;
 }
 
-static int
-vinyl_space_apply_initial_join_row(struct space *space, struct request *request)
-{
-	assert(request->header != NULL);
-	struct vy_env *env = vy_env(space->engine);
-
-	struct vy_tx *tx = vy_tx_begin(env->xm);
-	if (tx == NULL)
-		return -1;
-
-	struct txn_stmt stmt;
-	memset(&stmt, 0, sizeof(stmt));
-
-	int rc = -1;
-	switch (request->type) {
-	case IPROTO_INSERT:
-		rc = vy_insert(env, tx, &stmt, space, request);
-		break;
-	case IPROTO_REPLACE:
-		rc = vy_replace(env, tx, &stmt, space, request);
-		break;
-	case IPROTO_UPSERT:
-		rc = vy_upsert(env, tx, &stmt, space, request);
-		break;
-	case IPROTO_DELETE:
-		rc = vy_delete(env, tx, &stmt, space, request);
-		break;
-	default:
-		diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, request->type);
-		break;
-	}
-	if (rc != 0) {
-		vy_tx_rollback(tx);
-		return -1;
-	}
-
-	/*
-	 * Account memory quota, see vinyl_engine_prepare()
-	 * and vinyl_engine_commit() for more details about
-	 * quota accounting.
-	 */
-	size_t reserved = tx->write_size;
-	if (vy_quota_use(&env->quota, VY_QUOTA_CONSUMER_TX,
-			 reserved, TIMEOUT_INFINITY) != 0)
-		unreachable();
-
-	size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
-
-	rc = vy_tx_prepare(tx);
-	if (rc == 0)
-		vy_tx_commit(tx, 0);
-	else
-		vy_tx_rollback(tx);
-
-	if (stmt.old_tuple != NULL)
-		tuple_unref(stmt.old_tuple);
-	if (stmt.new_tuple != NULL)
-		tuple_unref(stmt.new_tuple);
-
-	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
-	assert(mem_used_after >= mem_used_before);
-	size_t used = mem_used_after - mem_used_before;
-	vy_quota_adjust(&env->quota, VY_QUOTA_CONSUMER_TX, reserved, used);
-	vy_regulator_check_dump_watermark(&env->regulator);
-	return rc;
-}
-
 /* }}} Replication */
 
 /* {{{ Garbage collection */
@@ -4671,7 +4604,6 @@ static const struct engine_vtab vinyl_engine_vtab = {
 static const struct space_vtab vinyl_space_vtab = {
 	/* .destroy = */ vinyl_space_destroy,
 	/* .bsize = */ vinyl_space_bsize,
-	/* .apply_initial_join_row = */ vinyl_space_apply_initial_join_row,
 	/* .execute_replace = */ vinyl_space_execute_replace,
 	/* .execute_delete = */ vinyl_space_execute_delete,
 	/* .execute_update = */ vinyl_space_execute_update,
diff --git a/test/replication/on_schema_init.result b/test/replication/on_schema_init.result
index 3f7ee0bd..6c2857d1 100644
--- a/test/replication/on_schema_init.result
+++ b/test/replication/on_schema_init.result
@@ -15,6 +15,12 @@ test_run:cmd('create server replica with rpl_master=default, script="replication
 test_engine = box.schema.space.create('test_engine', {engine='memtx'})
 ---
 ...
+-- Make sure that space.before_replace trigger is invoked for rows
+-- received during both initial and final join stages.
+box.snapshot()
+---
+- ok
+...
 test_local =  box.schema.space.create('test_local', {is_local=false})
 ---
 ...
diff --git a/test/replication/on_schema_init.test.lua b/test/replication/on_schema_init.test.lua
index 9bb9e477..016a61c1 100644
--- a/test/replication/on_schema_init.test.lua
+++ b/test/replication/on_schema_init.test.lua
@@ -9,6 +9,9 @@ test_run = env.new()
 test_run:cmd('create server replica with rpl_master=default, script="replication/replica_on_schema_init.lua"')
 
 test_engine = box.schema.space.create('test_engine', {engine='memtx'})
+-- Make sure that space.before_replace trigger is invoked for rows
+-- received during both initial and final join stages.
+box.snapshot()
 test_local =  box.schema.space.create('test_local', {is_local=false})
 test_engine.engine
 test_local.is_local
-- 
2.20.1




More information about the Tarantool-patches mailing list