[tarantool-patches] [PATCH 4/7] Replication: wal memory buffer

Vladimir Davydov vdavydov.dev at gmail.com
Wed Aug 21 14:57:32 MSK 2019


On Tue, Aug 13, 2019 at 09:27:42AM +0300, Georgy Kirichenko wrote:
> Introduce a wal memory buffer which contains logged transactions. Wal
> writes all rows into the memory buffer and then flushes new data to disk.
> Wal memory consist of rotated pairs of xrow header array and encoded xrow
> data buffer.
> ---
>  src/box/CMakeLists.txt |   1 +
>  src/box/wal.c          |  38 ++++--
>  src/box/wal_mem.c      | 273 +++++++++++++++++++++++++++++++++++++++++
>  src/box/wal_mem.h      | 166 +++++++++++++++++++++++++
>  src/box/xlog.c         |  77 +++++++++---
>  src/box/xlog.h         |   9 ++
>  6 files changed, 536 insertions(+), 28 deletions(-)
>  create mode 100644 src/box/wal_mem.c
>  create mode 100644 src/box/wal_mem.h
> 
> diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
> index 9bba37bcb..bd31b07df 100644
> --- a/src/box/CMakeLists.txt
> +++ b/src/box/CMakeLists.txt
> @@ -125,6 +125,7 @@ add_library(box STATIC
>      bind.c
>      execute.c
>      wal.c
> +    wal_mem.c
>      call.c
>      merger.c
>      ${lua_sources}
> diff --git a/src/box/wal.c b/src/box/wal.c
> index a09ab7187..6cdb0db15 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -44,6 +44,7 @@
>  #include "coio_task.h"
>  #include "replication.h"
>  #include "gc.h"
> +#include "wal_mem.h"
>  
>  enum {
>  	/**
> @@ -156,6 +157,8 @@ struct wal_writer
>  	 * Used for replication relays.
>  	 */
>  	struct rlist watchers;
> +	/** Wal memory buffer. */
> +	struct wal_mem wal_mem;
>  };
>  
>  enum wal_msg_type {
> @@ -936,6 +939,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>  	int64_t tsn = 0;
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
> +		(*row)->tm = ev_now(loop());
>  		if ((*row)->replica_id == 0) {
>  			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
>  				      vclock_get(base, instance_id);
> @@ -1027,25 +1031,37 @@ wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
>  	int rc;
>  	struct journal_entry *entry;
>  	struct stailq_entry *last_committed = NULL;
> +	wal_mem_svp(&writer->wal_mem, &writer->vclock);
>  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
>  		wal_assign_lsn(&vclock_diff, &writer->vclock,
>  			       entry->rows, entry->rows + entry->n_rows);
>  		entry->res = vclock_sum(&vclock_diff) +
>  			     vclock_sum(&writer->vclock);
> -		rc = xlog_write_entry(l, entry);
> -		if (rc < 0)
> +		if (wal_mem_write(&writer->wal_mem, entry->rows,
> +				  entry->rows + entry->n_rows) < 0) {
> +			wal_mem_svp_reset(&writer->wal_mem);
>  			goto done;
> -		if (rc > 0) {
> -			writer->checkpoint_wal_size += rc;
> -			last_committed = &entry->fifo;
> -			vclock_merge(&writer->vclock, &vclock_diff);
>  		}
> -		/* rc == 0: the write is buffered in xlog_tx */
>  	}
> -	rc = xlog_flush(l);
> -	if (rc < 0)
> -		goto done;
>  
> +	struct iovec iov[SMALL_OBUF_IOV_MAX];

You implicitly assume that SMALL_OBUF_IOV_MAX should be enough to store
a transaction. This looks error-prone. Please rework the API somehow,
e.g. allocate iov on a region.

> +	int iovcnt;
> +	iovcnt = wal_mem_svp_data(&writer->wal_mem, iov);

For obuf, ibuf, region, one can create as many svp as one likes, while
here that isn't the case. I think we should either make svp self-sufficient
or rename those methods to something like _tx_begin/commit.

> +	xlog_tx_begin(l);
> +	if (xlog_write_iov(l, iov, iovcnt,
> +			   wal_mem_svp_row_count(&writer->wal_mem)) < 0) {
> +		xlog_tx_rollback(l);
> +		wal_mem_svp_reset(&writer->wal_mem);
> +		goto done;
> +	}
> +	rc = xlog_tx_commit(l);

Before this patch we wrapped every tx transaction in
xlog_tx_begin/commit. Now we wrap the whole wal transaction
instead. Why change that?

> +	if (rc == 0)
> +		/* Data is buffered but not yet flushed. */
> +		rc = xlog_flush(l);
> +	if (rc < 0) {
> +		wal_mem_svp_reset(&writer->wal_mem);
> +		goto done;
> +	}
>  	writer->checkpoint_wal_size += rc;
>  	last_committed = stailq_last(&wal_msg->commit);
>  	vclock_merge(&writer->vclock, &vclock_diff);
> @@ -1147,6 +1163,7 @@ wal_cord_f(va_list ap)
>  {
>  	(void) ap;
>  	struct wal_writer *writer = &wal_writer_singleton;
> +	wal_mem_create(&writer->wal_mem);
>  
>  	/** Initialize eio in this thread */
>  	coio_enable();
> @@ -1195,6 +1212,7 @@ wal_cord_f(va_list ap)
>  		xlog_close(&vy_log_writer.xlog, false);
>  
>  	cpipe_destroy(&writer->tx_prio_pipe);
> +	wal_mem_destroy(&writer->wal_mem);
>  	return 0;
>  }
>  
> diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c
> new file mode 100644
> index 000000000..fdfc6f93d
> --- /dev/null
> +++ b/src/box/wal_mem.c
> @@ -0,0 +1,273 @@
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +
> +#include "wal_mem.h"
> +
> +#include "fiber.h"
> +#include "errinj.h"
> +
> +enum {
> +	/* Initial size for rows storage. */
> +	WAL_MEM_BUF_INITIAL_ROW_COUNT = 4096,
> +	/* Initial size for data storage. */
> +	WAL_MEM_BUF_INITIAL_DATA_SIZE = 65536,
> +	/* How many rows we will place in one buffer. */
> +	WAL_MEM_BUF_ROWS_LIMIT = 8192,
> +	/* How many data we will place in one buffer. */
> +	WAL_MEM_BUF_DATA_LIMIT = 1 << 19,
> +};
> +
> +void
> +wal_mem_create(struct wal_mem *wal_mem)

I think we should name this object in such a way that it doesn't imply
WAL. AFAICS it's just a ring buffer for storing rows so we should name
it appropriately. Maybe we should allow customizing all those options
when creating a new object. Also, it would be nice to have a unit test
for this object.

> diff --git a/src/box/wal_mem.h b/src/box/wal_mem.h
> new file mode 100644
> index 000000000..d26d00157
> --- /dev/null
> +++ b/src/box/wal_mem.h
> @@ -0,0 +1,166 @@
> +#ifndef TARANTOOL_WAL_MEM_H_INCLUDED
> +#define TARANTOOL_WAL_MEM_H_INCLUDED
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include <stdint.h>
> +
> +#include "small/ibuf.h"
> +#include "small/obuf.h"
> +#include "xrow.h"
> +#include "vclock.h"
> +
> +enum {
> +	/*
> +	 * Wal memory object contains some count of rotating data buffers.
> +	 * Estimated decrease in amount of stored row is about
> +	 * 1/(COUNT OF BUFFERS). However the bigger value makes rotation
> +	 * more frequent, the decrease would be smoother and size of
> +	 * a wal memory more stable.
> +	 */
> +	WAL_MEM_BUF_COUNT = 8,
> +};
> +
> +/*
> + * A wal memory row descriptor which contains decoded xrow header and
> + * encoded data pointer and size.
> + */
> +struct wal_mem_buf_row {
> +	/* Decoded xrow header. */
> +	struct xrow_header xrow;
> +	/* Pointer to the xrow encoded raw data. */
> +	void *data;
> +	/* xrow raw data size. */
> +	size_t size;
> +};
> +
> +/*
> + * Wal memory data buffer which contains
> + *  a vclock just before the first contained row,
> + *  an ibuf with row descriptors
> + *  an obuf with encoded data
> + */
> +struct wal_mem_buf {
> +	/* vclock just before the first row. */
> +	struct vclock vclock;
> +	/* A row descriptor array. */
> +	struct ibuf rows;
> +	/* Data storage for encoded row data. */
> +	struct obuf data;
> +};
> +
> +/*
> + * Wal memory contains WAL_MEM_BUF_COUNT wal memory buffers which are
> + * organized in a ring. In order to track Wal memory tracks the first and
> + * the last used buffers indexes (generation) and those indexes are not wrapped
> + * around the ring. Each rotation increases the last buffer index and
> + * each buffer discard increases the first buffer index. To evaluate effective
> + * index in an wal memory array a modulo operation (or mask) should be used.
> + */
> +struct wal_mem {
> +	/* An index of the first used buffer. */
> +	uint64_t first_buf_index;
> +	/* An index of the last used buffer. */
> +	uint64_t last_buf_index;
> +	/* A memory buffer array. */
> +	struct wal_mem_buf buf[WAL_MEM_BUF_COUNT];
> +	/* The first row index written in the current transaction. */
> +	uint32_t tx_first_row_index;
> +	/* The first row data svp written in the current transaction. */
> +	struct obuf_svp tx_first_row_svp;
> +};
> +
> +/* Create a wal memory. */
> +void
> +wal_mem_create(struct wal_mem *wal_mem);
> +
> +/* Destroy wal memory structure. */
> +void
> +wal_mem_destroy(struct wal_mem *wal_mem);
> +
> +/*
> + * Rotate a wal memory if required and save the current wal memory write
> + * position.
> + */
> +void
> +wal_mem_svp(struct wal_mem *wal_mem, struct vclock *vclock);
> +
> +/* Retrieve data after last svp. */
> +int
> +wal_mem_svp_data(struct wal_mem *wal_mem, struct iovec *iovec);
> +
> +/* Truncate all the data written after the last svp. */
> +void
> +wal_mem_svp_reset(struct wal_mem *wal_mem);
> +
> +/* Count of rows written since the last svp. */
> +static inline int
> +wal_mem_svp_row_count(struct wal_mem *wal_mem)
> +{
> +	struct wal_mem_buf *mem_buf = wal_mem->buf +
> +				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
> +	return ibuf_used(&mem_buf->rows) / sizeof(struct wal_mem_buf_row) -
> +	       wal_mem->tx_first_row_index;
> +}
> +
> +/*
> + * Append xrow array to a wal memory. The array is placed into one
> + * wal memory buffer and each row takes a continuous space in a data buffer.
> + * continuously.
> + * Return
> + *  0 for Ok
> + *  -1 in case of error
> + */
> +int
> +wal_mem_write(struct wal_mem *wal_mem, struct xrow_header **begin,
> +	      struct xrow_header **end);
> +
> +/* Wal memory cursor to track a position in a wal memory. */
> +struct wal_mem_cursor {
> +	/* Current memory buffer index. */
> +	uint64_t buf_index;
> +	/* Current row index. */
> +	uint32_t row_index;
> +};
> +
> +/* Create a wal memory cursor from the wal memory current position. */
> +int
> +wal_mem_cursor_create(struct wal_mem *wal_mem,
> +		      struct wal_mem_cursor *wal_mem_cursor,
> +		      struct vclock *vclock);
> +
> +int
> +wal_mem_cursor_next(struct wal_mem *wal_mem,
> +		    struct wal_mem_cursor *wal_mem_cursor,
> +		    struct xrow_header **row,
> +		    void **data,
> +		    size_t *size);

What is returned on EOF? What happens if the buffer is rotated while
there's a cursor open for it? A comment would be helpful here.



More information about the Tarantool-patches mailing list