[Tarantool-patches] [tarantool-patches] [PATCH v3 3/4] wal: xrow buffer cursor

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Oct 22 02:06:57 MSK 2019


Thanks for the patch!

See 5 comments below.

On 09/10/2019 18:45, Georgy Kirichenko wrote:
> This structure enables to find a xrow buffer row less than given vclock
> and then fetch row by row from the xrow forwards to the last appended one.
> A xrow buffer cursor is essential to allow the from memory replication
> because of a relay which required to be able to fetch all logged rows,
> stored in a wal memory (implemented as xrow buffer), from given position
> and then follow all new changes.
> 
> Part of #3794
> ---
>  src/box/xrow_buf.c | 97 ++++++++++++++++++++++++++++++++++++++++++++++
>  src/box/xrow_buf.h | 38 ++++++++++++++++++
>  2 files changed, 135 insertions(+)
> 
> diff --git a/src/box/xrow_buf.c b/src/box/xrow_buf.c
> index e4455e01a..a5bf1efed 100644
> --- a/src/box/xrow_buf.c
> +++ b/src/box/xrow_buf.c
> @@ -283,3 +283,100 @@ error:
>  	return -1;
>  }
>  
> +/*
> + * Returns an index of the first row after given vclock
> + * in a chunk.
> + */
> +static int
> +xrow_buf_chunk_locate_vclock(struct xrow_buf_chunk *chunk,
> +			     struct vclock *vclock)
> +{
> +	for (uint32_t row_index = 0; row_index < chunk->row_count;
> +	     ++row_index) {
> +		struct xrow_header *row = &chunk->row_info[row_index].xrow;
> +		if (vclock_get(vclock, row->replica_id) < row->lsn)
> +			return row_index;

1. Is it possible to use binary search by lsn here?

> +	}
> +	/*
> +	 * We did not find any row with vclock not less than
> +	 * given one so return an index just after the last one.
> +	 */
> +	return chunk->row_count;
> +}
> +
> +int
> +xrow_buf_cursor_create(struct xrow_buf *xrow_buf,
> +		       struct xrow_buf_cursor *xrow_buf_cursor,
> +		       struct vclock *vclock)
> +{
> +	/* Check if a buffer has requested data. */
> +	struct xrow_buf_chunk *chunk =
> +			xrow_buf->chunk + xrow_buf->first_chunk_index %
> +					  XROW_BUF_CHUNK_COUNT;
> +	int rc = vclock_compare(&chunk->vclock, vclock);
> +	if (rc >= 0 || rc == VCLOCK_ORDER_UNDEFINED) {
> +		/* The requested data was discarded. */
> +		return -1;
> +	}
> +	uint32_t index = xrow_buf->first_chunk_index;
> +	while (index < xrow_buf->last_chunk_index) {
> +		chunk = xrow_buf->chunk + (index + 1) % XROW_BUF_CHUNK_COUNT;
> +		int rc = vclock_compare(&chunk->vclock, vclock);
> +		if (rc > 0 || rc == VCLOCK_ORDER_UNDEFINED) {
> +			/* Next chunk has younger rows than requested vclock. */
> +			break;
> +		}
> +		++index;
> +	}
> +	chunk = xrow_buf->chunk + (index) % XROW_BUF_CHUNK_COUNT;
> +	xrow_buf_cursor->chunk_index = index;
> +	xrow_buf_cursor->row_index = xrow_buf_chunk_locate_vclock(chunk, vclock);
> +	return 0;
> +}
> +
> +int
> +xrow_buf_cursor_next(struct xrow_buf *xrow_buf,
> +		     struct xrow_buf_cursor *xrow_buf_cursor,
> +		     struct xrow_header **row, void **data, size_t *size)
> +{
> +	if (xrow_buf->first_chunk_index > xrow_buf_cursor->chunk_index) {
> +		/* A cursor current chunk was discarded by a buffer. */
> +		return -1;
> +	}
> +
> +	struct xrow_buf_chunk *chunk;
> +next_chunk:
> +	chunk = xrow_buf->chunk + xrow_buf_cursor->chunk_index %
> +				  XROW_BUF_CHUNK_COUNT;
> +	size_t chunk_row_count = chunk->row_count;

2. Why do you need this variable? It is used in one place
in the next line.

> +	if (chunk_row_count == xrow_buf_cursor->row_index) {
> +		/*
> +		 * No more rows in a buffer but there are two
> +		 * possibilities:
> +		 *  1. A cursor current chunk is the last one and there is
> +		 * no more rows in the cursor.
> +		 *  2. There is a chunk after the current one
> +		 * so we can switch to it.
> +		 * */
> +		if (xrow_buf->last_chunk_index ==
> +		    xrow_buf_cursor->chunk_index) {
> +			/*
> +			 * The current chunk is the last one -
> +			 * no more rows in a buffer.
> +			 */
> +			return 1;
> +		}
> +		/* Switch to the next chunk. */
> +		xrow_buf_cursor->row_index = 0;
> +		++xrow_buf_cursor->chunk_index;
> +		goto next_chunk;
> +	}
> +	/* Return row header and data pointers and data size. */
> +	struct xrow_buf_row_info *row_info = chunk->row_info +
> +					     xrow_buf_cursor->row_index;
> +	*row = &row_info->xrow;
> +	*data = row_info->data;
> +	*size = row_info->size;
> +	++xrow_buf_cursor->row_index;
> +	return 0;
> +}
> diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h
> index 266cc0f76..c2de2b5a5 100644
> --- a/src/box/xrow_buf.h
> +++ b/src/box/xrow_buf.h
> @@ -157,4 +157,42 @@ xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
>  	       struct xrow_header **end,
>  	       struct iovec **iovec);
>  
> +/**
> + * Xrow buffer cursor used to search a position in a buffer
> + * and then fetch rows one by one from the postion toward the
> + * buffer last append row.
> + */
> +struct xrow_buf_cursor {
> +	/** Current chunk global index. */
> +	uint32_t chunk_index;
> +	/** Row index in the current chunk. */
> +	uint32_t row_index;
> +};

3. Have you considered storing xrow_buf pointer inside the
cursor? Usually iterators contain a reference at source of
their data. In your case the API allows to create a cursor
for one xrow_buf, but iterate over another.

> +
> +/**
> + * Create a xrow buffer cursor and set it's position to
> + * the first row after passed vclock value.
> + *
> + * @retval 0 cursor was created
> + * @retval -1 if a vclock was discarded
> + */
> +int
> +xrow_buf_cursor_create(struct xrow_buf *xrow_buf,
> +		       struct xrow_buf_cursor *xrow_buf_cursor,
> +		       struct vclock *vclock);

4. Looks like vclock pointer can be declared as const.

> +
> +/**
> + * Fetch next row from a xrow buffer cursor and return the row
> + * header and encoded data pointers as well as encoded data size
> + * in the corresponding parameters.
> + *
> + * @retval 0 in case of success
> + * @retval 1 if there is no more rows in a buffer
> + * @retval -1 if this cursor postion was discarded by xrow buffer

5. 'postion' -> 'position'.

> + */
> +int
> +xrow_buf_cursor_next(struct xrow_buf *xrow_buf,
> +		     struct xrow_buf_cursor *xrow_buf_cursor,
> +		     struct xrow_header **row, void **data, size_t *size);
> +
>  #endif /* TARANTOOL_XROW_BUF_H_INCLUDED */
> 


More information about the Tarantool-patches mailing list