[tarantool-patches] Re: [tarantool-patches] Re: [PATCH v7 1/1] box: create bigrefs for tuples

Мерген Имеев imeevma at tarantool.org
Sat Jun 9 21:41:04 MSK 2018




>Суббота,  9 июня 2018, 18:15 +03:00 от Vladislav Shpilevoy <v.shpilevoy at tarantool.org>:
>
>Thanks for the patch! I have pushed a separate commit with my fixes, but
>it does not pass the test. Please, fix it.
>
>And use unit.h helpers: ok, isnt, is, fail etc to test something instead
>of assertions. Assert is omitted in Release build, and the test will test
>nothing. 
Done.
>
>
>On 09/06/2018 17:40,  imeevma at tarantool.org wrote:
>> The reference counter of a tuple was limited to 65535, and it
>> was possible to reach this limit. This patch increases the
>> capacity of reference counters to 4 billion.
>> 
>> Closes #3224
>> ---
>> Branch:  https://github.com/tarantool/tarantool/commits/imeevma/gh-3224-tuple-bigrefs
>> Issue:  https://github.com/tarantool/tarantool/issues/3224
>> 
>>   src/box/box.cc                |  20 +++---
>>   src/box/errcode.h             |   2 +-
>>   src/box/index.cc              |  20 +++---
>>   src/box/lua/tuple.c           |   5 +-
>>   src/box/port.c                |   8 +--
>>   src/box/tuple.c               | 143 ++++++++++++++++++++++++++++++++++++++++--
>>   src/box/tuple.h               | 105 ++++++++++++-------------------
>>   src/box/vinyl.c               |  38 +++++------
>>   test/box/misc.result          |  39 ++++++------
>>   test/box/select.result        |  43 +++++++++++--
>>   test/box/select.test.lua      |  21 +++++--
>>   test/unit/CMakeLists.txt      |   3 +
>>   test/unit/tuple_bigref.c      | 117 ++++++++++++++++++++++++++++++++++
>>   test/unit/tuple_bigref.result |   5 ++
>>   14 files changed, 413 insertions(+), 156 deletions(-)
>>   create mode 100644 test/unit/tuple_bigref.c
>>   create mode 100644 test/unit/tuple_bigref.result
>> 
>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index c728a4c..4257861 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -174,20 +174,22 @@ process_rw(struct request *request, struct space *space, struct tuple **result)
>>   		txn_rollback_stmt();
>>   		return -1;
>>   	}
>> +	if (result == NULL)
>> +		return txn_commit_stmt(txn, request);
>> +	*result = tuple;
>> +	if (tuple == NULL)
>> +		return txn_commit_stmt(txn, request);
>>   	/*
>>   	 * Pin the tuple locally before the commit,
>>   	 * otherwise it may go away during yield in
>>   	 * when WAL is written in autocommit mode.
>>   	 */
>> -	TupleRefNil ref(tuple);
>> -	if (txn_commit_stmt(txn, request) != 0)
>> -		return -1;
>> -	if (result != NULL) {
>> -		if (tuple != NULL && tuple_bless(tuple) == NULL)
>> -			return -1;
>> -		*result = tuple;
>> -	}
>> -	return 0;
>> +	tuple_ref(tuple);
>> +	int rc = txn_commit_stmt(txn, request);
>> +	if (rc == 0)
>> +		tuple_bless(tuple);
>> +	tuple_unref(tuple);
>> +	return rc;
>>   }
>> 
>>   void
>> diff --git a/src/box/errcode.h b/src/box/errcode.h
>> index a0759f8..e009524 100644
>> --- a/src/box/errcode.h
>> +++ b/src/box/errcode.h
>> @@ -138,7 +138,7 @@ struct errcode_record {
>>   	/* 83 */_(ER_ROLE_EXISTS,		"Role '%s' already exists") \
>>   	/* 84 */_(ER_CREATE_ROLE,		"Failed to create role '%s': %s") \
>>   	/* 85 */_(ER_INDEX_EXISTS,		"Index '%s' already exists") \
>> -	/* 86 */_(ER_TUPLE_REF_OVERFLOW,	"Tuple reference counter overflow") \
>> +	/* 86 */_(ER_UNUSED6,			"") \
>>   	/* 87 */_(ER_ROLE_LOOP,			"Granting role '%s' to role '%s' would create a loop") \
>>   	/* 88 */_(ER_GRANT,			"Incorrect grant arguments: %s") \
>>   	/* 89 */_(ER_PRIV_GRANTED,		"User '%s' already has %s access on %s '%s'") \
>> diff --git a/src/box/index.cc b/src/box/index.cc
>> index 3c62ec1..f992bc9 100644
>> --- a/src/box/index.cc
>> +++ b/src/box/index.cc
>> @@ -220,8 +220,8 @@ box_index_random(uint32_t space_id, uint32_t index_id, uint32_t rnd,
>>   	/* No tx management, random() is for approximation anyway. */
>>   	if (index_random(index, rnd, result) != 0)
>>   		return -1;
>> -	if (*result != NULL && tuple_bless(*result) == NULL)
>> -		return -1;
>> +	if (*result != NULL)
>> +		tuple_bless(*result);
>>   	return 0;
>>   }
>> 
>> @@ -253,8 +253,8 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key,
>>   	txn_commit_ro_stmt(txn);
>>   	/* Count statistics. */
>>   	rmean_collect(rmean_box, IPROTO_SELECT, 1);
>> -	if (*result != NULL && tuple_bless(*result) == NULL)
>> -		return -1;
>> +	if (*result != NULL)
>> +		tuple_bless(*result);
>>   	return 0;
>>   }
>> 
>> @@ -285,8 +285,8 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
>>   		return -1;
>>   	}
>>   	txn_commit_ro_stmt(txn);
>> -	if (*result != NULL && tuple_bless(*result) == NULL)
>> -		return -1;
>> +	if (*result != NULL)
>> +		tuple_bless(*result);
>>   	return 0;
>>   }
>> 
>> @@ -317,8 +317,8 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key,
>>   		return -1;
>>   	}
>>   	txn_commit_ro_stmt(txn);
>> -	if (*result != NULL && tuple_bless(*result) == NULL)
>> -		return -1;
>> +	if (*result != NULL)
>> +		tuple_bless(*result);
>>   	return 0;
>>   }
>> 
>> @@ -397,8 +397,8 @@ box_iterator_next(box_iterator_t *itr, box_tuple_t **result)
>>   	assert(result != NULL);
>>   	if (iterator_next(itr, result) != 0)
>>   		return -1;
>> -	if (*result != NULL && tuple_bless(*result) == NULL)
>> -		return -1;
>> +	if (*result != NULL)
>> +		tuple_bless(*result);
>>   	return 0;
>>   }
>> 
>> diff --git a/src/box/lua/tuple.c b/src/box/lua/tuple.c
>> index  8057331 ..22fe696 100644
>> --- a/src/box/lua/tuple.c
>> +++ b/src/box/lua/tuple.c
>> @@ -496,10 +496,7 @@ luaT_pushtuple(struct lua_State *L, box_tuple_t *tuple)
>>   		luaL_pushcdata(L, CTID_CONST_STRUCT_TUPLE_REF);
>>   	*ptr = tuple;
>>   	/* The order is important - first reference tuple, next set gc */
>> -	if (box_tuple_ref(tuple) != 0) {
>> -		luaT_error(L);
>> -		return;
>> -	}
>> +	box_tuple_ref(tuple);
>>   	lua_pushcfunction(L, lbox_tuple_gc);
>>   	luaL_setcdatagc(L, -2);
>>   }
>> diff --git a/src/box/port.c b/src/box/port.c
>> index 03f6be7..9b6b858 100644
>> --- a/src/box/port.c
>> +++ b/src/box/port.c
>> @@ -45,8 +45,7 @@ port_tuple_add(struct port *base, struct tuple *tuple)
>>   	struct port_tuple *port = port_tuple(base);
>>   	struct port_tuple_entry *e;
>>   	if (port->size == 0) {
>> -		if (tuple_ref(tuple) != 0)
>> -			return -1;
>> +		tuple_ref(tuple);
>>   		e = &port->first_entry;
>>   		port->first = port->last = e;
>>   	} else {
>> @@ -55,10 +54,7 @@ port_tuple_add(struct port *base, struct tuple *tuple)
>>   			diag_set(OutOfMemory, sizeof(*e), "mempool_alloc", "e");
>>   			return -1;
>>   		}
>> -		if (tuple_ref(tuple) != 0) {
>> -			mempool_free(&port_tuple_entry_pool, e);
>> -			return -1;
>> -		}
>> +		tuple_ref(tuple);
>>   		port->last->next = e;
>>   		port->last = e;
>>   	}
>> diff --git a/src/box/tuple.c b/src/box/tuple.c
>> index 014f374..b632183 100644
>> --- a/src/box/tuple.c
>> +++ b/src/box/tuple.c
>> @@ -48,6 +48,28 @@ enum {
>>   	OBJSIZE_MIN = 16,
>>   };
>> 
>> +/**
>> + * Container for big reference counters. Contains an array of
>> + * big reference counters, the capacity of this array, the
>> + * number of counters in use and the head of the free slot list.
>> + * When the reference counter of a tuple exceeds 32767, the refs
>> + * field of the tuple becomes an index into this array and
>> + * is_bigref is set to true. The moment a big reference counter
>> + * drops back to 32767, its slot is returned to the free list,
>> + * refs of the tuple becomes 32767 and is_bigref becomes false.
>> + * A free slot stores the index of the next free slot.
>> + */
>> +static struct bigref_list {
>> +	/** Array of big reference counters. */
>> +	uint32_t *refs;
>> +	/** Number of elements of the array that are in use. */
>> +	uint16_t size;
>> +	/** Capacity of the array. */
>> +	uint16_t capacity;
>> +	/** Index of first free element. */
>> +	uint16_t vacant_index;
>> +} bigref_list;
>> +
>>   static const double ALLOC_FACTOR = 1.05;
>> 
>>   /**
>> @@ -151,6 +173,13 @@ tuple_validate_raw(struct tuple_format *format, const char *tuple)
>>   	return 0;
>>   }
>> 
>> +/** Initialize big references container. */
>> +static inline void
>> +bigref_list_create()
>> +{
>> +	memset(&bigref_list, 0, sizeof(bigref_list));
>> +}
>> +
>>   /**
>>    * Incremented on every snapshot and is used to distinguish tuples
>>    * which were created after start of a snapshot (these tuples can
>> @@ -211,6 +240,8 @@ tuple_init(field_name_hash_f hash)
>> 
>>   	box_tuple_last = NULL;
>> 
>> +	bigref_list_create();
>> +
>>   	if (coll_id_cache_init() != 0)
>>   		return -1;
>> 
>> @@ -244,6 +275,107 @@ tuple_arena_create(struct slab_arena *arena, struct quota *quota,
>>   	}
>>   }
>> 
>> +enum {
>> +	BIGREF_FACTOR = 2,
>> +	BIGREF_MAX = UINT32_MAX,
>> +	BIGREF_MIN_CAPACITY = 16,
>> +	/**
>> +	 * Only 15 bits are available for bigref list index in
>> +	 * struct tuple.
>> +	 */
>> +	BIGREF_MAX_CAPACITY = UINT16_MAX >> 1
>> +};
>> +
>> +/** Destroy big references and free memory that was allocated. */
>> +static inline void
>> +bigref_list_reset()
>> +{
>> +	free(bigref_list.refs);
>> +	bigref_list_create();
>> +}
>> +
>> +/**
>> + * Increase capacity of bigref_list.
>> + */
>> +static inline void
>> +bigref_list_increase_capacity()
>> +{
>> +	assert(bigref_list.capacity == bigref_list.vacant_index);
>> +	uint32_t *refs = bigref_list.refs;
>> +	uint16_t capacity = bigref_list.capacity;
>> +	if (capacity == 0)
>> +		capacity = BIGREF_MIN_CAPACITY;
>> +	else if (capacity < BIGREF_MAX_CAPACITY)
>> +		capacity = MIN(capacity * BIGREF_FACTOR, BIGREF_MAX_CAPACITY);
>> +	else
>> +		panic("Too many big references");
>> +	refs = (uint32_t *) realloc(refs, capacity * sizeof(*refs));
>> +	if (refs == NULL) {
>> +		panic("failed to reallocate %zu bytes: Cannot allocate "\
>> +		      "memory.", capacity * sizeof(*refs));
>> +	}
>> +	for(uint16_t i = bigref_list.capacity; i < capacity; ++i) {
>> +		refs[i] = i + 1;
>> +	}
>> +	bigref_list.refs = refs;
>> +	bigref_list.capacity = capacity;
>> +}
>> +
>> +/**
>> + * Return index for new big reference counter and allocate memory
>> + * if needed.
>> + * @retval index for new big reference counter.
>> + */
>> +static inline uint16_t
>> +bigref_list_new_index()
>> +{
>> +	if (bigref_list.size == bigref_list.capacity)
>> +		bigref_list_increase_capacity();
>> +	++bigref_list.size;
>> +	uint16_t vacant_index = bigref_list.vacant_index;
>> +	bigref_list.vacant_index = bigref_list.refs[vacant_index];
>> +	return vacant_index;
>> +}
>> +
>> +void
>> +tuple_ref_slow(struct tuple *tuple)
>> +{
>> +	assert(tuple->is_bigref || tuple->refs == TUPLE_REF_MAX);
>> +	if (! tuple->is_bigref) {
>> +		tuple->ref_index = bigref_list_new_index();
>> +		tuple->is_bigref = true;
>> +		bigref_list.refs[tuple->ref_index] = TUPLE_REF_MAX;
>> +	} else if (bigref_list.refs[tuple->ref_index] == BIGREF_MAX) {
>> +		panic("Tuple big reference counter overflow");
>> +	}
>> +	bigref_list.refs[tuple->ref_index]++;
>> +}
>> +
>> +/**
>> + * Free index and add it to free-index-list. Free memory
>> + * when size == 0.
>> + */
>> +static inline void
>> +bigref_list_delete_index(uint16_t index)
>> +{
>> +	bigref_list.refs[index] = bigref_list.vacant_index;
>> +	bigref_list.vacant_index = index;
>> +	if (--bigref_list.size == 0)
>> +		bigref_list_reset();
>> +}
>> +
>> +void
>> +tuple_unref_slow(struct tuple *tuple)
>> +{
>> +	assert(tuple->is_bigref &&
>> +	       bigref_list.refs[tuple->ref_index] > TUPLE_REF_MAX);
>> +	if(--bigref_list.refs[tuple->ref_index] == TUPLE_REF_MAX) {
>> +		bigref_list_delete_index(tuple->ref_index);
>> +		tuple->ref_index = TUPLE_REF_MAX;
>> +		tuple->is_bigref = false;
>> +	}
>> +}
>> +
>>   void
>>   tuple_arena_destroy(struct slab_arena *arena)
>>   {
>> @@ -265,6 +397,8 @@ tuple_free(void)
>>   	tuple_format_free();
>> 
>>   	coll_id_cache_destroy();
>> +
>> +	bigref_list_reset();
>>   }
>> 
>>   box_tuple_format_t *
>> @@ -288,7 +422,8 @@ int
>>   box_tuple_ref(box_tuple_t *tuple)
>>   {
>>   	assert(tuple != NULL);
>> -	return tuple_ref(tuple);
>> +	tuple_ref(tuple);
>> +	return 0;
>>   }
>> 
>>   void
>> @@ -357,10 +492,7 @@ box_tuple_iterator(box_tuple_t *tuple)
>>   			 "mempool", "new slab");
>>   		return NULL;
>>   	}
>> -	if (tuple_ref(tuple) != 0) {
>> -		mempool_free(&tuple_iterator_pool, it);
>> -		return NULL;
>> -	}
>> +	tuple_ref(tuple);
>>   	tuple_rewind(it, tuple);
>>   	return it;
>>   }
>> @@ -451,7 +583,6 @@ box_tuple_new(box_tuple_format_t *format, const char *data, const char *end)
>>   	struct tuple *ret = tuple_new(format, data, end);
>>   	if (ret == NULL)
>>   		return NULL;
>> -	/* Can't fail on zero refs. */
>>   	return tuple_bless(ret);
>>   }
>> 
>> diff --git a/src/box/tuple.h b/src/box/tuple.h
>> index e2384dd..14dbd40 100644
>> --- a/src/box/tuple.h
>> +++ b/src/box/tuple.h
>> @@ -105,8 +105,7 @@ typedef struct tuple box_tuple_t;
>>    * tuple will leak.
>>    *
>>    * \param tuple a tuple
>> - * \retval -1 on error (check box_error_last())
>> - * \retval 0 on success
>> + * \retval 0 always
>>    * \sa box_tuple_unref()
>>    */
>>   int
>> @@ -269,8 +268,7 @@ box_tuple_next(box_tuple_iterator_t *it);
>>    * Use box_tuple_format_default() to create space-independent tuple.
>>    * \param data tuple data in MsgPack Array format ([field1, field2, ...]).
>>    * \param end the end of \a data
>> - * \retval NULL on out of memory
>> - * \retval tuple otherwise
>> + * \retval tuple
>>    * \pre data, end is valid MsgPack Array
>>    * \sa \code box.tuple.new(data) \endcode
>>    */
>> @@ -307,9 +305,17 @@ box_tuple_upsert(const box_tuple_t *tuple, const char *expr, const
>>    */
>>   struct PACKED tuple
>>   {
>> -	/** reference counter */
>> -	uint16_t refs;
>> -	/** format identifier */
>> +	union {
>> +		/** Reference counter. */
>> +		uint16_t refs;
>> +		struct {
>> +			/** Index of big reference counter. */
>> +			uint16_t ref_index : 15;
>> +			/** Big reference flag. */
>> +			bool is_bigref : 1;
>> +		};
>> +	};
>> +	/** Format identifier. */
>>   	uint16_t format_id;
>>   	/**
>>   	 * Length of the MessagePack data in raw part of the
>> @@ -774,26 +780,36 @@ tuple_field_uuid(const struct tuple *tuple, int fieldno,
>>   	return 0;
>>   }
>> 
>> -enum { TUPLE_REF_MAX = UINT16_MAX };
>> +enum { TUPLE_REF_MAX = UINT16_MAX >> 1 };
>> +
>> +/**
>> + * Increase tuple big reference counter.
>> + * @param tuple Tuple to reference.
>> + */
>> +void
>> +tuple_ref_slow(struct tuple *tuple);
>> 
>>   /**
>>    * Increment tuple reference counter.
>>    * @param tuple Tuple to reference.
>> - * @retval  0 Success.
>> - * @retval -1 Too many refs error.
>>    */
>> -static inline int
>> +static inline void
>>   tuple_ref(struct tuple *tuple)
>>   {
>> -	if (tuple->refs + 1 > TUPLE_REF_MAX) {
>> -		diag_set(ClientError, ER_TUPLE_REF_OVERFLOW);
>> -		return -1;
>> -	}
>> -	tuple->refs++;
>> -	return 0;
>> +	if (unlikely(tuple->refs >= TUPLE_REF_MAX))
>> +		tuple_ref_slow(tuple);
>> +	else
>> +		tuple->refs++;
>>   }
>> 
>>   /**
>> + * Decrease tuple big reference counter.
>> + * @param tuple Tuple to unreference.
>> + */
>> +void
>> +tuple_unref_slow(struct tuple *tuple);
>> +
>> +/**
>>    * Decrement tuple reference counter. If it has reached zero, free the tuple.
>>    *
>>    * @pre tuple->refs + count >= 0
>> @@ -802,10 +818,9 @@ static inline void
>>   tuple_unref(struct tuple *tuple)
>>   {
>>   	assert(tuple->refs - 1 >= 0);
>> -
>> -	tuple->refs--;
>> -
>> -	if (tuple->refs == 0)
>> +	if (unlikely(tuple->is_bigref))
>> +		tuple_unref_slow(tuple);
>> +	else if (--tuple->refs == 0)
>>   		tuple_delete(tuple);
>>   }
>> 
>> @@ -813,25 +828,18 @@ extern struct tuple *box_tuple_last;
>> 
>>   /**
>>    * Convert internal `struct tuple` to public `box_tuple_t`.
>> - * \retval tuple on success
>> - * \retval NULL on error, check diag
>> + * \retval tuple
>>    * \post \a tuple ref counted until the next call.
>> - * \post tuple_ref() doesn't fail at least once
>>    * \sa tuple_ref
>>    */
>>   static inline box_tuple_t *
>>   tuple_bless(struct tuple *tuple)
>>   {
>>   	assert(tuple != NULL);
>> -	/* Ensure tuple can be referenced at least once after return */
>> -	if (tuple->refs + 2 > TUPLE_REF_MAX) {
>> -		diag_set(ClientError, ER_TUPLE_REF_OVERFLOW);
>> -		return NULL;
>> -	}
>> -	tuple->refs++;
>> +	tuple_ref(tuple);
>>   	/* Remove previous tuple */
>>   	if (likely(box_tuple_last != NULL))
>> -		tuple_unref(box_tuple_last); /* do not throw */
>> +		tuple_unref(box_tuple_last);
>>   	/* Remember current tuple */
>>   	box_tuple_last = tuple;
>>   	return tuple;
>> @@ -849,41 +857,6 @@ tuple_to_buf(const struct tuple *tuple, char *buf, size_t size);
>>   #include "tuple_update.h"
>>   #include "errinj.h"
>> 
>> -/**
>> - * \copydoc tuple_ref()
>> - * \throws if overflow detected.
>> - */
>> -static inline void
>> -tuple_ref_xc(struct tuple *tuple)
>> -{
>> -	if (tuple_ref(tuple))
>> -		diag_raise();
>> -}
>> -
>> -/**
>> - * \copydoc tuple_bless
>> - * \throw ER_TUPLE_REF_OVERFLOW
>> - */
>> -static inline box_tuple_t *
>> -tuple_bless_xc(struct tuple *tuple)
>> -{
>> -	box_tuple_t *blessed = tuple_bless(tuple);
>> -	if (blessed == NULL)
>> -		diag_raise();
>> -	return blessed;
>> -}
>> -
>> -/** Make tuple references exception-friendly in absence of @finally. */
>> -struct TupleRefNil {
>> -	struct tuple *tuple;
>> -	TupleRefNil (struct tuple *arg) :tuple(arg)
>> -	{ if (tuple) tuple_ref_xc(tuple); }
>> -	~TupleRefNil() { if (tuple) tuple_unref(tuple); }
>> -
>> -	TupleRefNil(const TupleRefNil&) = delete;
>> -	void operator=(const TupleRefNil&) = delete;
>> -};
>> -
>>   /* @copydoc tuple_field_with_type() */
>>   static inline const char *
>>   tuple_field_with_type_xc(const struct tuple *tuple, uint32_t fieldno,
>> diff --git a/src/box/vinyl.c b/src/box/vinyl.c
>> index f0d2687..dc0d020 100644
>> --- a/src/box/vinyl.c
>> +++ b/src/box/vinyl.c
>> @@ -3822,7 +3822,6 @@ vinyl_iterator_primary_next(struct iterator *base, struct tuple **ret)
>>   	assert(base->next = vinyl_iterator_primary_next);
>>   	struct vinyl_iterator *it = (struct vinyl_iterator *)base;
>>   	assert(it->lsm->index_id == 0);
>> -	struct tuple *tuple;
>> 
>>   	if (it->tx == NULL) {
>>   		diag_set(ClientError, ER_CURSOR_NO_TRANSACTION);
>> @@ -3833,18 +3832,15 @@ vinyl_iterator_primary_next(struct iterator *base, struct tuple **ret)
>>   		goto fail;
>>   	}
>> 
>> -	if (vy_read_iterator_next(&it->iterator, &tuple) != 0)
>> +	if (vy_read_iterator_next(&it->iterator, ret) != 0)
>>   		goto fail;
>> -
>> -	if (tuple == NULL) {
>> +	if (*ret == NULL) {
>>   		/* EOF. Close the iterator immediately. */
>>   		vinyl_iterator_close(it);
>> -		*ret = NULL;
>> -		return 0;
>> +	} else {
>> +		tuple_bless(*ret);
>>   	}
>> -	*ret = tuple_bless(tuple);
>> -	if (*ret != NULL)
>> -		return 0;
>> +	return 0;
>>   fail:
>>   	vinyl_iterator_close(it);
>>   	return -1;
>> @@ -3890,11 +3886,10 @@ next:
>>   	 * Note, there's no need in vy_tx_track() as the
>>   	 * tuple is already tracked in the secondary index.
>>   	 */
>> -	struct tuple *full_tuple;
>>   	if (vy_point_lookup(it->lsm->pk, it->tx, vy_tx_read_view(it->tx),
>> -			    tuple, &full_tuple) != 0)
>> +			    tuple, ret) != 0)
>>   		goto fail;
>> -	if (full_tuple == NULL) {
>> +	if (*ret == NULL) {
>>   		/*
>>   		 * All indexes of a space must be consistent, i.e.
>>   		 * if a tuple is present in one index, it must be
>> @@ -3908,10 +3903,9 @@ next:
>>   			 vy_lsm_name(it->lsm), vy_stmt_str(tuple));
>>   		goto next;
>>   	}
>> -	*ret = tuple_bless(full_tuple);
>> -	tuple_unref(full_tuple);
>> -	if (*ret != NULL)
>> -		return 0;
>> +	tuple_bless(*ret);
>> +	tuple_unref(*ret);
>> +	return 0;
>>   fail:
>>   	vinyl_iterator_close(it);
>>   	return -1;
>> @@ -3997,16 +3991,12 @@ vinyl_index_get(struct index *index, const char *key,
>>   	const struct vy_read_view **rv = (tx != NULL ? vy_tx_read_view(tx) :
>>   					  &env->xm->p_global_read_view);
>> 
>> -	struct tuple *tuple;
>> -	if (vy_lsm_full_by_key(lsm, tx, rv, key, part_count, &tuple) != 0)
>> +	if (vy_lsm_full_by_key(lsm, tx, rv, key, part_count, ret) != 0)
>>   		return -1;
>> -
>> -	if (tuple != NULL) {
>> -		*ret = tuple_bless(tuple);
>> -		tuple_unref(tuple);
>> -		return *ret == NULL ? -1 : 0;
>> +	if (*ret != NULL) {
>> +		tuple_bless(*ret);
>> +		tuple_unref(*ret);
>>   	}
>> -	*ret = NULL;
>>   	return 0;
>>   }
>> 
>> diff --git a/test/box/misc.result b/test/box/misc.result
>> index 8f94f55..f7703ba 100644
>> --- a/test/box/misc.result
>> +++ b/test/box/misc.result
>> @@ -332,7 +332,6 @@ t;
>>     - 'box.error.UNKNOWN_UPDATE_OP : 28'
>>     - 'box.error.WRONG_COLLATION_OPTIONS : 151'
>>     - 'box.error.CURSOR_NO_TRANSACTION : 80'
>> -  - 'box.error.TUPLE_REF_OVERFLOW : 86'
>>     - 'box.error.ALTER_SEQUENCE : 143'
>>     - 'box.error.INVALID_XLOG_NAME : 75'
>>     - 'box.error.SAVEPOINT_EMPTY_TX : 60'
>> @@ -360,7 +359,7 @@ t;
>>     - 'box.error.VINYL_MAX_TUPLE_SIZE : 139'
>>     - 'box.error.LOAD_FUNCTION : 99'
>>     - 'box.error.INVALID_XLOG : 74'
>> -  - 'box.error.PRIV_NOT_GRANTED : 91'
>> +  - 'box.error.READ_VIEW_ABORTED : 130'
>>     - 'box.error.TRANSACTION_CONFLICT : 97'
>>     - 'box.error.GUEST_USER_PASSWORD : 96'
>>     - 'box.error.PROC_C : 102'
>> @@ -405,7 +404,7 @@ t;
>>     - 'box.error.injection : table: <address>
>>     - 'box.error.NULLABLE_MISMATCH : 153'
>>     - 'box.error.LAST_DROP : 15'
>> -  - 'box.error.NO_SUCH_ROLE : 82'
>> +  - 'box.error.TUPLE_FORMAT_LIMIT : 16'
>>     - 'box.error.DECOMPRESSION : 124'
>>     - 'box.error.CREATE_SEQUENCE : 142'
>>     - 'box.error.CREATE_USER : 43'
>> @@ -414,66 +413,66 @@ t;
>>     - 'box.error.SEQUENCE_OVERFLOW : 147'
>>     - 'box.error.SYSTEM : 115'
>>     - 'box.error.KEY_PART_IS_TOO_LONG : 118'
>> -  - 'box.error.TUPLE_FORMAT_LIMIT : 16'
>> -  - 'box.error.BEFORE_REPLACE_RET : 53'
>> +  - 'box.error.INJECTION : 8'
>> +  - 'box.error.INVALID_MSGPACK : 20'
>>     - 'box.error.NO_SUCH_SAVEPOINT : 61'
>>     - 'box.error.TRUNCATE_SYSTEM_SPACE : 137'
>>     - 'box.error.VY_QUOTA_TIMEOUT : 135'
>>     - 'box.error.WRONG_INDEX_OPTIONS : 108'
>>     - 'box.error.INVALID_VYLOG_FILE : 133'
>>     - 'box.error.INDEX_FIELD_COUNT_LIMIT : 127'
>> -  - 'box.error.READ_VIEW_ABORTED : 130'
>> +  - 'box.error.PRIV_NOT_GRANTED : 91'
>>     - 'box.error.USER_MAX : 56'
>> -  - 'box.error.PROTOCOL : 104'
>> +  - 'box.error.BEFORE_REPLACE_RET : 53'
>>     - 'box.error.TUPLE_NOT_ARRAY : 22'
>>     - 'box.error.KEY_PART_COUNT : 31'
>>     - 'box.error.ALTER_SPACE : 12'
>>     - 'box.error.ACTIVE_TRANSACTION : 79'
>>     - 'box.error.EXACT_FIELD_COUNT : 38'
>>     - 'box.error.DROP_SEQUENCE : 144'
>> -  - 'box.error.INVALID_MSGPACK : 20'
>>     - 'box.error.MORE_THAN_ONE_TUPLE : 41'
>> -  - 'box.error.RTREE_RECT : 101'
>> -  - 'box.error.SUB_STMT_MAX : 121'
>> +  - 'box.error.UPSERT_UNIQUE_SECONDARY_KEY : 105'
>>     - 'box.error.UNKNOWN_REQUEST_TYPE : 48'
>> -  - 'box.error.SPACE_EXISTS : 10'
>> +  - 'box.error.SUB_STMT_MAX : 121'
>>     - 'box.error.PROC_LUA : 32'
>> +  - 'box.error.SPACE_EXISTS : 10'
>>     - 'box.error.ROLE_NOT_GRANTED : 92'
>> +  - 'box.error.UNSUPPORTED : 5'
>>     - 'box.error.NO_SUCH_SPACE : 36'
>>     - 'box.error.WRONG_INDEX_PARTS : 107'
>> -  - 'box.error.DROP_SPACE : 11'
>>     - 'box.error.MIN_FIELD_COUNT : 39'
>>     - 'box.error.REPLICASET_UUID_MISMATCH : 63'
>>     - 'box.error.UPDATE_FIELD : 29'
>> +  - 'box.error.INDEX_EXISTS : 85'
>>     - 'box.error.COMPRESSION : 119'
>>     - 'box.error.INVALID_ORDER : 68'
>> -  - 'box.error.INDEX_EXISTS : 85'
>>     - 'box.error.SPLICE : 25'
>>     - 'box.error.UNKNOWN : 0'
>> +  - 'box.error.IDENTIFIER : 70'
>>     - 'box.error.DROP_PRIMARY_KEY : 17'
>>     - 'box.error.NULLABLE_PRIMARY : 152'
>>     - 'box.error.NO_SUCH_SEQUENCE : 145'
>>     - 'box.error.RELOAD_CFG : 58'
>>     - 'box.error.INVALID_UUID : 64'
>> -  - 'box.error.INJECTION : 8'
>> +  - 'box.error.DROP_SPACE : 11'
>>     - 'box.error.TIMEOUT : 78'
>> -  - 'box.error.IDENTIFIER : 70'
>>     - 'box.error.ITERATOR_TYPE : 72'
>>     - 'box.error.REPLICA_MAX : 73'
>> +  - 'box.error.NO_SUCH_ROLE : 82'
>>     - 'box.error.MISSING_REQUEST_FIELD : 69'
>>     - 'box.error.MISSING_SNAPSHOT : 93'
>>     - 'box.error.WRONG_SPACE_OPTIONS : 111'
>>     - 'box.error.READONLY : 7'
>> -  - 'box.error.UNSUPPORTED : 5'
>>     - 'box.error.UPDATE_INTEGER_OVERFLOW : 95'
>> +  - 'box.error.RTREE_RECT : 101'
>>     - 'box.error.NO_CONNECTION : 77'
>>     - 'box.error.INVALID_XLOG_ORDER : 76'
>> -  - 'box.error.UPSERT_UNIQUE_SECONDARY_KEY : 105'
>> -  - 'box.error.ROLLBACK_IN_SUB_STMT : 123'
>>     - 'box.error.WRONG_SCHEMA_VERSION : 109'
>> -  - 'box.error.UNSUPPORTED_INDEX_FEATURE : 112'
>> -  - 'box.error.INDEX_PART_TYPE_MISMATCH : 24'
>> +  - 'box.error.ROLLBACK_IN_SUB_STMT : 123'
>> +  - 'box.error.PROTOCOL : 104'
>>     - 'box.error.INVALID_XLOG_TYPE : 125'
>> +  - 'box.error.INDEX_PART_TYPE_MISMATCH : 24'
>> +  - 'box.error.UNSUPPORTED_INDEX_FEATURE : 112'
>>   ...
>>   test_run:cmd("setopt delimiter ''");
>>   ---
>> diff --git a/test/box/select.result b/test/box/select.result
>> index 4aed706..b3ee6cd 100644
>> --- a/test/box/select.result
>> +++ b/test/box/select.result
>> @@ -619,31 +619,62 @@ collectgarbage('collect')
>>   ---
>>   - 0
>>   ...
>> +-- gh-3224 resurrect tuple bigrefs
>> +collectgarbage('stop')
>> +---
>> +- 0
>> +...
>>   s = box.schema.space.create('select', { temporary = true })
>>   ---
>>   ...
>>   index = s:create_index('primary', { type = 'tree' })
>>   ---
>>   ...
>> -a = s:insert{0}
>> +_ = s:insert{0}
>> +---
>> +...
>> +_ = s:insert{1}
>> +---
>> +...
>> +_ = s:insert{2}
>> +---
>> +...
>> +_ = s:insert{3}
>> +---
>> +...
>> +lots_of_links = setmetatable({}, {__mode = 'v'})
>>   ---
>>   ...
>> -lots_of_links = {}
>> +i = 0
>> +---
>> +...
>> +while (i < 33000) do table.insert(lots_of_links, s:get{0}) i = i + 1 end
>> +---
>> +...
>> +while (i < 66000) do table.insert(lots_of_links, s:get{1}) i = i + 1 end
>> +---
>> +...
>> +while (i < 100000) do table.insert(lots_of_links, s:get{2}) i = i + 1 end
>>   ---
>>   ...
>>   ref_count = 0
>>   ---
>>   ...
>> -while (true) do table.insert(lots_of_links, s:get{0}) ref_count = ref_count + 1 end
>> +for k, v in pairs(lots_of_links) do ref_count = ref_count + 1 end
>>   ---
>> -- error: Tuple reference counter overflow
>>   ...
>>   ref_count
>>   ---
>> -- 65531
>> +- 100000
>>   ...
>> -lots_of_links = {}
>> +-- check that tuples are deleted after gc is activated
>> +collectgarbage('collect')
>>   ---
>> +- 0
>> +...
>> +lots_of_links
>> +---
>> +- []
>>   ...
>>   s:drop()
>>   ---
>> diff --git a/test/box/select.test.lua b/test/box/select.test.lua
>> index 54c2ecc..3400bda 100644
>> --- a/test/box/select.test.lua
>> +++ b/test/box/select.test.lua
>> @@ -124,12 +124,25 @@ test.random(s.index[0], 48)
>>   s:drop()
>> 
>>   collectgarbage('collect')
>> +
>> +-- gh-3224 resurrect tuple bigrefs
>> +
>> +collectgarbage('stop')
>>   s = box.schema.space.create('select', { temporary = true })
>>   index = s:create_index('primary', { type = 'tree' })
>> -a = s:insert{0}
>> -lots_of_links = {}
>> +_ = s:insert{0}
>> +_ = s:insert{1}
>> +_ = s:insert{2}
>> +_ = s:insert{3}
>> +lots_of_links = setmetatable({}, {__mode = 'v'})
>> +i = 0
>> +while (i < 33000) do table.insert(lots_of_links, s:get{0}) i = i + 1 end
>> +while (i < 66000) do table.insert(lots_of_links, s:get{1}) i = i + 1 end
>> +while (i < 100000) do table.insert(lots_of_links, s:get{2}) i = i + 1 end
>>   ref_count = 0
>> -while (true) do table.insert(lots_of_links, s:get{0}) ref_count = ref_count + 1 end
>> +for k, v in pairs(lots_of_links) do ref_count = ref_count + 1 end
>>   ref_count
>> -lots_of_links = {}
>> +-- check that tuples are deleted after gc is activated
>> +collectgarbage('collect')
>> +lots_of_links
>>   s:drop()
>> diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
>> index dbc02cd..aef5316 100644
>> --- a/test/unit/CMakeLists.txt
>> +++ b/test/unit/CMakeLists.txt
>> @@ -192,3 +192,6 @@ target_link_libraries(vy_cache.test ${ITERATOR_TEST_LIBS})
>> 
>>   add_executable(coll.test coll.cpp)
>>   target_link_libraries(coll.test core unit ${ICU_LIBRARIES} misc)
>> +
>> +add_executable(tuple_bigref.test tuple_bigref.c)
>> +target_link_libraries(tuple_bigref.test tuple unit)
>> diff --git a/test/unit/tuple_bigref.c b/test/unit/tuple_bigref.c
>> new file mode 100644
>> index 0000000..97095dc
>> --- /dev/null
>> +++ b/test/unit/tuple_bigref.c
>> @@ -0,0 +1,117 @@
>> +#include "vy_iterators_helper.h"
>> +#include "memory.h"
>> +#include "fiber.h"
>> +#include "unit.h"
>> +#include <msgpuck.h>
>> +#include "trivia/util.h"
>> +
>> +enum {
>> +	BIGREF_DIFF = 10,
>> +	BIGREF_COUNT = 70003,
>> +	BIGREF_CAPACITY = 107,
>> +};
>> +
>> +static char tuple_buf[64];
>> +static char *tuple_end = tuple_buf;
>> +
>> +/**
>> + * This function creates new tuple with refs == 1.
>> + */
>> +static inline struct tuple *
>> +create_tuple()
>> +{
>> +	struct tuple *ret =
>> +		tuple_new(box_tuple_format_default(), tuple_buf, tuple_end);
>> +	tuple_ref(ret);
>> +	return ret;
>> +}
>> +
>> +/**
>> + * This test performs overall check of bigrefs.
>> + * What it checks:
>> + * 1) Till refs <= TUPLE_REF_MAX it shows number of refs
>> + * of tuple and it isn't a bigref.
>> + * 2) When refs > TUPLE_REF_MAX first 15 bits of it becomes
>> + * index of bigref and the last bit becomes true which
>> + * shows that it is bigref.
>> + * 3) Each of tuple has its own number of refs, but all
>> + * these numbers more than it is needed for getting a bigref.
>> + * 4) Indexes of bigrefs are given sequentially.
>> + * 5) After some tuples are sequentially deleted all of
>> + * others bigrefs are fine. In this test BIGREF_CAPACITY
>> + * tuples created and each of their ref counter increased
>> + * to (BIGREF_COUNT - index of tuple). Tuples are created
>> + * consistently.
>> + */
>> +static int
>> +test_bigrefs_1()
>> +{
>> +	struct tuple **tuples = (struct tuple **) malloc(BIGREF_CAPACITY *
>> +							 sizeof(*tuples));
>> +	for(int i = 0; i < BIGREF_CAPACITY; ++i)
>> +		tuples[i] = create_tuple();
>> +	for(int i = 0; i < BIGREF_CAPACITY; ++i) {
>> +		assert(tuples[i]->refs == 1);
>> +		for(int j = 1; j < TUPLE_REF_MAX; ++j)
>> +			tuple_ref(tuples[i]);
>> +		assert(! tuples[i]->is_bigref);
>> +		tuple_ref(tuples[i]);
>> +		assert(tuples[i]->is_bigref);
>> +		for(int j = TUPLE_REF_MAX + 1; j < BIGREF_COUNT - i; ++j)
>> +			tuple_ref(tuples[i]);
>> +		assert(tuples[i]->is_bigref);
>> +	}
>> +	for(int i = 0; i < BIGREF_CAPACITY; ++i) {
>> +		for(int j = 1; j < BIGREF_COUNT - i; ++j)
>> +			tuple_unref(tuples[i]);
>> +		assert(tuples[i]->refs == 1);
>> +		tuple_unref(tuples[i]);
>> +	}
>> +	free(tuples);
>> +	return 0;
>> +}
>> +
>> +/**
>> + * This test checks that a bigref works fine after being
>> + * created and destroyed two times in a row for the same tuple.
>> + */
>> +static int
>> +test_bigrefs_2()
>> +{
>> +	struct tuple *tuple = create_tuple();
>> +	for(int i = 0; i < 2; ++i) {
>> +		assert(tuple->refs == 1);
>> +		for(int j = 1; j < BIGREF_COUNT; ++j)
>> +			tuple_ref(tuple);
>> +		assert(tuple->is_bigref);
>> +		for(int j = 1; j < BIGREF_COUNT; ++j)
>> +			tuple_unref(tuple);
>> +		assert(tuple->refs == 1);
>> +	}
>> +	tuple_unref(tuple);
>> +	return 0;
>> +}
>> +
>> +int
>> +main()
>> +{
>> +	header();
>> +	plan(2);
>> +
>> +	memory_init();
>> +	fiber_init(fiber_c_invoke);
>> +	tuple_init(NULL);
>> +
>> +	tuple_end = mp_encode_array(tuple_end, 1);
>> +	tuple_end = mp_encode_uint(tuple_end, 2);
>> +
>> +	ok(test_bigrefs_1() == 0, "Overall test passed.");
>> +	ok(test_bigrefs_2() == 0, "Create/destroy test passed.");
>> +
>> +	tuple_free();
>> +	fiber_free();
>> +	memory_free();
>> +
>> +	footer();
>> +	check_plan();
>> +}
>> diff --git a/test/unit/tuple_bigref.result b/test/unit/tuple_bigref.result
>> new file mode 100644
>> index 0000000..ad22f45
>> --- /dev/null
>> +++ b/test/unit/tuple_bigref.result
>> @@ -0,0 +1,5 @@
>> +	*** main ***
>> +1..2
>> +ok 1 - Overall test passed.
>> +ok 2 - Create/destroy test passed.
>> +	*** main: done ***
>> 
>
commit 1d497cd2488202a6b566e5399160acc8484ff39d
Author: Mergen Imeev <imeevma at gmail.com>
Date:   Mon May 28 19:17:51 2018 +0300

    box: create bigrefs for tuples
    
    The reference counter of a tuple was limited to 65535, and it
    was possible to reach this limit. This patch increases the
    capacity of reference counters to 4 billion.
    
    Closes #3224

diff --git a/src/box/box.cc b/src/box/box.cc
index c728a4c53..42578613d 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -174,20 +174,22 @@ process_rw(struct request *request, struct space *space, struct tuple **result)
         txn_rollback_stmt();
         return -1;
     }
+    if (result == NULL)
+        return txn_commit_stmt(txn, request);
+    *result = tuple;
+    if (tuple == NULL)
+        return txn_commit_stmt(txn, request);
     /*
      * Pin the tuple locally before the commit,
      * otherwise it may go away during yield in
      * when WAL is written in autocommit mode.
      */
-    TupleRefNil ref(tuple);
-    if (txn_commit_stmt(txn, request) != 0)
-        return -1;
-    if (result != NULL) {
-        if (tuple != NULL && tuple_bless(tuple) == NULL)
-            return -1;
-        *result = tuple;
-    }
-    return 0;
+    tuple_ref(tuple);
+    int rc = txn_commit_stmt(txn, request);
+    if (rc == 0)
+        tuple_bless(tuple);
+    tuple_unref(tuple);
+    return rc;
 }
 
 void
diff --git a/src/box/errcode.h b/src/box/errcode.h
index a0759f8f4..e009524ad 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -138,7 +138,7 @@ struct errcode_record {
     /* 83 */_(ER_ROLE_EXISTS,        "Role '%s' already exists") \
     /* 84 */_(ER_CREATE_ROLE,        "Failed to create role '%s': %s") \
     /* 85 */_(ER_INDEX_EXISTS,        "Index '%s' already exists") \
-    /* 86 */_(ER_TUPLE_REF_OVERFLOW,    "Tuple reference counter overflow") \
+    /* 86 */_(ER_UNUSED6,            "") \
     /* 87 */_(ER_ROLE_LOOP,            "Granting role '%s' to role '%s' would create a loop") \
     /* 88 */_(ER_GRANT,            "Incorrect grant arguments: %s") \
     /* 89 */_(ER_PRIV_GRANTED,        "User '%s' already has %s access on %s '%s'") \
diff --git a/src/box/index.cc b/src/box/index.cc
index 3c62ec1cc..f992bc94c 100644
--- a/src/box/index.cc
+++ b/src/box/index.cc
@@ -220,8 +220,8 @@ box_index_random(uint32_t space_id, uint32_t index_id, uint32_t rnd,
     /* No tx management, random() is for approximation anyway. */
     if (index_random(index, rnd, result) != 0)
         return -1;
-    if (*result != NULL && tuple_bless(*result) == NULL)
-        return -1;
+    if (*result != NULL)
+        tuple_bless(*result);
     return 0;
 }
 
@@ -253,8 +253,8 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key,
     txn_commit_ro_stmt(txn);
     /* Count statistics. */
     rmean_collect(rmean_box, IPROTO_SELECT, 1);
-    if (*result != NULL && tuple_bless(*result) == NULL)
-        return -1;
+    if (*result != NULL)
+        tuple_bless(*result);
     return 0;
 }
 
@@ -285,8 +285,8 @@ box_index_min(uint32_t space_id, uint32_t index_id, const char *key,
         return -1;
     }
     txn_commit_ro_stmt(txn);
-    if (*result != NULL && tuple_bless(*result) == NULL)
-        return -1;
+    if (*result != NULL)
+        tuple_bless(*result);
     return 0;
 }
 
@@ -317,8 +317,8 @@ box_index_max(uint32_t space_id, uint32_t index_id, const char *key,
         return -1;
     }
     txn_commit_ro_stmt(txn);
-    if (*result != NULL && tuple_bless(*result) == NULL)
-        return -1;
+    if (*result != NULL)
+        tuple_bless(*result);
     return 0;
 }
 
@@ -397,8 +397,8 @@ box_iterator_next(box_iterator_t *itr, box_tuple_t **result)
     assert(result != NULL);
     if (iterator_next(itr, result) != 0)
         return -1;
-    if (*result != NULL && tuple_bless(*result) == NULL)
-        return -1;
+    if (*result != NULL)
+        tuple_bless(*result);
     return 0;
 }
 
diff --git a/src/box/lua/tuple.c b/src/box/lua/tuple.c
index 80573313d..22fe69611 100644
--- a/src/box/lua/tuple.c
+++ b/src/box/lua/tuple.c
@@ -496,10 +496,7 @@ luaT_pushtuple(struct lua_State *L, box_tuple_t *tuple)
         luaL_pushcdata(L, CTID_CONST_STRUCT_TUPLE_REF);
     *ptr = tuple;
     /* The order is important - first reference tuple, next set gc */
-    if (box_tuple_ref(tuple) != 0) {
-        luaT_error(L);
-        return;
-    }
+    box_tuple_ref(tuple);
     lua_pushcfunction(L, lbox_tuple_gc);
     luaL_setcdatagc(L, -2);
 }
diff --git a/src/box/port.c b/src/box/port.c
index 03f6be79d..9b6b8580c 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -45,8 +45,7 @@ port_tuple_add(struct port *base, struct tuple *tuple)
     struct port_tuple *port = port_tuple(base);
     struct port_tuple_entry *e;
     if (port->size == 0) {
-        if (tuple_ref(tuple) != 0)
-            return -1;
+        tuple_ref(tuple);
         e = &port->first_entry;
         port->first = port->last = e;
     } else {
@@ -55,10 +54,7 @@ port_tuple_add(struct port *base, struct tuple *tuple)
             diag_set(OutOfMemory, sizeof(*e), "mempool_alloc", "e");
             return -1;
         }
-        if (tuple_ref(tuple) != 0) {
-            mempool_free(&port_tuple_entry_pool, e);
-            return -1;
-        }
+        tuple_ref(tuple);
         port->last->next = e;
         port->last = e;
     }
diff --git a/src/box/tuple.c b/src/box/tuple.c
index 014f37406..4eed95f6c 100644
--- a/src/box/tuple.c
+++ b/src/box/tuple.c
@@ -48,6 +48,28 @@ enum {
     OBJSIZE_MIN = 16,
 };
 
+/**
+ * Container for big reference counters. Contains an array of
+ * big reference counters, its capacity and the number of
+ * counters in use. When the reference counter of a tuple exceeds
+ * 32767, the field refs of this tuple becomes the index of its
+ * big reference counter in the array and the field is_bigref is
+ * set to true. The moment a big reference counter drops back to
+ * 32767, its slot is returned to the free list, refs of the
+ * tuple becomes 32767 and is_bigref becomes false. A vacant slot
+ * stores the index of the next vacant slot, not a counter.
+ */
+static struct bigref_list {
+    /** Free-list of big reference counters. */
+    uint32_t *refs;
+    /** Number of non-zero elements in the array. */
+    uint16_t size;
+    /** Capacity of the array. */
+    uint16_t capacity;
+    /** Index of first free element. */
+    uint16_t vacant_index;
+} bigref_list;
+
 static const double ALLOC_FACTOR = 1.05;
 
 /**
@@ -151,6 +173,13 @@ tuple_validate_raw(struct tuple_format *format, const char *tuple)
     return 0;
 }
 
+/** Initialize big references container. */
+static inline void
+bigref_list_create()
+{
+    memset(&bigref_list, 0, sizeof(bigref_list));
+}
+
 /**
  * Incremented on every snapshot and is used to distinguish tuples
  * which were created after start of a snapshot (these tuples can
@@ -211,6 +240,8 @@ tuple_init(field_name_hash_f hash)
 
     box_tuple_last = NULL;
 
+    bigref_list_create();
+
     if (coll_id_cache_init() != 0)
         return -1;
 
@@ -244,6 +275,106 @@ tuple_arena_create(struct slab_arena *arena, struct quota *quota,
     }
 }
 
+enum {
+    BIGREF_FACTOR = 2,
+    BIGREF_MAX = UINT32_MAX,
+    BIGREF_MIN_CAPACITY = 16,
+    /**
+     * Only 15 bits are available for bigref list index in
+     * struct tuple.
+     */
+    BIGREF_MAX_CAPACITY = UINT16_MAX >> 1
+};
+
+/** Destroy big references and free memory that was allocated. */
+static inline void
+bigref_list_reset()
+{
+    free(bigref_list.refs);
+    bigref_list_create();
+}
+
+/**
+ * Increase capacity of bigref_list.
+ */
+static inline void
+bigref_list_increase_capacity()
+{
+    assert(bigref_list.capacity == bigref_list.vacant_index);
+    uint32_t *refs = bigref_list.refs;
+    uint16_t capacity = bigref_list.capacity;
+    if (capacity == 0)
+        capacity = BIGREF_MIN_CAPACITY;
+    else if (capacity < BIGREF_MAX_CAPACITY)
+        capacity = MIN(capacity * BIGREF_FACTOR, BIGREF_MAX_CAPACITY);
+    else
+        panic("Too many big references");
+    refs = (uint32_t *) realloc(refs, capacity * sizeof(*refs));
+    if (refs == NULL) {
+        panic("failed to reallocate %zu bytes: Cannot allocate "\
+              "memory.", capacity * sizeof(*refs));
+    }
+    for(uint16_t i = bigref_list.capacity; i < capacity; ++i)
+        refs[i] = i + 1;
+    bigref_list.refs = refs;
+    bigref_list.capacity = capacity;
+}
+
+/**
+ * Return index for new big reference counter and allocate memory
+ * if needed.
+ * @retval index for new big reference counter.
+ */
+static inline uint16_t
+bigref_list_new_index()
+{
+    if (bigref_list.size == bigref_list.capacity)
+        bigref_list_increase_capacity();
+    ++bigref_list.size;
+    uint16_t vacant_index = bigref_list.vacant_index;
+    bigref_list.vacant_index = bigref_list.refs[vacant_index];
+    return vacant_index;
+}
+
+void
+tuple_ref_slow(struct tuple *tuple)
+{
+    assert(tuple->is_bigref || tuple->refs == TUPLE_REF_MAX);
+    if (! tuple->is_bigref) {
+        tuple->ref_index = bigref_list_new_index();
+        tuple->is_bigref = true;
+        bigref_list.refs[tuple->ref_index] = TUPLE_REF_MAX;
+    } else if (bigref_list.refs[tuple->ref_index] == BIGREF_MAX) {
+        panic("Tuple big reference counter overflow");
+    }
+    bigref_list.refs[tuple->ref_index]++;
+}
+
+/**
+ * Free index and add it to free-index-list. Free memory
+ * when size == 0.
+ */
+static inline void
+bigref_list_delete_index(uint16_t index)
+{
+    bigref_list.refs[index] = bigref_list.vacant_index;
+    bigref_list.vacant_index = index;
+    if (--bigref_list.size == 0)
+        bigref_list_reset();
+}
+
+void
+tuple_unref_slow(struct tuple *tuple)
+{
+    assert(tuple->is_bigref &&
+           bigref_list.refs[tuple->ref_index] > TUPLE_REF_MAX);
+    if(--bigref_list.refs[tuple->ref_index] == TUPLE_REF_MAX) {
+        bigref_list_delete_index(tuple->ref_index);
+        tuple->ref_index = TUPLE_REF_MAX;
+        tuple->is_bigref = false;
+    }
+}
+
 void
 tuple_arena_destroy(struct slab_arena *arena)
 {
@@ -265,6 +396,8 @@ tuple_free(void)
     tuple_format_free();
 
     coll_id_cache_destroy();
+
+    bigref_list_reset();
 }
 
 box_tuple_format_t *
@@ -288,7 +421,8 @@ int
 box_tuple_ref(box_tuple_t *tuple)
 {
     assert(tuple != NULL);
-    return tuple_ref(tuple);
+    tuple_ref(tuple);
+    return 0;
 }
 
 void
@@ -357,10 +491,7 @@ box_tuple_iterator(box_tuple_t *tuple)
              "mempool", "new slab");
         return NULL;
     }
-    if (tuple_ref(tuple) != 0) {
-        mempool_free(&tuple_iterator_pool, it);
-        return NULL;
-    }
+    tuple_ref(tuple);
     tuple_rewind(it, tuple);
     return it;
 }
@@ -451,7 +582,6 @@ box_tuple_new(box_tuple_format_t *format, const char *data, const char *end)
     struct tuple *ret = tuple_new(format, data, end);
     if (ret == NULL)
         return NULL;
-    /* Can't fail on zero refs. */
     return tuple_bless(ret);
 }
 
diff --git a/src/box/tuple.h b/src/box/tuple.h
index e2384dda0..14dbd4094 100644
--- a/src/box/tuple.h
+++ b/src/box/tuple.h
@@ -105,8 +105,7 @@ typedef struct tuple box_tuple_t;
  * tuple will leak.
  *
  * \param tuple a tuple
- * \retval -1 on error (check box_error_last())
- * \retval 0 on success
+ * \retval 0 always
  * \sa box_tuple_unref()
  */
 int
@@ -269,8 +268,7 @@ box_tuple_next(box_tuple_iterator_t *it);
  * Use box_tuple_format_default() to create space-independent tuple.
  * \param data tuple data in MsgPack Array format ([field1, field2, ...]).
  * \param end the end of \a data
- * \retval NULL on out of memory
- * \retval tuple otherwise
+ * \retval tuple
  * \pre data, end is valid MsgPack Array
  * \sa \code box.tuple.new(data) \endcode
  */
@@ -307,9 +305,17 @@ box_tuple_upsert(const box_tuple_t *tuple, const char *expr, const
  */
 struct PACKED tuple
 {
-    /** reference counter */
-    uint16_t refs;
-    /** format identifier */
+    union {
+        /** Reference counter. */
+        uint16_t refs;
+        struct {
+            /** Index of big reference counter. */
+            uint16_t ref_index : 15;
+            /** Big reference flag. */
+            bool is_bigref : 1;
+        };
+    };
+    /** Format identifier. */
     uint16_t format_id;
     /**
      * Length of the MessagePack data in raw part of the
@@ -774,25 +780,35 @@ tuple_field_uuid(const struct tuple *tuple, int fieldno,
     return 0;
 }
 
-enum { TUPLE_REF_MAX = UINT16_MAX };
+enum { TUPLE_REF_MAX = UINT16_MAX >> 1 };
+
+/**
+ * Increase tuple big reference counter.
+ * @param tuple Tuple to reference.
+ */
+void
+tuple_ref_slow(struct tuple *tuple);
 
 /**
  * Increment tuple reference counter.
  * @param tuple Tuple to reference.
- * @retval  0 Success.
- * @retval -1 Too many refs error.
  */
-static inline int
+static inline void
 tuple_ref(struct tuple *tuple)
 {
-    if (tuple->refs + 1 > TUPLE_REF_MAX) {
-        diag_set(ClientError, ER_TUPLE_REF_OVERFLOW);
-        return -1;
-    }
-    tuple->refs++;
-    return 0;
+    if (unlikely(tuple->refs >= TUPLE_REF_MAX))
+        tuple_ref_slow(tuple);
+    else
+        tuple->refs++;
 }
 
+/**
+ * Decrease tuple big reference counter.
+ * @param tuple Tuple to unreference.
+ */
+void
+tuple_unref_slow(struct tuple *tuple);
+
 /**
  * Decrement tuple reference counter. If it has reached zero, free the tuple.
  *
@@ -802,10 +818,9 @@ static inline void
 tuple_unref(struct tuple *tuple)
 {
     assert(tuple->refs - 1 >= 0);
-
-    tuple->refs--;
-
-    if (tuple->refs == 0)
+    if (unlikely(tuple->is_bigref))
+        tuple_unref_slow(tuple);
+    else if (--tuple->refs == 0)
         tuple_delete(tuple);
 }
 
@@ -813,25 +828,18 @@ extern struct tuple *box_tuple_last;
 
 /**
  * Convert internal `struct tuple` to public `box_tuple_t`.
- * \retval tuple on success
- * \retval NULL on error, check diag
+ * \retval tuple
  * \post \a tuple ref counted until the next call.
- * \post tuple_ref() doesn't fail at least once
  * \sa tuple_ref
  */
 static inline box_tuple_t *
 tuple_bless(struct tuple *tuple)
 {
     assert(tuple != NULL);
-    /* Ensure tuple can be referenced at least once after return */
-    if (tuple->refs + 2 > TUPLE_REF_MAX) {
-        diag_set(ClientError, ER_TUPLE_REF_OVERFLOW);
-        return NULL;
-    }
-    tuple->refs++;
+    tuple_ref(tuple);
     /* Remove previous tuple */
     if (likely(box_tuple_last != NULL))
-        tuple_unref(box_tuple_last); /* do not throw */
+        tuple_unref(box_tuple_last);
     /* Remember current tuple */
     box_tuple_last = tuple;
     return tuple;
@@ -849,41 +857,6 @@ tuple_to_buf(const struct tuple *tuple, char *buf, size_t size);
 #include "tuple_update.h"
 #include "errinj.h"
 
-/**
- * \copydoc tuple_ref()
- * \throws if overflow detected.
- */
-static inline void
-tuple_ref_xc(struct tuple *tuple)
-{
-    if (tuple_ref(tuple))
-        diag_raise();
-}
-
-/**
- * \copydoc tuple_bless
- * \throw ER_TUPLE_REF_OVERFLOW
- */
-static inline box_tuple_t *
-tuple_bless_xc(struct tuple *tuple)
-{
-    box_tuple_t *blessed = tuple_bless(tuple);
-    if (blessed == NULL)
-        diag_raise();
-    return blessed;
-}
-
-/** Make tuple references exception-friendly in absence of @finally. */
-struct TupleRefNil {
-    struct tuple *tuple;
-    TupleRefNil (struct tuple *arg) :tuple(arg)
-    { if (tuple) tuple_ref_xc(tuple); }
-    ~TupleRefNil() { if (tuple) tuple_unref(tuple); }
-
-    TupleRefNil(const TupleRefNil&) = delete;
-    void operator=(const TupleRefNil&) = delete;
-};
-
 /* @copydoc tuple_field_with_type() */
 static inline const char *
 tuple_field_with_type_xc(const struct tuple *tuple, uint32_t fieldno,
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index f0d26874e..dc0d02054 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -3822,7 +3822,6 @@ vinyl_iterator_primary_next(struct iterator *base, struct tuple **ret)
     assert(base->next = vinyl_iterator_primary_next);
     struct vinyl_iterator *it = (struct vinyl_iterator *)base;
     assert(it->lsm->index_id == 0);
-    struct tuple *tuple;
 
     if (it->tx == NULL) {
         diag_set(ClientError, ER_CURSOR_NO_TRANSACTION);
@@ -3833,18 +3832,15 @@ vinyl_iterator_primary_next(struct iterator *base, struct tuple **ret)
         goto fail;
     }
 
-    if (vy_read_iterator_next(&it->iterator, &tuple) != 0)
+    if (vy_read_iterator_next(&it->iterator, ret) != 0)
         goto fail;
-
-    if (tuple == NULL) {
+    if (*ret == NULL) {
         /* EOF. Close the iterator immediately. */
         vinyl_iterator_close(it);
-        *ret = NULL;
-        return 0;
+    } else {
+        tuple_bless(*ret);
     }
-    *ret = tuple_bless(tuple);
-    if (*ret != NULL)
-        return 0;
+    return 0;
 fail:
     vinyl_iterator_close(it);
     return -1;
@@ -3890,11 +3886,10 @@ next:
      * Note, there's no need in vy_tx_track() as the
      * tuple is already tracked in the secondary index.
      */
-    struct tuple *full_tuple;
     if (vy_point_lookup(it->lsm->pk, it->tx, vy_tx_read_view(it->tx),
-                tuple, &full_tuple) != 0)
+                tuple, ret) != 0)
         goto fail;
-    if (full_tuple == NULL) {
+    if (*ret == NULL) {
         /*
          * All indexes of a space must be consistent, i.e.
          * if a tuple is present in one index, it must be
@@ -3908,10 +3903,9 @@ next:
              vy_lsm_name(it->lsm), vy_stmt_str(tuple));
         goto next;
     }
-    *ret = tuple_bless(full_tuple);
-    tuple_unref(full_tuple);
-    if (*ret != NULL)
-        return 0;
+    tuple_bless(*ret);
+    tuple_unref(*ret);
+    return 0;
 fail:
     vinyl_iterator_close(it);
     return -1;
@@ -3997,16 +3991,12 @@ vinyl_index_get(struct index *index, const char *key,
     const struct vy_read_view **rv = (tx != NULL ? vy_tx_read_view(tx) :
                       &env->xm->p_global_read_view);
 
-    struct tuple *tuple;
-    if (vy_lsm_full_by_key(lsm, tx, rv, key, part_count, &tuple) != 0)
+    if (vy_lsm_full_by_key(lsm, tx, rv, key, part_count, ret) != 0)
         return -1;
-
-    if (tuple != NULL) {
-        *ret = tuple_bless(tuple);
-        tuple_unref(tuple);
-        return *ret == NULL ? -1 : 0;
+    if (*ret != NULL) {
+        tuple_bless(*ret);
+        tuple_unref(*ret);
     }
-    *ret = NULL;
     return 0;
 }
 
diff --git a/test/box/misc.result b/test/box/misc.result
index 8f94f5513..f7703ba8d 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -332,7 +332,6 @@ t;
   - 'box.error.UNKNOWN_UPDATE_OP : 28'
   - 'box.error.WRONG_COLLATION_OPTIONS : 151'
   - 'box.error.CURSOR_NO_TRANSACTION : 80'
-  - 'box.error.TUPLE_REF_OVERFLOW : 86'
   - 'box.error.ALTER_SEQUENCE : 143'
   - 'box.error.INVALID_XLOG_NAME : 75'
   - 'box.error.SAVEPOINT_EMPTY_TX : 60'
@@ -360,7 +359,7 @@ t;
   - 'box.error.VINYL_MAX_TUPLE_SIZE : 139'
   - 'box.error.LOAD_FUNCTION : 99'
   - 'box.error.INVALID_XLOG : 74'
-  - 'box.error.PRIV_NOT_GRANTED : 91'
+  - 'box.error.READ_VIEW_ABORTED : 130'
   - 'box.error.TRANSACTION_CONFLICT : 97'
   - 'box.error.GUEST_USER_PASSWORD : 96'
   - 'box.error.PROC_C : 102'
@@ -405,7 +404,7 @@ t;
   - 'box.error.injection : table: <address>
   - 'box.error.NULLABLE_MISMATCH : 153'
   - 'box.error.LAST_DROP : 15'
-  - 'box.error.NO_SUCH_ROLE : 82'
+  - 'box.error.TUPLE_FORMAT_LIMIT : 16'
   - 'box.error.DECOMPRESSION : 124'
   - 'box.error.CREATE_SEQUENCE : 142'
   - 'box.error.CREATE_USER : 43'
@@ -414,66 +413,66 @@ t;
   - 'box.error.SEQUENCE_OVERFLOW : 147'
   - 'box.error.SYSTEM : 115'
   - 'box.error.KEY_PART_IS_TOO_LONG : 118'
-  - 'box.error.TUPLE_FORMAT_LIMIT : 16'
-  - 'box.error.BEFORE_REPLACE_RET : 53'
+  - 'box.error.INJECTION : 8'
+  - 'box.error.INVALID_MSGPACK : 20'
   - 'box.error.NO_SUCH_SAVEPOINT : 61'
   - 'box.error.TRUNCATE_SYSTEM_SPACE : 137'
   - 'box.error.VY_QUOTA_TIMEOUT : 135'
   - 'box.error.WRONG_INDEX_OPTIONS : 108'
   - 'box.error.INVALID_VYLOG_FILE : 133'
   - 'box.error.INDEX_FIELD_COUNT_LIMIT : 127'
-  - 'box.error.READ_VIEW_ABORTED : 130'
+  - 'box.error.PRIV_NOT_GRANTED : 91'
   - 'box.error.USER_MAX : 56'
-  - 'box.error.PROTOCOL : 104'
+  - 'box.error.BEFORE_REPLACE_RET : 53'
   - 'box.error.TUPLE_NOT_ARRAY : 22'
   - 'box.error.KEY_PART_COUNT : 31'
   - 'box.error.ALTER_SPACE : 12'
   - 'box.error.ACTIVE_TRANSACTION : 79'
   - 'box.error.EXACT_FIELD_COUNT : 38'
   - 'box.error.DROP_SEQUENCE : 144'
-  - 'box.error.INVALID_MSGPACK : 20'
   - 'box.error.MORE_THAN_ONE_TUPLE : 41'
-  - 'box.error.RTREE_RECT : 101'
-  - 'box.error.SUB_STMT_MAX : 121'
+  - 'box.error.UPSERT_UNIQUE_SECONDARY_KEY : 105'
   - 'box.error.UNKNOWN_REQUEST_TYPE : 48'
-  - 'box.error.SPACE_EXISTS : 10'
+  - 'box.error.SUB_STMT_MAX : 121'
   - 'box.error.PROC_LUA : 32'
+  - 'box.error.SPACE_EXISTS : 10'
   - 'box.error.ROLE_NOT_GRANTED : 92'
+  - 'box.error.UNSUPPORTED : 5'
   - 'box.error.NO_SUCH_SPACE : 36'
   - 'box.error.WRONG_INDEX_PARTS : 107'
-  - 'box.error.DROP_SPACE : 11'
   - 'box.error.MIN_FIELD_COUNT : 39'
   - 'box.error.REPLICASET_UUID_MISMATCH : 63'
   - 'box.error.UPDATE_FIELD : 29'
+  - 'box.error.INDEX_EXISTS : 85'
   - 'box.error.COMPRESSION : 119'
   - 'box.error.INVALID_ORDER : 68'
-  - 'box.error.INDEX_EXISTS : 85'
   - 'box.error.SPLICE : 25'
   - 'box.error.UNKNOWN : 0'
+  - 'box.error.IDENTIFIER : 70'
   - 'box.error.DROP_PRIMARY_KEY : 17'
   - 'box.error.NULLABLE_PRIMARY : 152'
   - 'box.error.NO_SUCH_SEQUENCE : 145'
   - 'box.error.RELOAD_CFG : 58'
   - 'box.error.INVALID_UUID : 64'
-  - 'box.error.INJECTION : 8'
+  - 'box.error.DROP_SPACE : 11'
   - 'box.error.TIMEOUT : 78'
-  - 'box.error.IDENTIFIER : 70'
   - 'box.error.ITERATOR_TYPE : 72'
   - 'box.error.REPLICA_MAX : 73'
+  - 'box.error.NO_SUCH_ROLE : 82'
   - 'box.error.MISSING_REQUEST_FIELD : 69'
   - 'box.error.MISSING_SNAPSHOT : 93'
   - 'box.error.WRONG_SPACE_OPTIONS : 111'
   - 'box.error.READONLY : 7'
-  - 'box.error.UNSUPPORTED : 5'
   - 'box.error.UPDATE_INTEGER_OVERFLOW : 95'
+  - 'box.error.RTREE_RECT : 101'
   - 'box.error.NO_CONNECTION : 77'
   - 'box.error.INVALID_XLOG_ORDER : 76'
-  - 'box.error.UPSERT_UNIQUE_SECONDARY_KEY : 105'
-  - 'box.error.ROLLBACK_IN_SUB_STMT : 123'
   - 'box.error.WRONG_SCHEMA_VERSION : 109'
-  - 'box.error.UNSUPPORTED_INDEX_FEATURE : 112'
-  - 'box.error.INDEX_PART_TYPE_MISMATCH : 24'
+  - 'box.error.ROLLBACK_IN_SUB_STMT : 123'
+  - 'box.error.PROTOCOL : 104'
   - 'box.error.INVALID_XLOG_TYPE : 125'
+  - 'box.error.INDEX_PART_TYPE_MISMATCH : 24'
+  - 'box.error.UNSUPPORTED_INDEX_FEATURE : 112'
 ...
 test_run:cmd("setopt delimiter ''");
 ---
diff --git a/test/box/select.result b/test/box/select.result
index 4aed70630..b3ee6cd57 100644
--- a/test/box/select.result
+++ b/test/box/select.result
@@ -619,31 +619,62 @@ collectgarbage('collect')
 ---
 - 0
 ...
+-- gh-3224 resurrect tuple bigrefs
+collectgarbage('stop')
+---
+- 0
+...
 s = box.schema.space.create('select', { temporary = true })
 ---
 ...
 index = s:create_index('primary', { type = 'tree' })
 ---
 ...
-a = s:insert{0}
+_ = s:insert{0}
+---
+...
+_ = s:insert{1}
+---
+...
+_ = s:insert{2}
+---
+...
+_ = s:insert{3}
+---
+...
+lots_of_links = setmetatable({}, {__mode = 'v'})
 ---
 ...
-lots_of_links = {}
+i = 0
+---
+...
+while (i < 33000) do table.insert(lots_of_links, s:get{0}) i = i + 1 end
+---
+...
+while (i < 66000) do table.insert(lots_of_links, s:get{1}) i = i + 1 end
+---
+...
+while (i < 100000) do table.insert(lots_of_links, s:get{2}) i = i + 1 end
 ---
 ...
 ref_count = 0
 ---
 ...
-while (true) do table.insert(lots_of_links, s:get{0}) ref_count = ref_count + 1 end
+for k, v in pairs(lots_of_links) do ref_count = ref_count + 1 end
 ---
-- error: Tuple reference counter overflow
 ...
 ref_count
 ---
-- 65531
+- 100000
 ...
-lots_of_links = {}
+-- check that tuples are deleted after gc is activated
+collectgarbage('collect')
 ---
+- 0
+...
+lots_of_links
+---
+- []
 ...
 s:drop()
 ---
diff --git a/test/box/select.test.lua b/test/box/select.test.lua
index 54c2ecc1c..3400bdafe 100644
--- a/test/box/select.test.lua
+++ b/test/box/select.test.lua
@@ -124,12 +124,25 @@ test.random(s.index[0], 48)
 s:drop()
 
 collectgarbage('collect')
+
+-- gh-3224 resurrect tuple bigrefs
+
+collectgarbage('stop')
 s = box.schema.space.create('select', { temporary = true })
 index = s:create_index('primary', { type = 'tree' })
-a = s:insert{0}
-lots_of_links = {}
+_ = s:insert{0}
+_ = s:insert{1}
+_ = s:insert{2}
+_ = s:insert{3}
+lots_of_links = setmetatable({}, {__mode = 'v'})
+i = 0
+while (i < 33000) do table.insert(lots_of_links, s:get{0}) i = i + 1 end
+while (i < 66000) do table.insert(lots_of_links, s:get{1}) i = i + 1 end
+while (i < 100000) do table.insert(lots_of_links, s:get{2}) i = i + 1 end
 ref_count = 0
-while (true) do table.insert(lots_of_links, s:get{0}) ref_count = ref_count + 1 end
+for k, v in pairs(lots_of_links) do ref_count = ref_count + 1 end
 ref_count
-lots_of_links = {}
+-- check that tuples are deleted after gc is activated
+collectgarbage('collect')
+lots_of_links
 s:drop()
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index dbc02cdf0..aef531600 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -192,3 +192,6 @@ target_link_libraries(vy_cache.test ${ITERATOR_TEST_LIBS})
 
 add_executable(coll.test coll.cpp)
 target_link_libraries(coll.test core unit ${ICU_LIBRARIES} misc)
+
+add_executable(tuple_bigref.test tuple_bigref.c)
+target_link_libraries(tuple_bigref.test tuple unit)
diff --git a/test/unit/tuple_bigref.c b/test/unit/tuple_bigref.c
new file mode 100644
index 000000000..154c10ac1
--- /dev/null
+++ b/test/unit/tuple_bigref.c
@@ -0,0 +1,179 @@
+#include "memory.h"
+#include "fiber.h"
+#include "tuple.h"
+#include "unit.h"
+#include <stdio.h>
+
+enum {
+    BIGREF_DIFF = 10,
+    BIGREF_COUNT = 70003,
+    BIGREF_CAPACITY = 107,
+};
+
+static char tuple_buf[64];
+static char *tuple_end = tuple_buf;
+
+/**
+ * This function creates new tuple with refs == 1.
+ */
+static inline struct tuple *
+create_tuple()
+{
+    struct tuple *ret =
+        tuple_new(box_tuple_format_default(), tuple_buf, tuple_end);
+    tuple_ref(ret);
+    return ret;
+}
+
+/**
+ * This test performs an overall check of bigrefs.
+ * What it checks:
+ * 1) While refs <= TUPLE_REF_MAX, the field holds the number
+ * of refs of the tuple and the tuple isn't a bigref.
+ * 2) When refs > TUPLE_REF_MAX, the first 15 bits become the
+ * index of the bigref and the last bit becomes true, which
+ * shows that it is a bigref.
+ * 3) Each tuple has its own number of refs, but all of these
+ * numbers are larger than needed for getting a bigref.
+ * 4) Indexes of bigrefs are given sequentially.
+ * 5) After some tuples are sequentially deleted, all of the
+ * other bigrefs are fine. In this test BIGREF_CAPACITY
+ * tuples are created and each of their ref counters is
+ * increased to (BIGREF_COUNT - index of tuple). Tuples are
+ * created sequentially.
+ */
+static void
+test_bigrefs_1()
+{
+    printf("Test1: Overall check of bigrefs.\n");
+    uint16_t counter = 0;
+    struct tuple **tuples = (struct tuple **) malloc(BIGREF_CAPACITY *
+                             sizeof(*tuples));
+    for(int i = 0; i < BIGREF_CAPACITY; ++i)
+        tuples[i] = create_tuple();
+    for(int i = 0; i < BIGREF_CAPACITY; ++i)
+        counter += tuples[i]->refs == 1;
+    is(counter, BIGREF_CAPACITY, "All tuples have refs == 1.");
+    for(int i = 0; i < BIGREF_CAPACITY; ++i) {
+        for(int j = 1; j < TUPLE_REF_MAX; ++j)
+            tuple_ref(tuples[i]);
+        tuple_ref(tuples[i]);
+        for(int j = TUPLE_REF_MAX + 1; j < BIGREF_COUNT - i; ++j)
+            tuple_ref(tuples[i]);
+    }
+    counter = 0;
+    for(int i = 0; i < BIGREF_CAPACITY; ++i)
+        counter += tuples[i]->is_bigref == true;
+    is(counter, BIGREF_CAPACITY, "All tuples have bigrefs.");
+    counter = 0;
+    for(int i = 0; i < BIGREF_CAPACITY; ++i) {
+        for(int j = 1; j < BIGREF_COUNT - i; ++j)
+            tuple_unref(tuples[i]);
+        counter += tuples[i]->refs == 1;
+        tuple_unref(tuples[i]);
+    }
+    is(counter, BIGREF_CAPACITY, "All tuples were deleted.");
+    free(tuples);
+}
+
+/**
+ * This test checks that bigrefs works fine after being
+ * created and destroyed 2 times.
+ */
+static void
+test_bigrefs_2()
+{
+    printf("Test 2: Create/destroy test.\n");
+    struct tuple *tuple = create_tuple();
+    for(int j = 1; j < BIGREF_COUNT; ++j)
+        tuple_ref(tuple);
+    ok(tuple->refs && tuple->ref_index == 0,
+       "Tuple becomes bigref first time with ref_index == 0.");
+    for(int j = 1; j < BIGREF_COUNT; ++j)
+        tuple_unref(tuple);
+    for(int j = 1; j < BIGREF_COUNT; ++j)
+        tuple_ref(tuple);
+    ok(tuple->refs && tuple->ref_index == 0,
+       "Tuple becomes bigref second time with ref_index == 0.");
+    for(int j = 1; j < BIGREF_COUNT; ++j)
+        tuple_unref(tuple);
+    tuple_unref(tuple);
+}
+
+/**
+ * This test checks that indexes are given as
+ * intended.
+ */
+static void
+test_bigrefs_3()
+{
+    printf("Test3: Non-consistent indexes test.\n");
+    uint16_t counter = 0;
+    uint16_t max_index = BIGREF_CAPACITY / BIGREF_DIFF;
+    struct tuple **tuples = (struct tuple **) malloc(BIGREF_CAPACITY *
+                             sizeof(*tuples));
+    uint16_t *indexes = (uint16_t *) malloc (sizeof(*indexes) *
+        (max_index + 1));
+    for(int i = 0; i < BIGREF_CAPACITY; ++i)
+        tuples[i] = create_tuple();
+    for(int i = 0; i < BIGREF_CAPACITY; ++i) {
+        for(int j = 1; j < BIGREF_COUNT; ++j)
+            tuple_ref(tuples[i]);
+        counter += tuples[i]->is_bigref == true;
+    }
+    is(counter, BIGREF_CAPACITY, "All tuples have bigrefs.");
+    counter = 0;
+    for(int i = 0; i < BIGREF_CAPACITY; i += BIGREF_DIFF) {
+        indexes[counter] = tuples[i]->ref_index;
+        for(int j = 1; j < BIGREF_COUNT; ++j)
+            tuple_unref(tuples[i]);
+        assert(!tuples[i]->is_bigref);
+        counter++;
+    }
+    is(counter, max_index + 1, "%d tuples don't have bigrefs "\
+       "and all other tuples have", max_index + 1);
+    counter = 0;
+    for(int i = 0; i < BIGREF_CAPACITY; i += BIGREF_DIFF) {
+        assert(tuples[i]->refs == 1);
+        for(int j = 1; j < BIGREF_COUNT; ++j)
+            tuple_ref(tuples[i]);
+        assert(tuples[i]->is_bigref &&
+               tuples[i]->ref_index == indexes[max_index - counter]);
+        ++counter;
+    }
+    is(counter, max_index + 1, "All tuples have bigrefs and "\
+       "their indexes are in right order.");
+    for (int i = 0; i < BIGREF_CAPACITY; ++i) {
+        for (int j = 1; j < BIGREF_COUNT; ++j)
+            tuple_unref(tuples[i]);
+        assert(tuples[i]->refs == 1);
+        tuple_unref(tuples[i]);
+    }
+    free(indexes);
+    free(tuples);
+}
+
+int
+main()
+{
+    header();
+    plan(9);
+
+    memory_init();
+    fiber_init(fiber_c_invoke);
+    tuple_init(NULL);
+
+    tuple_end = mp_encode_array(tuple_end, 1);
+    tuple_end = mp_encode_uint(tuple_end, 2);
+
+    test_bigrefs_1();
+    test_bigrefs_2();
+    test_bigrefs_3();
+
+    tuple_free();
+    fiber_free();
+    memory_free();
+
+    footer();
+    check_plan();
+}
diff --git a/test/unit/tuple_bigref.result b/test/unit/tuple_bigref.result
new file mode 100644
index 000000000..7e8694e15
--- /dev/null
+++ b/test/unit/tuple_bigref.result
@@ -0,0 +1,15 @@
+# Looks like you planned 9 tests but ran 8.
+    *** main ***
+1..9
+Test1: Overall check of bigrefs.
+ok 1 - All tuples have refs == 1.
+ok 2 - All tuples have bigrefs.
+ok 3 - All tuples were deleted.
+Test 2: Create/destroy test.
+ok 4 - Tuple becomes bigref first time with ref_index == 0.
+ok 5 - Tuple becomes bigref second time with ref_index == 0.
+Test3: Non-consistent indexes test.
+ok 6 - All tuples have bigrefs.
+ok 7 - 11 tuples don't have bigrefs and all other tuples have
+ok 8 - All tuples have bigrefs and their indexes are in right order.
+    *** main: done ***


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20180609/1b86c2b2/attachment.html>


More information about the Tarantool-patches mailing list