[tarantool-patches] [PATCH v2 2/7] box: move checks for key findability from space_vtab

imeevma at tarantool.org imeevma at tarantool.org
Tue Jul 17 16:08:26 MSK 2018


This patch moves the key validity checks
(exact_key_validate for memtx and
vy_unique_key_validate for vinyl) out of the
space_vtab methods and into box.cc.

Part of #3375.
---
Branch: https://github.com/tarantool/tarantool/compare/imeevma/gh-3375-lua-expose-ephemeral-spaces
Issue: https://github.com/tarantool/tarantool/issues/3375

 src/box/box.cc        | 72 ++++++++++++++++++++++++++++++++++++++++++++-------
 src/box/memtx_space.c |  4 ---
 src/box/vinyl.c       | 24 ++---------------
 src/box/vinyl.h       | 26 +++++++++++++++++++
 4 files changed, 90 insertions(+), 36 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 7c6312d..1dcbfbf 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -993,16 +993,50 @@ box_index_id_by_name(uint32_t space_id, const char *name, uint32_t len)
 }
 /** \endcond public */
 
+/**
+ * 0 if space is temporary, GROUP_LOCAL or box_check_writable() is 0; else -1.
+ */
+static inline int
+space_check_writable(struct space *space)
+{
+	if (!space_is_temporary(space) &&
+	    space_group_id(space) != GROUP_LOCAL &&
+	    box_check_writable() != 0)
+		return -1;
+	return 0;
+}
+
+/**
+ * Validate MessagePack array key for lookup in unique index index_id of a
+ * memtx or vinyl space; other engines pass. Return 0 if valid, -1 if not.
+ */
+static inline int
+key_check_findable(struct space *space, uint32_t index_id, const char *key)
+{
+	if (space_is_memtx(space)) {
+		struct index *pk = index_find_unique(space, index_id);
+		if (pk == NULL)
+			return -1;
+		uint32_t part_count = mp_decode_array(&key);
+		if (exact_key_validate(pk->def->key_def, key, part_count) != 0)
+			return -1;
+	} else if (space_is_vinyl(space)) {
+		struct vy_lsm *lsm = vy_lsm_find_unique(space, index_id);
+		if (lsm == NULL)
+			return -1;
+		uint32_t part_count = mp_decode_array(&key);
+		if (vy_unique_key_validate(lsm, key, part_count))
+			return -1;
+	}
+	return 0;
+}
+
 int
 box_process1(struct request *request, box_tuple_t **result)
 {
 	/* Allow to write to temporary spaces in read-only mode. */
 	struct space *space = space_cache_find(request->space_id);
-	if (space == NULL)
-		return -1;
-	if (!space_is_temporary(space) &&
-	    space_group_id(space) != GROUP_LOCAL &&
-	    box_check_writable() != 0)
+	if (space == NULL || space_check_writable(space) != 0)
 		return -1;
 	return box_process_rw(request, space, result);
 }
@@ -1087,13 +1121,16 @@ box_insert(uint32_t space_id, const char *tuple, const char *tuple_end,
 	   box_tuple_t **result)
 {
 	mp_tuple_assert(tuple, tuple_end);
+	struct space *space = space_cache_find(space_id);
+	if (space == NULL || space_check_writable(space) != 0)
+		return -1;
 	struct request request;
 	memset(&request, 0, sizeof(request));
 	request.type = IPROTO_INSERT;
 	request.space_id = space_id;
 	request.tuple = tuple;
 	request.tuple_end = tuple_end;
-	return box_process1(&request, result);
+	return box_process_rw(&request, space, result);
 }
 
 int
@@ -1101,13 +1138,16 @@ box_replace(uint32_t space_id, const char *tuple, const char *tuple_end,
 	    box_tuple_t **result)
 {
 	mp_tuple_assert(tuple, tuple_end);
+	struct space *space = space_cache_find(space_id);
+	if (space == NULL || space_check_writable(space) != 0)
+		return -1;
 	struct request request;
 	memset(&request, 0, sizeof(request));
 	request.type = IPROTO_REPLACE;
 	request.space_id = space_id;
 	request.tuple = tuple;
 	request.tuple_end = tuple_end;
-	return box_process1(&request, result);
+	return box_process_rw(&request, space, result);
 }
 
 int
@@ -1115,6 +1155,10 @@ box_delete(uint32_t space_id, uint32_t index_id, const char *key,
 	   const char *key_end, box_tuple_t **result)
 {
 	mp_tuple_assert(key, key_end);
+	struct space *space = space_cache_find(space_id);
+	if (space == NULL || space_check_writable(space) != 0 ||
+	    key_check_findable(space, index_id, key) != 0)
+		return -1;
 	struct request request;
 	memset(&request, 0, sizeof(request));
 	request.type = IPROTO_DELETE;
@@ -1122,7 +1166,7 @@ box_delete(uint32_t space_id, uint32_t index_id, const char *key,
 	request.index_id = index_id;
 	request.key = key;
 	request.key_end = key_end;
-	return box_process1(&request, result);
+	return box_process_rw(&request, space, result);
 }
 
 int
@@ -1132,6 +1176,10 @@ box_update(uint32_t space_id, uint32_t index_id, const char *key,
 {
 	mp_tuple_assert(key, key_end);
 	mp_tuple_assert(ops, ops_end);
+	struct space *space = space_cache_find(space_id);
+	if (space == NULL || space_check_writable(space) != 0 ||
+	    key_check_findable(space, index_id, key) != 0)
+		return -1;
 	struct request request;
 	memset(&request, 0, sizeof(request));
 	request.type = IPROTO_UPDATE;
@@ -1143,7 +1191,8 @@ box_update(uint32_t space_id, uint32_t index_id, const char *key,
 	/** Legacy: in case of update, ops are passed in in request tuple */
 	request.tuple = ops;
 	request.tuple_end = ops_end;
-	return box_process1(&request, result);
+
+	return box_process_rw(&request, space, result);
 }
 
 int
@@ -1153,6 +1202,9 @@ box_upsert(uint32_t space_id, uint32_t index_id, const char *tuple,
 {
 	mp_tuple_assert(ops, ops_end);
 	mp_tuple_assert(tuple, tuple_end);
+	struct space *space = space_cache_find(space_id);
+	if (space == NULL || space_check_writable(space) != 0)
+		return -1;
 	struct request request;
 	memset(&request, 0, sizeof(request));
 	request.type = IPROTO_UPSERT;
@@ -1163,7 +1215,7 @@ box_upsert(uint32_t space_id, uint32_t index_id, const char *tuple,
 	request.tuple = tuple;
 	request.tuple_end = tuple_end;
 	request.index_base = index_base;
-	return box_process1(&request, result);
+	return box_process_rw(&request, space, result);
 }
 
 /**
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 185be92..f440951 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -362,8 +362,6 @@ memtx_space_execute_delete(struct space *space, struct txn *txn,
 		return -1;
 	const char *key = request->key;
 	uint32_t part_count = mp_decode_array(&key);
-	if (exact_key_validate(pk->def->key_def, key, part_count) != 0)
-		return -1;
 	if (index_get(pk, key, part_count, &stmt->old_tuple) != 0)
 		return -1;
 	struct tuple *old_tuple = NULL;
@@ -389,8 +387,6 @@ memtx_space_execute_update(struct space *space, struct txn *txn,
 		return -1;
 	const char *key = request->key;
 	uint32_t part_count = mp_decode_array(&key);
-	if (exact_key_validate(pk->def->key_def, key, part_count) != 0)
-		return -1;
 	if (index_get(pk, key, part_count, &stmt->old_tuple) != 0)
 		return -1;
 
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index a3e308e..10ec3ca 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -572,11 +572,7 @@ vy_lsm_find(struct space *space, uint32_t iid)
 	return vy_lsm(index);
 }
 
-/**
- * Wrapper around vy_lsm_find() which ensures that
- * the found index is unique.
- */
-static  struct vy_lsm *
+struct vy_lsm *
 vy_lsm_find_unique(struct space *space, uint32_t index_id)
 {
 	struct vy_lsm *lsm = vy_lsm_find(space, index_id);
@@ -1608,19 +1604,7 @@ error:
 	return -1;
 }
 
-/**
- * Check that the key can be used for search in a unique index
- * LSM tree.
- * @param  lsm        LSM tree for checking.
- * @param  key        MessagePack'ed data, the array without a
- *                    header.
- * @param  part_count Part count of the key.
- *
- * @retval  0 The key is valid.
- * @retval -1 The key is not valid, the appropriate error is set
- *            in the diagnostics area.
- */
-static inline int
+int
 vy_unique_key_validate(struct vy_lsm *lsm, const char *key,
 		       uint32_t part_count)
 {
@@ -1758,8 +1742,6 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
 	bool has_secondary = space->index_count > 1;
 	const char *key = request->key;
 	uint32_t part_count = mp_decode_array(&key);
-	if (vy_unique_key_validate(lsm, key, part_count))
-		return -1;
 	/*
 	 * There are two cases when need to get the full tuple
 	 * before deletion.
@@ -1850,8 +1832,6 @@ vy_update(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
 		return -1;
 	const char *key = request->key;
 	uint32_t part_count = mp_decode_array(&key);
-	if (vy_unique_key_validate(lsm, key, part_count))
-		return -1;
 
 	if (vy_lsm_full_by_key(lsm, tx, vy_tx_read_view(tx),
 			       key, part_count, &stmt->old_tuple) != 0)
diff --git a/src/box/vinyl.h b/src/box/vinyl.h
index 21f99e4..150ac64 100644
--- a/src/box/vinyl.h
+++ b/src/box/vinyl.h
@@ -33,6 +33,7 @@
 
 #include <stdbool.h>
 #include <stddef.h>
+#include <stdint.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -40,6 +41,8 @@ extern "C" {
 
 struct info_handler;
 struct vinyl_engine;
+struct vy_lsm;
+struct space;
 
 struct vinyl_engine *
 vinyl_engine_new(const char *dir, size_t memory,
@@ -88,6 +91,29 @@ vinyl_engine_set_too_long_threshold(struct vinyl_engine *vinyl,
 void
 vinyl_engine_set_snap_io_rate_limit(struct vinyl_engine *vinyl, double limit);
 
+/**
+ * Wrapper around vy_lsm_find() which ensures that
+ * the found index is unique.
+ */
+struct vy_lsm *
+vy_lsm_find_unique(struct space *space, uint32_t index_id);
+
+/**
+ * Check that the key can be used for search in a unique index
+ * LSM tree.
+ * @param  lsm        LSM tree for checking.
+ * @param  key        MessagePack'ed data, the array without a
+ *                    header.
+ * @param  part_count Part count of the key.
+ *
+ * @retval  0 The key is valid.
+ * @retval -1 The key is not valid, the appropriate error is set
+ *            in the diagnostics area.
+ */
+int
+vy_unique_key_validate(struct vy_lsm *lsm, const char *key,
+		       uint32_t part_count);
+
 #ifdef __cplusplus
 } /* extern "C" */
 
-- 
2.7.4





More information about the Tarantool-patches mailing list