[Tarantool-patches] [tarantool-patches] [PATCH v3 2/4] wal: xrow buffer structure

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Oct 22 02:06:38 MSK 2019


Thanks for the patch!

See 20 comments below.

On 09/10/2019 18:45, Georgy Kirichenko wrote:
> Introduce a xrow buffer to store encoded xrows in memory after
> transaction was finished. Wal uses an xrow buffer object in
> order to encode transactions and then writes encoded data
> to a log file so encoded data still lives in memory for
> some time after a transaction is finished and cleared by an engine.
> Xrow buffer consist of not more than XROW_BUF_CHUNK_COUNT rotating
> chunks organized in a ring. Rotation thresholds and
> XROW_BUF_CHUNK_COUNT are empiric values and were hardcoded.
> 
> Part of #3794
> ---
>  src/box/CMakeLists.txt |   1 +
>  src/box/wal.c          |  47 ++++++-
>  src/box/xlog.c         |  57 ++++++---
>  src/box/xlog.h         |  16 ++-
>  src/box/xrow_buf.c     | 285 +++++++++++++++++++++++++++++++++++++++++
>  src/box/xrow_buf.h     | 160 +++++++++++++++++++++++
>  6 files changed, 540 insertions(+), 26 deletions(-)
>  create mode 100644 src/box/xrow_buf.c
>  create mode 100644 src/box/xrow_buf.h
> 
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 9219d6779..63e0c728d 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -156,6 +157,14 @@ struct wal_writer
>  	 * Used for replication relays.
>  	 */
>  	struct rlist watchers;
> +	/**
> +	 * In-memory WAl write buffer used to encode transaction rows and

1. Is 'l' a typo? Probably you meant 'L', a capital letter?

> +	 * write them to an xlog file. An in-memory buffer allows us to
> +	 * preserve xrows after transaction processing was finished.
> +	 * This buffer will be used by replication to fetch xrows from memory
> +	 * without xlog files access.
> +	 */
> +	struct xrow_buf xrow_buf;
>  };
>  
>  struct wal_msg {
> @@ -933,6 +942,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>  	int64_t tsn = 0;
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
> +		(*row)->tm = ev_now(loop());

2. Why do you need it? You said that your patch is a pure
refactoring about keeping xlogs in memory for a while, but
this looks like a functional change.

>  		if ((*row)->replica_id == 0) {
>  			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
>  				      vclock_get(base, instance_id);
> @@ -1016,25 +1026,52 @@ wal_write_to_disk(struct cmsg *msg)
>  	int rc;
>  	struct journal_entry *entry;
>  	struct stailq_entry *last_committed = NULL;
> +	/* Start a wal memory buffer transaction. */
> +	xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock);
>  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
>  		wal_assign_lsn(&vclock_diff, &writer->vclock,
>  			       entry->rows, entry->rows + entry->n_rows);
>  		entry->res = vclock_sum(&vclock_diff) +
>  			     vclock_sum(&writer->vclock);
> -		rc = xlog_write_entry(l, entry);
> -		if (rc < 0)
> +		struct iovec *iov;
> +		int iovcnt = xrow_buf_write(&writer->xrow_buf, entry->rows,
> +					    entry->rows + entry->n_rows, &iov);
> +		if (iovcnt < 0) {
> +			xrow_buf_tx_rollback(&writer->xrow_buf);
> +			goto done;
> +		}
> +		xlog_tx_begin(l);
> +		if (xlog_write_iov(l, iov, iovcnt, entry->n_rows) < 0) {
> +			xlog_tx_rollback(l);
> +			xrow_buf_tx_rollback(&writer->xrow_buf);
> +			goto done;
> +		}
> +		rc = xlog_tx_commit(l);

3. Sorry, Kostja is right (in not understanding how these
transactions are related). These simultaneous xlog/xbuf
begin/commit calls are really confusing. Previously xlog
transactions were encapsulated inside xlog_write_entry. Could
you please do something similar here to hide them?

For example, add a function xlog_write_iov_tx(), which would
internally call xlog_tx_begin()/rollback(). It would really
help.

> +		if (rc < 0) {
> +			/* Failed write. */
> +			xrow_buf_tx_rollback(&writer->xrow_buf);
>  			goto done;
> +		}
>  		if (rc > 0) {
> +			/*
> +			 * Data flushed to disk, start a new memory
> +			 * buffer transaction
> +			 */
>  			writer->checkpoint_wal_size += rc;
>  			last_committed = &entry->fifo;
>  			vclock_merge(&writer->vclock, &vclock_diff);
> +			xrow_buf_tx_commit(&writer->xrow_buf);
> +			xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock);
>  		}
>  		/* rc == 0: the write is buffered in xlog_tx */
>  	}
> diff --git a/src/box/xlog.c b/src/box/xlog.c
> index 8254cce20..2fcf7f0df 100644
> --- a/src/box/xlog.c
> +++ b/src/box/xlog.c
> @@ -1275,14 +1275,8 @@ xlog_tx_write(struct xlog *log)
>  	return written;
>  }
>  
> -/*
> - * Add a row to a log and possibly flush the log.
> - *
> - * @retval  -1 error, check diag.
> - * @retval >=0 the number of bytes written to buffer.
> - */
> -ssize_t
> -xlog_write_row(struct xlog *log, const struct xrow_header *packet)
> +static int
> +xlog_write_prepare(struct xlog *log)

4. Why do you need this function alone? Looks like an
artifact from the previous version, when it was used in
two places. Now you can inline it back to xlog_write_iov().

>  {
>  	/*
>  	 * Automatically reserve space for a fixheader when adding
> @@ -1296,17 +1290,20 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
>  			return -1;
>  		}
>  	}
> +	return 0;
> +}
>  
> -	struct obuf_svp svp = obuf_create_svp(&log->obuf);
> -	size_t page_offset = obuf_size(&log->obuf);
> -	/** encode row into iovec */
> -	struct iovec iov[XROW_IOVMAX];
> -	/** don't write sync to the disk */
> -	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
> -	if (iovcnt < 0) {
> -		obuf_rollback_to_svp(&log->obuf, &svp);
> +/*

5. Please use /** for comments outside of function bodies.
We use the doxygen comment format, and that is its syntax.
Here and in all other places, and in the other commits.

> + * Write an xrow containing iovec to a xlog.
> + */
> +ssize_t
> +xlog_write_iov(struct xlog *log, struct iovec *iov, int iovcnt, int row_count)
> +{
> @@ -1325,16 +1322,34 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
> +/*
> + * Add a row to a log and possibly flush the log.
> + *
> + * @retval  -1 error, check diag.
> + * @retval >=0 the number of bytes written to buffer.
> + */
> +ssize_t
> +xlog_write_row(struct xlog *log, const struct xrow_header *packet)
> +{
> +	/** encode row into an iovec */
> +	struct iovec iov[XROW_IOVMAX];
> +	/** don't write sync to the disk */

6. I understand that this is copy-pasted from the
old code, but please start sentences with a capital
letter, and use the /* comment starter inside function
bodies. Here and in all other places, and in the other commits.

> +	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
> +	if (iovcnt < 0)
> +		return -1;
> +	assert(iovcnt <= XROW_IOVMAX);
> +	return xlog_write_iov(log, iov, iovcnt, 1);
>  }
>  
>  /**
> diff --git a/src/box/xlog.h b/src/box/xlog.h
> index a48b05fc4..aa3c14d84 100644
> --- a/src/box/xlog.h
> +++ b/src/box/xlog.h
> @@ -493,7 +493,7 @@ ssize_t
>  xlog_fallocate(struct xlog *log, size_t size);
>  
>  /**
> - * Write a row to xlog, 
> + * Write a row into a xlog,

7. Stray change. Please, discard.

>   *
>   * @retval count of writen bytes
>   * @retval -1 for error
> diff --git a/src/box/xrow_buf.c b/src/box/xrow_buf.c
> new file mode 100644
> index 000000000..e4455e01a
> --- /dev/null
> +++ b/src/box/xrow_buf.c
> @@ -0,0 +1,285 @@
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +
> +#include "xrow_buf.h"
> +#include "fiber.h"
> +#include "errinj.h"
> +
> +/* Xrow buffer chunk options (empirical values). */
> +enum {
> +	/* Chunk row info array capacity increment */
> +	XROW_BUF_CHUNK_CAPACITY_INCREMENT = 16384,
> +	/* Initial size for raw data storage. */
> +	XROW_BUF_CHUNK_INITIAL_DATA_SIZE = 65536,
> +	/* How many rows we will place in one buffer. */
> +	XROW_BUF_CHUNK_ROW_COUNT_THRESHOLD = 8192,
> +	/* How many data we will place in one buffer. */
> +	XROW_BUF_CHUNK_DATA_SIZE_THRESHOLD = 1 << 19,
> +};
> +
> +
> +/*
> + * Save the current xrow buffer chunk state wich consists of two

8. wich -> which.

> + * values index and position where the next row header and raw data
> + * would be placed. This state is used to track the next
> + * transaction starting boundary.
> + */
> +static inline void
> +xrow_buf_save_state(struct xrow_buf *xrow_buf)
> +{
> +	struct xrow_buf_chunk *chunk = xrow_buf->chunk +
> +		xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT;
> +	/* Save the current xrow buffer state. */
> +	xrow_buf->tx_first_row_index = chunk->row_count;
> +	xrow_buf->tx_first_row_svp = obuf_create_svp(&chunk->data);
> +}
> +
> +void
> +xrow_buf_destroy(struct xrow_buf *xrow_buf)
> +{
> +	for (int i = 0; i < XROW_BUF_CHUNK_COUNT; ++i) {
> +		if (xrow_buf->chunk[i].row_info_capacity > 0)
> +			slab_put(&cord()->slabc,
> +				 slab_from_data(xrow_buf->chunk[i].row_info));

9. Please, use {} for code blocks consisting of more than
1 line.

> +		obuf_destroy(&xrow_buf->chunk[i].data);
> +	}
> +}
> +
> +void
> +xrow_buf_tx_begin(struct xrow_buf *xrow_buf, const struct vclock *vclock)
> +{
> +	/*
> +	 * Xrow buffer fits a transaction in one chunk and does not

10. A verb is missing. 'Does not do chunk rotation', or
'Does not rotate a chunk'.

> +	 * chunk rotation while transaction is in progress. So check
> +	 * current chunk thresholds and rotate if required.
> +	 */
> +	struct xrow_buf_chunk *chunk = xrow_buf_rotate(xrow_buf);
> +	/*
> +	 * Check if the current chunk is empty and a vclock for
> +	 * the chunk should be set.
> +	 */
> +	if (chunk->row_count == 0)
> +		vclock_copy(&chunk->vclock, vclock);
> +}
> +
> +int
> +xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
> +	       struct xrow_header **end,
> +	       struct iovec **iovec)
> +{
> +	struct xrow_buf_chunk *chunk = xrow_buf->chunk +
> +		xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT;
> +
> +	/* Save a data buffer svp to restore the buffer in case of an error. */
> +	struct obuf_svp data_svp = obuf_create_svp(&chunk->data);
> +
> +	size_t row_count = chunk->row_count + (end - begin);
> +	/* Allocate space for new row information members if required. */
> +	if (row_count > chunk->row_info_capacity) {
> +		/* Round allocation up to XROW_BUF_CHUNK_CAPACITY_INCREMENT. */
> +		uint32_t capacity = XROW_BUF_CHUNK_CAPACITY_INCREMENT *
> +				    ((row_count +
> +				      XROW_BUF_CHUNK_CAPACITY_INCREMENT - 1) /
> +				     XROW_BUF_CHUNK_CAPACITY_INCREMENT);
> +
> +		struct slab *row_info_slab =
> +			slab_get(&cord()->slabc,
> +				 sizeof(struct xrow_buf_row_info) * capacity);
> +		if (row_info_slab == NULL) {
> +			diag_set(OutOfMemory, capacity *
> +					      sizeof(struct xrow_buf_row_info),
> +				 "region", "row info array");
> +			goto error;
> +		}
> +		if (chunk->row_info_capacity > 0) {
> +			memcpy(slab_data(row_info_slab), chunk->row_info,
> +			       sizeof(struct xrow_buf_row_info) *
> +			       chunk->row_count);
> +			slab_put(&cord()->slabc,
> +				 slab_from_data(chunk->row_info));

11. Oh God, what is that? It is even more intricate and
complex than the previous version with ibuf. Again - why
not use a normal struct xrow_buf_row_info* array on the
simple heap? Have you done benchmarks showing that this
slab struggling gives any perf increase?

> +		}
> +		chunk->row_info = slab_data(row_info_slab);
> +		chunk->row_info_capacity = capacity;
> +	}
> diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h
> new file mode 100644
> index 000000000..266cc0f76
> --- /dev/null
> +++ b/src/box/xrow_buf.h
> @@ -0,0 +1,160 @@
> +#ifndef TARANTOOL_XROW_BUF_H_INCLUDED
> +#define TARANTOOL_XROW_BUF_H_INCLUDED
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include <stdint.h>
> +
> +#include "small/obuf.h"
> +#include "small/rlist.h"

12. Why do you need rlist.h?

> +#include "xrow.h"
> +#include "vclock.h"
> +
> +enum {
> +	/*8

13. 8 -> *.

> +	 * Xrow buffer contains some count of rotating data chunks.
> +	 * Every rotation has an estimated decrease in amount of
> +	 * stored rows at 1/(COUNT OF CHUNKS). However the bigger
> +	 * value makes rotation more frequent, the decrease would be
> +	 * smoother and size of a xrow buffer more stable.
> +	 */
> +	XROW_BUF_CHUNK_COUNT = 16,
> +};
> +
> +/**
> + * Xrow_info structure used to describe a row stored in a xrow
> + * buffer. Xrow info contains an xrow_header structure, pointer
> + * and size of the row_header encoded representation. Xrow header
> + * allows to filter rows by replica_id, lsn or replication group
> + * while encoded representation could be used to write xrow 

14. Trailing whitespace at the end of the line above.

> + * without any further encoding.
> + */
> +struct xrow_buf_row_info {
> +	/** Stored row header. */
> +	struct xrow_header xrow;
> +	/** Pointer to row encoded raw data. */
> +	void *data;
> +	/** Row encoded raw data size. */
> +	size_t size;
> +};
> +
> +/**
> + * Xrow buffer data chunk structure is used to store a continuous
> + * sequence of xrow headers written to a xrow buffer. Xrow buffer data
> + * chunk contains a vclock of the last row just before the first row
> + * stored in the chunk, count of rows, its encoded raw data, and array of

15. vclock of the last row just before the first row? Sorry,
I didn't understand. Could you please rephrase?

> + * stored row info. Vclock is used to track stored vclock lower boundary.
> + */
> +struct xrow_buf_chunk {
> +	/** Vclock just before the first row in this chunk. */
> +	struct vclock vclock;
> +	/** Count of stored rows. */
> +	size_t row_count;
> +	/** Stored rows information array. */
> +	struct xrow_buf_row_info *row_info;
> +	/** Capacity of stored rows information array. */
> +	size_t row_info_capacity;
> +	/** Data storage for encoded rows data. */
> +	struct obuf data;
> +};
> +
> +/**
> + * Xrow buffer enables to encode and store some continuous sequence
> + * of xrows (both headers and binary encoded representation).
> + * Storage organized as a range of globally indexed chunks. New rows
> + * are appended to the last one chunk (the youngest one). If the last
> + * chunk is already full then a new chunk will be used. Xrow_buffer
> + * maintains not more than XROW_BUF_CHUNK_COUNT chunks so when the buffer
> + * is already full then a first one chunk should be discarded before a
> + * new one could be used. All chunks are organized in a ring which is
> + * XROW_BUF_CHUNK_COUNT the size so a chunk in-ring index could be
> + * evaluated from the chunk global index with the modulo operation.
> + */
> +struct xrow_buf {
> +	/** Global index of the first used chunk (the oldest one). */
> +	size_t first_chunk_index;
> +	/** Global index of the last used chunk (the youngest one). */
> +	size_t last_chunk_index;
> +	/** Ring -array containing chunks . */

16. -array? Something is missing.

> +	struct xrow_buf_chunk chunk[XROW_BUF_CHUNK_COUNT];
> +	/**
> +	 * A xrow_buf transaction is recorded in one chunk only.
> +	 * When transaction is started current row count and data
> +	 * buffer svp from the current chunk (which is the last one)
> +	 * are remembered in order to be able to restore the chunk
> +	 * state in case of rollback.
> +	 */
> +	struct {

17. Why do you need these two members inside an anon struct?

> +		/** The current transaction first row index. */
> +		uint32_t tx_first_row_index;

18. I would rather call it saved_row_count and saved_obuf. You
never use `tx_first_row_index` as an index of anything. You
always just assign it from row_count, and restore row_count
from it back. In the next commits too. These two attributes
are just a savepoint, and nothing more.

I mean, take obuf_svp for example. It has 3 simple members,
which are named the same as the original members of obuf:
pos, iov_len, used. They are not named 'obuf_first_alloc_pos',
'obuf_first_iov_len', 'obuf_first_alloc_pos_used'.

The simpler the name, the easier it is to understand what it
stands for. For the savepoint just take the original names
as is. Maybe with a simple prefix like 'saved_'.

> +		/** The current transaction encoded data start svp. */
> +		struct obuf_svp tx_first_row_svp;
> +	};
> +};
> +
> +/** Create a wal memory. */
> +void
> +xrow_buf_create(struct xrow_buf *xrow_buf);
> +
> +/** Destroy wal memory structure. */
> +void
> +xrow_buf_destroy(struct xrow_buf *xrow_buf);
> +
> +/**
> + * Begin a xrow buffer transaction. This function may rotate the
> + * last one data chunk and use the vclock parameter as a new chunk
> + * starting vclock.
> + */
> +void
> +xrow_buf_tx_begin(struct xrow_buf *xrow_buf, const struct vclock *vclock);
> +
> +/** Discard all the data was written after the last transaction. */

19. 'the data written', without 'was'.

> +void
> +xrow_buf_tx_rollback(struct xrow_buf *xrow_buf);
> +
> +/** Commit a xrow buffer transaction. */
> +void
> +xrow_buf_tx_commit(struct xrow_buf *xrow_buf);
> +
> +/**
> + * Append an xrow array to a wal memory. The array is placed into
> + * one xrow buffer data chunk and each row takes a continuous
> + * space in a data buffer. Raw encoded data is placed onto
> + * gc-allocated iovec array.
> + *
> + * @retval count of written iovec members for success
> + * @retval -1 in case of error
> + */
> +int
> +xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
> +	       struct xrow_header **end,
> +	       struct iovec **iovec);

20. `iovec` and `end` arguments fit in one line.

> +
> +#endif /* TARANTOOL_XROW_BUF_H_INCLUDED */
> 


More information about the Tarantool-patches mailing list