[tarantool-patches] [PATCH box 1/1] box: create bigrefs for tuples

Mergen Imeev imeevma at tarantool.org
Tue May 29 15:11:13 MSK 2018


From: Mergen Imeev <imeevma at gmail.com>

Due to the tuple reference counter being limited to 65535 it
was possible to reach this limit. This patch increases the
capacity of the reference counter to about 4 billion.

Closes #3224
---
 src/box/tuple.c          |  18 +++++++
 src/box/tuple.h          | 135 +++++++++++++++++++++++++++++++++++++++++++----
 test/box/select.result   |   5 +-
 test/box/select.test.lua |   2 +-
 4 files changed, 146 insertions(+), 14 deletions(-)

diff --git a/src/box/tuple.c b/src/box/tuple.c
index 014f374..f4e1829 100644
--- a/src/box/tuple.c
+++ b/src/box/tuple.c
@@ -58,6 +58,12 @@ struct tuple *box_tuple_last;
 
 struct tuple_format *tuple_format_runtime;
 
+/**
+ * All bigrefs of tuples
+ * \sa tuple_ref
+ */
+struct tuple_bigrefs bigrefs;
+
 static void
 runtime_tuple_delete(struct tuple_format *format, struct tuple *tuple);
 
@@ -211,6 +217,16 @@ tuple_init(field_name_hash_f hash)
 
 	box_tuple_last = NULL;
 
+	bigrefs.size = 0;
+	bigrefs.capacity = TUPLE_BIGREF_MIN_CAPACITY;
+	bigrefs.refs = (uint32_t *) malloc(bigrefs.capacity * sizeof(uint32_t));
+	if (bigrefs.refs == NULL) {
+		diag_set(OutOfMemory, bigrefs.capacity *
+			sizeof(uint32_t), "malloc", "bigrefs.refs");
+		return -1;
+	}
+
+
 	if (coll_id_cache_init() != 0)
 		return -1;
 
@@ -265,6 +281,8 @@ tuple_free(void)
 	tuple_format_free();
 
 	coll_id_cache_destroy();
+
+	free(bigrefs.refs);
 }
 
 box_tuple_format_t *
diff --git a/src/box/tuple.h b/src/box/tuple.h
index e2384dd..c7dbe3b 100644
--- a/src/box/tuple.h
+++ b/src/box/tuple.h
@@ -327,6 +327,19 @@ struct PACKED tuple
 	 */
 };
 
+/**
+ * @brief Container for big reference counters ("bigrefs").
+ *
+ * When a tuple's 16-bit reference counter would overflow, the full
+ * 32-bit counter is kept in refs[] and the tuple stores only the
+ * slot index tagged with TUPLE_REF_FLAG. \sa tuple_ref
+ */
+struct tuple_bigrefs
+{
+	/**
+	 * Array of 32-bit counters; slot i serves the tuple whose
+	 * refs field equals TUPLE_REF_FLAG | i. A zero slot is free.
+	 */
+	uint32_t *refs;
+	/**
+	 * NOTE(review): get_bigref_index() uses this as the count of
+	 * slots handed out (`return bigrefs.size++`), while
+	 * bigrefs_check_capacity() stores the index of the last
+	 * non-zero slot — off by one. Unify the semantics, otherwise
+	 * a live slot can be handed out twice.
+	 */
+	uint16_t size;
+	/** Number of allocated slots in refs. */
+	uint16_t capacity;
+};
+
 /** Size of the tuple including size of struct tuple. */
 static inline size_t
 tuple_size(const struct tuple *tuple)
@@ -774,7 +787,41 @@ tuple_field_uuid(const struct tuple *tuple, int fieldno,
 	return 0;
 }
 
-enum { TUPLE_REF_MAX = UINT16_MAX };
+enum {
+	TUPLE_BIGREFS_FACTOR = 2,
+	TUPLE_BIGREF_MAX = UINT32_MAX,
+	TUPLE_REF_MAX = UINT16_MAX >> 1,
+	TUPLE_REF_FLAG = UINT16_MAX ^ TUPLE_REF_MAX,
+	TUPLE_BIGREF_MIN_CAPACITY = 16
+};
+
+extern struct tuple_bigrefs bigrefs;
+
+/**
+ * Find a free slot in the bigref array, growing the array on demand.
+ * @retval index <= TUPLE_REF_MAX  Success: slot index for a new bigref.
+ * @retval TUPLE_REF_FLAG          Failure: too many refs, or OOM
+ *                                 (diag is set).
+ */
+static inline uint16_t
+get_bigref_index(void)
+{
+	/* Reuse a released (zero) slot if one exists. */
+	for (uint16_t i = 0; i < bigrefs.size; ++i) {
+		if (bigrefs.refs[i] == 0)
+			return i;
+	}
+	if (bigrefs.size == bigrefs.capacity - 1) {
+		if (bigrefs.capacity > TUPLE_REF_MAX / TUPLE_BIGREFS_FACTOR)
+			return TUPLE_REF_FLAG;
+		uint16_t new_capacity =
+			bigrefs.capacity * TUPLE_BIGREFS_FACTOR;
+		/*
+		 * Never overwrite bigrefs.refs with realloc's result
+		 * directly: on failure that would leak the old array
+		 * and leave a NULL pointer behind.
+		 */
+		uint32_t *new_refs = (uint32_t *) realloc(bigrefs.refs,
+				new_capacity * sizeof(uint32_t));
+		if (new_refs == NULL) {
+			diag_set(OutOfMemory, new_capacity *
+				 sizeof(uint32_t), "realloc", "bigrefs.refs");
+			return TUPLE_REF_FLAG;
+		}
+		/* Commit capacity only after the realloc succeeded. */
+		bigrefs.refs = new_refs;
+		bigrefs.capacity = new_capacity;
+	}
+	return bigrefs.size++;
+}
 
 /**
  * Increment tuple reference counter.
@@ -785,15 +832,55 @@ enum { TUPLE_REF_MAX = UINT16_MAX };
 static inline int
 tuple_ref(struct tuple *tuple)
 {
-	if (tuple->refs + 1 > TUPLE_REF_MAX) {
-		diag_set(ClientError, ER_TUPLE_REF_OVERFLOW);
-		return -1;
+	if (tuple->refs == TUPLE_REF_MAX) {
+		/* Promote to a bigref: move the counter off-tuple. */
+		uint16_t index = get_bigref_index();
+		if (index > TUPLE_REF_MAX) {
+			diag_set(ClientError, ER_TUPLE_REF_OVERFLOW);
+			return -1;
+		}
+		tuple->refs = TUPLE_REF_FLAG | index;
+		/*
+		 * Seed the slot with the current value; the actual
+		 * increment happens in the branch below.
+		 */
+		bigrefs.refs[index] = TUPLE_REF_MAX;
 	}
-	tuple->refs++;
+	if ((tuple->refs & TUPLE_REF_FLAG) != 0) {
+		uint16_t index = tuple->refs ^ TUPLE_REF_FLAG;
+		if (bigrefs.refs[index] == TUPLE_BIGREF_MAX) {
+			diag_set(ClientError, ER_TUPLE_REF_OVERFLOW);
+			return -1;
+		}
+		bigrefs.refs[index]++;
+	} else {
+		/* Always brace single-statement bodies (house style). */
+		tuple->refs++;
+	}
 	return 0;
 }
 
 /**
+ * Best-effort shrink of the bigref array when its tail is unused.
+ * Called after a slot has been released.
+ */
+static inline void
+bigrefs_check_capacity(void)
+{
+	/*
+	 * Must start at 0: if every slot is free the loop below never
+	 * assigns it, and reading it uninitialized is UB.
+	 */
+	uint16_t tmp_size = 0;
+	uint16_t tmp_capacity = bigrefs.capacity;
+	/* Count of used slots = index of the last non-zero slot + 1. */
+	for (uint16_t i = 0; i < bigrefs.capacity; ++i) {
+		if (bigrefs.refs[i] > 0)
+			tmp_size = i + 1;
+	}
+	while (tmp_capacity > TUPLE_BIGREF_MIN_CAPACITY &&
+	       tmp_size < tmp_capacity / TUPLE_BIGREFS_FACTOR)
+		tmp_capacity /= TUPLE_BIGREFS_FACTOR;
+	if (tmp_capacity < TUPLE_BIGREF_MIN_CAPACITY)
+		tmp_capacity = TUPLE_BIGREF_MIN_CAPACITY;
+	bigrefs.size = tmp_size;
+	if (tmp_capacity < bigrefs.capacity) {
+		/*
+		 * Keep the old (larger) array if realloc fails:
+		 * shrinking is an optimization, not a requirement,
+		 * and overwriting bigrefs.refs with NULL would lose
+		 * every live big reference counter.
+		 */
+		uint32_t *new_refs = (uint32_t *) realloc(bigrefs.refs,
+				tmp_capacity * sizeof(uint32_t));
+		if (new_refs == NULL)
+			return;
+		bigrefs.refs = new_refs;
+		bigrefs.capacity = tmp_capacity;
+	}
+}
+
+/**
  * Decrement tuple reference counter. If it has reached zero, free the tuple.
  *
  * @pre tuple->refs + count >= 0
@@ -802,8 +889,20 @@ static inline void
 tuple_unref(struct tuple *tuple)
 {
 	assert(tuple->refs - 1 >= 0);
+	uint16_t index = tuple->refs ^ TUPLE_REF_FLAG;
 
-	tuple->refs--;
+	if((tuple->refs & TUPLE_REF_FLAG) > 0) {
+		bigrefs.refs[index]--;
+	} else {
+		tuple->refs--;
+	}
+
+	if((tuple->refs & TUPLE_REF_FLAG) > 0 &&
+	   (bigrefs.refs[index] < TUPLE_REF_MAX)) {
+		tuple->refs = bigrefs.refs[index];
+		bigrefs.refs[index] = 0;
+		bigrefs_check_capacity();
+	}
 
 	if (tuple->refs == 0)
 		tuple_delete(tuple);
@@ -824,11 +923,27 @@ tuple_bless(struct tuple *tuple)
 {
 	assert(tuple != NULL);
 	/* Ensure tuple can be referenced at least once after return */
-	if (tuple->refs + 2 > TUPLE_REF_MAX) {
-		diag_set(ClientError, ER_TUPLE_REF_OVERFLOW);
-		return NULL;
+
+	if(tuple->refs == TUPLE_REF_MAX) {
+		uint16_t index = get_bigref_index();
+		if(index > TUPLE_REF_MAX) {
+			diag_set(ClientError, ER_TUPLE_REF_OVERFLOW);
+			return NULL;
+		}
+		tuple->refs = TUPLE_REF_FLAG | index;
+		bigrefs.refs[index] = TUPLE_REF_MAX;
+	}
+	if((tuple->refs & TUPLE_REF_FLAG) > 0) {
+		uint16_t index = tuple->refs ^ TUPLE_REF_FLAG;
+		if (bigrefs.refs[index] > TUPLE_BIGREF_MAX - 2) {
+			diag_set(ClientError, ER_TUPLE_REF_OVERFLOW);
+			return NULL;
+		}
+		bigrefs.refs[index]++;
+	} else {
+		tuple->refs++;
 	}
-	tuple->refs++;
+
 	/* Remove previous tuple */
 	if (likely(box_tuple_last != NULL))
 		tuple_unref(box_tuple_last); /* do not throw */
diff --git a/test/box/select.result b/test/box/select.result
index 4aed706..d2beb9e 100644
--- a/test/box/select.result
+++ b/test/box/select.result
@@ -634,13 +634,12 @@ lots_of_links = {}
 ref_count = 0
 ---
 ...
-while (true) do table.insert(lots_of_links, s:get{0}) ref_count = ref_count + 1 end
+while (ref_count < 100000) do table.insert(lots_of_links, s:get{0}) ref_count = ref_count + 1 end
 ---
-- error: Tuple reference counter overflow
 ...
 ref_count
 ---
-- 65531
+- 100000
 ...
 lots_of_links = {}
 ---
diff --git a/test/box/select.test.lua b/test/box/select.test.lua
index 54c2ecc..5e3071d 100644
--- a/test/box/select.test.lua
+++ b/test/box/select.test.lua
@@ -129,7 +129,7 @@ index = s:create_index('primary', { type = 'tree' })
 a = s:insert{0}
 lots_of_links = {}
 ref_count = 0
-while (true) do table.insert(lots_of_links, s:get{0}) ref_count = ref_count + 1 end
+while (ref_count < 100000) do table.insert(lots_of_links, s:get{0}) ref_count = ref_count + 1 end
 ref_count
 lots_of_links = {}
 s:drop()
-- 
2.7.4





More information about the Tarantool-patches mailing list