[tarantool-patches] [PATCH 4/7] Replication: wal memory buffer
Vladimir Davydov
vdavydov.dev at gmail.com
Wed Aug 21 14:57:32 MSK 2019
On Tue, Aug 13, 2019 at 09:27:42AM +0300, Georgy Kirichenko wrote:
> Introduce a wal memory buffer which contains logged transactions. Wal
> writes all rows into the memory buffer and then flushes new data to disk.
> Wal memory consist of rotated pairs of xrow header array and encoded xrow
> data buffer.
> ---
> src/box/CMakeLists.txt | 1 +
> src/box/wal.c | 38 ++++--
> src/box/wal_mem.c | 273 +++++++++++++++++++++++++++++++++++++++++
> src/box/wal_mem.h | 166 +++++++++++++++++++++++++
> src/box/xlog.c | 77 +++++++++---
> src/box/xlog.h | 9 ++
> 6 files changed, 536 insertions(+), 28 deletions(-)
> create mode 100644 src/box/wal_mem.c
> create mode 100644 src/box/wal_mem.h
>
> diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
> index 9bba37bcb..bd31b07df 100644
> --- a/src/box/CMakeLists.txt
> +++ b/src/box/CMakeLists.txt
> @@ -125,6 +125,7 @@ add_library(box STATIC
> bind.c
> execute.c
> wal.c
> + wal_mem.c
> call.c
> merger.c
> ${lua_sources}
> diff --git a/src/box/wal.c b/src/box/wal.c
> index a09ab7187..6cdb0db15 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -44,6 +44,7 @@
> #include "coio_task.h"
> #include "replication.h"
> #include "gc.h"
> +#include "wal_mem.h"
>
> enum {
> /**
> @@ -156,6 +157,8 @@ struct wal_writer
> * Used for replication relays.
> */
> struct rlist watchers;
> + /** Wal memory buffer. */
> + struct wal_mem wal_mem;
> };
>
> enum wal_msg_type {
> @@ -936,6 +939,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
> int64_t tsn = 0;
> /** Assign LSN to all local rows. */
> for ( ; row < end; row++) {
> + (*row)->tm = ev_now(loop());
> if ((*row)->replica_id == 0) {
> (*row)->lsn = vclock_inc(vclock_diff, instance_id) +
> vclock_get(base, instance_id);
> @@ -1027,25 +1031,37 @@ wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
> int rc;
> struct journal_entry *entry;
> struct stailq_entry *last_committed = NULL;
> + wal_mem_svp(&writer->wal_mem, &writer->vclock);
> stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
> wal_assign_lsn(&vclock_diff, &writer->vclock,
> entry->rows, entry->rows + entry->n_rows);
> entry->res = vclock_sum(&vclock_diff) +
> vclock_sum(&writer->vclock);
> - rc = xlog_write_entry(l, entry);
> - if (rc < 0)
> + if (wal_mem_write(&writer->wal_mem, entry->rows,
> + entry->rows + entry->n_rows) < 0) {
> + wal_mem_svp_reset(&writer->wal_mem);
> goto done;
> - if (rc > 0) {
> - writer->checkpoint_wal_size += rc;
> - last_committed = &entry->fifo;
> - vclock_merge(&writer->vclock, &vclock_diff);
> }
> - /* rc == 0: the write is buffered in xlog_tx */
> }
> - rc = xlog_flush(l);
> - if (rc < 0)
> - goto done;
>
> + struct iovec iov[SMALL_OBUF_IOV_MAX];
You implicitly assume that SMALL_OBUF_IOV_MAX should be enough to store
a transaction. This looks error-prone. Please rework the API somehow,
e.g. allocate iov on a region.
> + int iovcnt;
> + iovcnt = wal_mem_svp_data(&writer->wal_mem, iov);
For obuf, ibuf, and region, one can create as many svps as one likes, while
here that isn't the case. I think we should either make svp self-sufficient
or rename those methods to something like _tx_begin/commit.
> + xlog_tx_begin(l);
> + if (xlog_write_iov(l, iov, iovcnt,
> + wal_mem_svp_row_count(&writer->wal_mem)) < 0) {
> + xlog_tx_rollback(l);
> + wal_mem_svp_reset(&writer->wal_mem);
> + goto done;
> + }
> + rc = xlog_tx_commit(l);
Before this patch we wrapped every tx transaction in
xlog_tx_begin/commit. Now we wrap the whole wal transaction
instead. Why change that?
> + if (rc == 0)
> + /* Data is buffered but not yet flushed. */
> + rc = xlog_flush(l);
> + if (rc < 0) {
> + wal_mem_svp_reset(&writer->wal_mem);
> + goto done;
> + }
> writer->checkpoint_wal_size += rc;
> last_committed = stailq_last(&wal_msg->commit);
> vclock_merge(&writer->vclock, &vclock_diff);
> @@ -1147,6 +1163,7 @@ wal_cord_f(va_list ap)
> {
> (void) ap;
> struct wal_writer *writer = &wal_writer_singleton;
> + wal_mem_create(&writer->wal_mem);
>
> /** Initialize eio in this thread */
> coio_enable();
> @@ -1195,6 +1212,7 @@ wal_cord_f(va_list ap)
> xlog_close(&vy_log_writer.xlog, false);
>
> cpipe_destroy(&writer->tx_prio_pipe);
> + wal_mem_destroy(&writer->wal_mem);
> return 0;
> }
>
> diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c
> new file mode 100644
> index 000000000..fdfc6f93d
> --- /dev/null
> +++ b/src/box/wal_mem.c
> @@ -0,0 +1,273 @@
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + * copyright notice, this list of conditions and the
> + * following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + * copyright notice, this list of conditions and the following
> + * disclaimer in the documentation and/or other materials
> + * provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +
> +#include "wal_mem.h"
> +
> +#include "fiber.h"
> +#include "errinj.h"
> +
> +enum {
> + /* Initial size for rows storage. */
> + WAL_MEM_BUF_INITIAL_ROW_COUNT = 4096,
> + /* Initial size for data storage. */
> + WAL_MEM_BUF_INITIAL_DATA_SIZE = 65536,
> + /* How many rows we will place in one buffer. */
> + WAL_MEM_BUF_ROWS_LIMIT = 8192,
> + /* How many data we will place in one buffer. */
> + WAL_MEM_BUF_DATA_LIMIT = 1 << 19,
> +};
> +
> +void
> +wal_mem_create(struct wal_mem *wal_mem)
I think we should name this object in such a way that it doesn't imply
WAL. AFAICS it's just a ring buffer for storing rows, so we should name
it appropriately. Maybe we should allow customizing all those options
when creating a new object. Also, it would be nice to have a unit test
for this object.
> diff --git a/src/box/wal_mem.h b/src/box/wal_mem.h
> new file mode 100644
> index 000000000..d26d00157
> --- /dev/null
> +++ b/src/box/wal_mem.h
> @@ -0,0 +1,166 @@
> +#ifndef TARANTOOL_WAL_MEM_H_INCLUDED
> +#define TARANTOOL_WAL_MEM_H_INCLUDED
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + * copyright notice, this list of conditions and the
> + * following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + * copyright notice, this list of conditions and the following
> + * disclaimer in the documentation and/or other materials
> + * provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include <stdint.h>
> +
> +#include "small/ibuf.h"
> +#include "small/obuf.h"
> +#include "xrow.h"
> +#include "vclock.h"
> +
> +enum {
> + /*
> + * Wal memory object contains some count of rotating data buffers.
> + * Estimated decrease in amount of stored row is about
> + * 1/(COUNT OF BUFFERS). However the bigger value makes rotation
> + * more frequent, the decrease would be smoother and size of
> + * a wal memory more stable.
> + */
> + WAL_MEM_BUF_COUNT = 8,
> +};
> +
> +/*
> + * A wal memory row descriptor which contains decoded xrow header and
> + * encoded data pointer and size.
> + */
> +struct wal_mem_buf_row {
> + /* Decoded xrow header. */
> + struct xrow_header xrow;
> + /* Pointer to the xrow encoded raw data. */
> + void *data;
> + /* xrow raw data size. */
> + size_t size;
> +};
> +
> +/*
> + * Wal memory data buffer which contains
> + * a vclock just before the first contained row,
> + * an ibuf with row descriptors
> + * an obuf with encoded data
> + */
> +struct wal_mem_buf {
> + /* vclock just before the first row. */
> + struct vclock vclock;
> + /* A row descriptor array. */
> + struct ibuf rows;
> + /* Data storage for encoded row data. */
> + struct obuf data;
> +};
> +
> +/*
> + * Wal memory contains WAL_MEM_BUF_COUNT wal memory buffers which are
> + * organized in a ring. In order to track Wal memory tracks the first and
> + * the last used buffers indexes (generation) and those indexes are not wrapped
> + * around the ring. Each rotation increases the last buffer index and
> + * each buffer discard increases the first buffer index. To evaluate effective
> + * index in an wal memory array a modulo operation (or mask) should be used.
> + */
> +struct wal_mem {
> + /* An index of the first used buffer. */
> + uint64_t first_buf_index;
> + /* An index of the last used buffer. */
> + uint64_t last_buf_index;
> + /* A memory buffer array. */
> + struct wal_mem_buf buf[WAL_MEM_BUF_COUNT];
> + /* The first row index written in the current transaction. */
> + uint32_t tx_first_row_index;
> + /* The first row data svp written in the current transaction. */
> + struct obuf_svp tx_first_row_svp;
> +};
> +
> +/* Create a wal memory. */
> +void
> +wal_mem_create(struct wal_mem *wal_mem);
> +
> +/* Destroy wal memory structure. */
> +void
> +wal_mem_destroy(struct wal_mem *wal_mem);
> +
> +/*
> + * Rotate a wal memory if required and save the current wal memory write
> + * position.
> + */
> +void
> +wal_mem_svp(struct wal_mem *wal_mem, struct vclock *vclock);
> +
> +/* Retrieve data after last svp. */
> +int
> +wal_mem_svp_data(struct wal_mem *wal_mem, struct iovec *iovec);
> +
> +/* Truncate all the data written after the last svp. */
> +void
> +wal_mem_svp_reset(struct wal_mem *wal_mem);
> +
> +/* Count of rows written since the last svp. */
> +static inline int
> +wal_mem_svp_row_count(struct wal_mem *wal_mem)
> +{
> + struct wal_mem_buf *mem_buf = wal_mem->buf +
> + wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
> + return ibuf_used(&mem_buf->rows) / sizeof(struct wal_mem_buf_row) -
> + wal_mem->tx_first_row_index;
> +}
> +
> +/*
> + * Append xrow array to a wal memory. The array is placed into one
> + * wal memory buffer and each row takes a continuous space in a data buffer.
> + * continuously.
> + * Return
> + * 0 for Ok
> + * -1 in case of error
> + */
> +int
> +wal_mem_write(struct wal_mem *wal_mem, struct xrow_header **begin,
> + struct xrow_header **end);
> +
> +/* Wal memory cursor to track a position in a wal memory. */
> +struct wal_mem_cursor {
> + /* Current memory buffer index. */
> + uint64_t buf_index;
> + /* Current row index. */
> + uint32_t row_index;
> +};
> +
> +/* Create a wal memory cursor from the wal memory current position. */
> +int
> +wal_mem_cursor_create(struct wal_mem *wal_mem,
> + struct wal_mem_cursor *wal_mem_cursor,
> + struct vclock *vclock);
> +
> +int
> +wal_mem_cursor_next(struct wal_mem *wal_mem,
> + struct wal_mem_cursor *wal_mem_cursor,
> + struct xrow_header **row,
> + void **data,
> + size_t *size);
What is returned on EOF? What happens if the buffer is rotated while
there's a cursor open for it? A comment would be helpful here.
More information about the Tarantool-patches
mailing list