[PATCH v2 12/14] vinyl: zap vy_stmt_new_surrogate_from_key

Vladimir Davydov vdavydov.dev at gmail.com
Wed Mar 13 11:52:58 MSK 2019


This heavy function isn't needed anymore, as we can now insert key
statements into the memory level.
---
 src/box/vinyl.c             |   4 +-
 src/box/vy_lsm.c            |   3 +-
 src/box/vy_mem.c            |   3 +-
 src/box/vy_point_lookup.c   |   2 +-
 src/box/vy_read_iterator.c  |   3 +-
 src/box/vy_run.c            |  31 +++-------
 src/box/vy_run.h            |   9 +--
 src/box/vy_stmt.c           | 147 +++++---------------------------------------
 src/box/vy_stmt.h           |  38 +++++-------
 src/box/vy_write_iterator.c |   2 +-
 test/vinyl/stat.result      |   2 +-
 11 files changed, 53 insertions(+), 191 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index e9a4ae0d..f2d9ab55 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1719,8 +1719,8 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
 		}
 	} else {
 		assert(lsm->index_id == 0);
-		delete = vy_stmt_new_surrogate_delete_from_key(request->key,
-						pk->key_def, pk->mem_format);
+		delete = vy_stmt_new_delete(pk->env->key_format,
+					    request->key, request->key_end);
 		if (delete == NULL)
 			return -1;
 		if (space->index_count > 1)
diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index 28cd487c..a5e92887 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -898,7 +898,8 @@ vy_lsm_set(struct vy_lsm *lsm, struct vy_mem *mem,
 	lsm->stat.memory.count.bytes += tuple_size(stmt);
 
 	/* Abort transaction if format was changed by DDL */
-	if (format_id != tuple_format_id(mem->format)) {
+	if (!vy_stmt_is_key(stmt) &&
+	    format_id != tuple_format_id(mem->format)) {
 		diag_set(ClientError, ER_TRANSACTION_CONFLICT);
 		return -1;
 	}
diff --git a/src/box/vy_mem.c b/src/box/vy_mem.c
index c47a3c36..7fa90cbd 100644
--- a/src/box/vy_mem.c
+++ b/src/box/vy_mem.c
@@ -215,7 +215,8 @@ vy_mem_insert(struct vy_mem *mem, const struct tuple *stmt)
 {
 	assert(vy_stmt_type(stmt) != IPROTO_UPSERT);
 	/* Check if the statement can be inserted in the vy_mem. */
-	assert(stmt->format_id == tuple_format_id(mem->format));
+	assert(vy_stmt_is_key(stmt) ||
+	       stmt->format_id == tuple_format_id(mem->format));
 	/* The statement must be from a lsregion. */
 	assert(!vy_stmt_is_refable(stmt));
 	size_t size = tuple_size(stmt);
diff --git a/src/box/vy_point_lookup.c b/src/box/vy_point_lookup.c
index 51226ce8..9f7796eb 100644
--- a/src/box/vy_point_lookup.c
+++ b/src/box/vy_point_lookup.c
@@ -144,7 +144,7 @@ vy_point_lookup_scan_slice(struct vy_lsm *lsm, struct vy_slice *slice,
 	struct vy_run_iterator run_itr;
 	vy_run_iterator_open(&run_itr, &lsm->stat.disk.iterator, slice,
 			     ITER_EQ, key, rv, lsm->cmp_def, lsm->key_def,
-			     lsm->disk_format, lsm->index_id == 0);
+			     lsm->disk_format);
 	struct vy_history slice_history;
 	vy_history_create(&slice_history, &lsm->env->history_node_pool);
 	int rc = vy_run_iterator_next(&run_itr, &slice_history);
diff --git a/src/box/vy_read_iterator.c b/src/box/vy_read_iterator.c
index cc884bec..2864a280 100644
--- a/src/box/vy_read_iterator.c
+++ b/src/box/vy_read_iterator.c
@@ -622,8 +622,7 @@ vy_read_iterator_add_disk(struct vy_read_iterator *itr)
 				     &lsm->stat.disk.iterator, slice,
 				     iterator_type, itr->key,
 				     itr->read_view, lsm->cmp_def,
-				     lsm->key_def, lsm->disk_format,
-				     lsm->index_id == 0);
+				     lsm->key_def, lsm->disk_format);
 	}
 }
 
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index 928ca57f..699941a8 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -707,22 +707,19 @@ vy_page_xrow(struct vy_page *page, uint32_t stmt_no,
  * Read raw stmt data from the page
  * @param page          Page.
  * @param stmt_no       Statement position in the page.
- * @param cmp_def       Key definition, including primary key parts.
  * @param format        Format for REPLACE/DELETE tuples.
- * @param is_primary    True if the index is primary.
  *
  * @retval not NULL Statement read from page.
  * @retval     NULL Memory error.
  */
 static struct tuple *
 vy_page_stmt(struct vy_page *page, uint32_t stmt_no,
-	     const struct key_def *cmp_def, struct tuple_format *format,
-	     bool is_primary)
+	     struct tuple_format *format)
 {
 	struct xrow_header xrow;
 	if (vy_page_xrow(page, stmt_no, &xrow) != 0)
 		return NULL;
-	return vy_stmt_decode(&xrow, cmp_def, format, is_primary);
+	return vy_stmt_decode(&xrow, format);
 }
 
 /**
@@ -1025,8 +1022,7 @@ vy_run_iterator_read(struct vy_run_iterator *itr,
 	int rc = vy_run_iterator_load_page(itr, pos.page_no, &page);
 	if (rc != 0)
 		return rc;
-	*stmt = vy_page_stmt(page, pos.pos_in_page, itr->cmp_def,
-			     itr->format, itr->is_primary);
+	*stmt = vy_page_stmt(page, pos.pos_in_page, itr->format);
 	if (*stmt == NULL)
 		return -1;
 	return 0;
@@ -1052,9 +1048,7 @@ vy_run_iterator_search_in_page(struct vy_run_iterator *itr,
 			iterator_type == ITER_LE ? -1 : 0);
 	while (beg != end) {
 		uint32_t mid = beg + (end - beg) / 2;
-		struct tuple *fnd_key = vy_page_stmt(page, mid, itr->cmp_def,
-						     itr->format,
-						     itr->is_primary);
+		struct tuple *fnd_key = vy_page_stmt(page, mid, itr->format);
 		if (fnd_key == NULL)
 			return end;
 		int cmp = vy_stmt_compare(fnd_key, key, itr->cmp_def);
@@ -1402,14 +1396,12 @@ vy_run_iterator_open(struct vy_run_iterator *itr,
 		     struct vy_slice *slice, enum iterator_type iterator_type,
 		     const struct tuple *key, const struct vy_read_view **rv,
 		     struct key_def *cmp_def, struct key_def *key_def,
-		     struct tuple_format *format,
-		     bool is_primary)
+		     struct tuple_format *format)
 {
 	itr->stat = stat;
 	itr->cmp_def = cmp_def;
 	itr->key_def = key_def;
 	itr->format = format;
-	itr->is_primary = is_primary;
 	itr->slice = slice;
 
 	itr->iterator_type = iterator_type;
@@ -2393,8 +2385,7 @@ vy_run_rebuild_index(struct vy_run *run, const char *dir,
 				continue;
 			}
 			++page_row_count;
-			struct tuple *tuple = vy_stmt_decode(&xrow, cmp_def,
-							     format, iid == 0);
+			struct tuple *tuple = vy_stmt_decode(&xrow, format);
 			if (tuple == NULL)
 				goto close_err;
 			if (bloom_builder != NULL &&
@@ -2574,8 +2565,7 @@ vy_slice_stream_search(struct vy_stmt_stream *virt_stream)
 	while (beg != end) {
 		uint32_t mid = beg + (end - beg) / 2;
 		struct tuple *fnd_key = vy_page_stmt(stream->page, mid,
-					stream->cmp_def, stream->format,
-					stream->is_primary);
+						     stream->format);
 		if (fnd_key == NULL)
 			return -1;
 		int cmp = vy_stmt_compare(fnd_key, stream->slice->begin,
@@ -2622,8 +2612,7 @@ vy_slice_stream_next(struct vy_stmt_stream *virt_stream, struct tuple **ret)
 
 	/* Read current tuple from the page */
 	struct tuple *tuple = vy_page_stmt(stream->page, stream->pos_in_page,
-					   stream->cmp_def, stream->format,
-					   stream->is_primary);
+					   stream->format);
 	if (tuple == NULL) /* Read or memory error */
 		return -1;
 
@@ -2696,8 +2685,7 @@ static const struct vy_stmt_stream_iface vy_slice_stream_iface = {
 
 void
 vy_slice_stream_open(struct vy_slice_stream *stream, struct vy_slice *slice,
-		     struct key_def *cmp_def, struct tuple_format *format,
-		     bool is_primary)
+		     struct key_def *cmp_def, struct tuple_format *format)
 {
 	stream->base.iface = &vy_slice_stream_iface;
 
@@ -2710,5 +2698,4 @@ vy_slice_stream_open(struct vy_slice_stream *stream, struct vy_slice *slice,
 	stream->cmp_def = cmp_def;
 	stream->format = format;
 	tuple_format_ref(format);
-	stream->is_primary = is_primary;
 }
diff --git a/src/box/vy_run.h b/src/box/vy_run.h
index 18ca1729..beb87b78 100644
--- a/src/box/vy_run.h
+++ b/src/box/vy_run.h
@@ -248,8 +248,6 @@ struct vy_run_iterator {
 	 * pages.
 	 */
 	struct tuple_format *format;
-	/** Set if this iterator is for a primary index. */
-	bool is_primary;
 	/** The run slice to iterate. */
 	struct vy_slice *slice;
 
@@ -523,7 +521,7 @@ vy_run_iterator_open(struct vy_run_iterator *itr,
 		     struct vy_slice *slice, enum iterator_type iterator_type,
 		     const struct tuple *key, const struct vy_read_view **rv,
 		     struct key_def *cmp_def, struct key_def *key_def,
-		     struct tuple_format *format, bool is_primary);
+		     struct tuple_format *format);
 
 /**
  * Advance a run iterator to the next key.
@@ -575,8 +573,6 @@ struct vy_slice_stream {
 	struct key_def *cmp_def;
 	/** Format for allocating REPLACE and DELETE tuples read from pages. */
 	struct tuple_format *format;
-	/** Set if this iterator is for a primary index. */
-	bool is_primary;
 };
 
 /**
@@ -584,8 +580,7 @@ struct vy_slice_stream {
  */
 void
 vy_slice_stream_open(struct vy_slice_stream *stream, struct vy_slice *slice,
-		     struct key_def *cmp_def, struct tuple_format *format,
-		     bool is_primary);
+		     struct key_def *cmp_def, struct tuple_format *format);
 
 /**
  * Run_writer fills a created run with statements one by one,
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index fd7850cc..2e26e843 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -371,6 +371,14 @@ vy_stmt_new_insert(struct tuple_format *format, const char *tuple_begin,
 }
 
 struct tuple *
+vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin,
+		   const char *tuple_end)
+{
+	return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
+				    NULL, 0, IPROTO_DELETE);
+}
+
+struct tuple *
 vy_stmt_replace_from_upsert(const struct tuple *upsert)
 {
 	assert(vy_stmt_type(upsert) == IPROTO_UPSERT);
@@ -393,122 +401,6 @@ vy_stmt_replace_from_upsert(const struct tuple *upsert)
 	return replace;
 }
 
-static struct tuple *
-vy_stmt_new_surrogate_from_key(const char *key, enum iproto_type type,
-			       const struct key_def *cmp_def,
-			       struct tuple_format *format)
-{
-	/* UPSERT can't be surrogate. */
-	assert(type != IPROTO_UPSERT);
-	struct region *region = &fiber()->gc;
-	size_t region_svp = region_used(region);
-
-	uint32_t field_count = format->index_field_count;
-	uint32_t iov_sz = sizeof(struct iovec) * format->total_field_count;
-	struct iovec *iov = region_alloc(region, iov_sz);
-	if (iov == NULL) {
-		diag_set(OutOfMemory, iov_sz, "region",
-			 "iov for surrogate key");
-		return NULL;
-	}
-	memset(iov, 0, iov_sz);
-	uint32_t part_count = mp_decode_array(&key);
-	assert(part_count == cmp_def->part_count);
-	assert(part_count <= format->total_field_count);
-	/**
-	 * Calculate bsize using format::min_tuple_size tuple
-	 * where parts_count nulls replaced with extracted keys.
-	 */
-	uint32_t bsize = format->min_tuple_size - mp_sizeof_nil() * part_count;
-	for (uint32_t i = 0; i < part_count; ++i) {
-		const struct key_part *part = &cmp_def->parts[i];
-		assert(part->fieldno < field_count);
-		struct tuple_field *field =
-			tuple_format_field_by_path(format, part->fieldno,
-						   part->path, part->path_len);
-		assert(field != NULL);
-		const char *svp = key;
-		iov[field->id].iov_base = (char *) key;
-		mp_next(&key);
-		iov[field->id].iov_len = key - svp;
-		bsize += key - svp;
-	}
-
-	struct tuple *stmt = vy_stmt_alloc(format, bsize);
-	if (stmt == NULL)
-		goto out;
-
-	char *raw = (char *) tuple_data(stmt);
-	uint32_t *field_map = (uint32_t *) raw;
-	memset((char *)field_map - format->field_map_size, 0,
-	       format->field_map_size);
-	char *wpos = mp_encode_array(raw, field_count);
-	struct tuple_field *field;
-	json_tree_foreach_entry_preorder(field, &format->fields.root,
-					 struct tuple_field, token) {
-		/*
-		 * Do not restore fields not involved in index
-		 * (except gaps in the mp array that may be filled
-		 * with nils later).
-		 */
-		if (!field->is_key_part)
-			continue;
-		if (field->token.type == JSON_TOKEN_NUM) {
-			/*
-			 * Write nil istead of omitted array
-			 * members.
-			 */
-			struct json_token **neighbors =
-				field->token.parent->children;
-			for (int i = field->token.sibling_idx - 1; i >= 0; i--) {
-				if (neighbors[i] != NULL &&
-				    json_tree_entry(neighbors[i],
-						    struct tuple_field,
-						    token)->is_key_part)
-					break;
-				wpos = mp_encode_nil(wpos);
-			}
-		} else {
-			/* Write a key string for map member. */
-			assert(field->token.type == JSON_TOKEN_STR);
-			const char *str = field->token.str;
-			uint32_t len = field->token.len;
-			wpos = mp_encode_str(wpos, str, len);
-		}
-		int max_child_idx = field->token.max_child_idx;
-		if (json_token_is_leaf(&field->token)) {
-			if (iov[field->id].iov_len == 0) {
-				wpos = mp_encode_nil(wpos);
-			} else {
-				memcpy(wpos, iov[field->id].iov_base,
-				       iov[field->id].iov_len);
-				uint32_t data_offset = wpos - raw;
-				int32_t slot = field->offset_slot;
-				if (slot != TUPLE_OFFSET_SLOT_NIL)
-					field_map[slot] = data_offset;
-				wpos += iov[field->id].iov_len;
-			}
-		} else if (field->type == FIELD_TYPE_ARRAY) {
-			wpos = mp_encode_array(wpos, max_child_idx + 1);
-		} else if (field->type == FIELD_TYPE_MAP) {
-			wpos = mp_encode_map(wpos, max_child_idx + 1);
-		}
-	}
-	assert(wpos == raw + bsize);
-	vy_stmt_set_type(stmt, type);
-out:
-	region_truncate(region, region_svp);
-	return stmt;
-}
-
-struct tuple *
-vy_stmt_new_surrogate_delete_from_key(const char *key, struct key_def *cmp_def,
-				      struct tuple_format *format)
-{
-	return vy_stmt_new_surrogate_from_key(key, IPROTO_DELETE,
-					      cmp_def, format);
-}
-
 struct tuple *
 vy_stmt_new_surrogate_delete_raw(struct tuple_format *format,
 				 const char *src_data, const char *src_data_end)
@@ -748,8 +640,6 @@ int
 vy_stmt_encode_primary(const struct tuple *value, struct key_def *key_def,
 		       uint32_t space_id, struct xrow_header *xrow)
 {
-	assert(!vy_stmt_is_key(value));
-
 	memset(xrow, 0, sizeof(*xrow));
 	enum iproto_type type = vy_stmt_type(value);
 	xrow->type = type;
@@ -763,8 +653,9 @@ vy_stmt_encode_primary(const struct tuple *value, struct key_def *key_def,
 	const char *extracted = NULL;
 	switch (type) {
 	case IPROTO_DELETE:
-		/* extract key */
-		extracted = tuple_extract_key(value, key_def, &size);
+		extracted = vy_stmt_is_key(value) ?
+			    tuple_data_range(value, &size) :
+			    tuple_extract_key(value, key_def, &size);
 		if (extracted == NULL)
 			return -1;
 		request.key = extracted;
@@ -829,9 +720,9 @@ vy_stmt_encode_secondary(const struct tuple *value, struct key_def *cmp_def,
 }
 
 struct tuple *
-vy_stmt_decode(struct xrow_header *xrow, const struct key_def *key_def,
-	       struct tuple_format *format, bool is_primary)
+vy_stmt_decode(struct xrow_header *xrow, struct tuple_format *format)
 {
+	struct vy_stmt_env *env = format->engine;
 	struct request request;
 	uint64_t key_map = dml_request_key_map(xrow->type);
 	key_map &= ~(1ULL << IPROTO_SPACE_ID); /* space_id is optional */
@@ -841,14 +732,10 @@ vy_stmt_decode(struct xrow_header *xrow, const struct key_def *key_def,
 	struct iovec ops;
 	switch (request.type) {
 	case IPROTO_DELETE:
-		if (is_primary)
-			stmt = vy_stmt_new_surrogate_from_key(request.key,
-							      IPROTO_DELETE,
-							      key_def, format);
-		else
-			stmt = vy_stmt_new_with_ops(format, request.key,
-						    request.key_end,
-						    NULL, 0, IPROTO_DELETE);
+		/* Always use key format for DELETE statements. */
+		stmt = vy_stmt_new_with_ops(env->key_format,
+					    request.key, request.key_end,
+					    NULL, 0, IPROTO_DELETE);
 		break;
 	case IPROTO_INSERT:
 	case IPROTO_REPLACE:
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index 7b93883d..187aa8dd 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -419,27 +419,6 @@ char *
 vy_key_dup(const char *key);
 
 /**
- * Create a new surrogate DELETE from @a key using format.
- *
- * Example:
- * key: {a3, a5}
- * key_def: { 3, 5 }
- * result: {nil, nil, a3, nil, a5}
- *
- * @param key     MessagePack array with key fields.
- * @param cmp_def Key definition of the result statement (incudes
- *                primary key parts).
- * @param format  Target tuple format.
- *
- * @retval not NULL Success.
- * @retval     NULL Memory or format error.
- */
-struct tuple *
-vy_stmt_new_surrogate_delete_from_key(const char *key,
-				      struct key_def *cmp_def,
-				      struct tuple_format *format);
-
-/**
  * Create a new surrogate DELETE from @a tuple using @a format.
  * A surrogate tuple has format->field_count fields from the source
  * with all unindexed fields replaced with MessagePack NIL.
@@ -497,6 +476,20 @@ struct tuple *
 vy_stmt_new_insert(struct tuple_format *format, const char *tuple_begin,
 		   const char *tuple_end);
 
+/**
+ * Create the DELETE statement from raw MessagePack data.
+ * @param format Format of a tuple for offsets generating.
+ * @param tuple_begin MessagePack data that contain an array of fields WITH the
+ *                    array header.
+ * @param tuple_end End of the array that begins from @param tuple_begin.
+ *
+ * @retval NULL     Memory allocation error.
+ * @retval not NULL Success.
+ */
+struct tuple *
+vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin,
+		   const char *tuple_end);
+
  /**
  * Create the UPSERT statement from raw MessagePack data.
  * @param tuple_begin MessagePack data that contain an array of fields WITH the
@@ -649,8 +642,7 @@ vy_stmt_encode_secondary(const struct tuple *value, struct key_def *cmp_def,
  * @retval NULL on error
  */
 struct tuple *
-vy_stmt_decode(struct xrow_header *xrow, const struct key_def *key_def,
-	       struct tuple_format *format, bool is_primary);
+vy_stmt_decode(struct xrow_header *xrow, struct tuple_format *format);
 
 /**
  * Format a statement into string.
diff --git a/src/box/vy_write_iterator.c b/src/box/vy_write_iterator.c
index 36e43bc9..ee2cb7ae 100644
--- a/src/box/vy_write_iterator.c
+++ b/src/box/vy_write_iterator.c
@@ -476,7 +476,7 @@ vy_write_iterator_new_slice(struct vy_stmt_stream *vstream,
 	if (src == NULL)
 		return -1;
 	vy_slice_stream_open(&src->slice_stream, slice, stream->cmp_def,
-			     disk_format, stream->is_primary);
+			     disk_format);
 	return 0;
 }
 
diff --git a/test/vinyl/stat.result b/test/vinyl/stat.result
index 01da5f14..08da1658 100644
--- a/test/vinyl/stat.result
+++ b/test/vinyl/stat.result
@@ -1391,7 +1391,7 @@ st2 = i2:stat()
 ...
 s:bsize()
 ---
-- 107449
+- 107199
 ...
 i1:len(), i2:len()
 ---
-- 
2.11.0




More information about the Tarantool-patches mailing list