From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Date: Wed, 21 Aug 2019 14:57:32 +0300 From: Vladimir Davydov Subject: Re: [tarantool-patches] [PATCH 4/7] Replication: wal memory buffer Message-ID: <20190821115732.GB13834@esperanza> References: <081517a5f603f55c7e492871ada4159ba6dae910.1565676868.git.georgy@tarantool.org> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <081517a5f603f55c7e492871ada4159ba6dae910.1565676868.git.georgy@tarantool.org> To: Georgy Kirichenko Cc: tarantool-patches@freelists.org List-ID: On Tue, Aug 13, 2019 at 09:27:42AM +0300, Georgy Kirichenko wrote: > Introduce a wal memory buffer which contains logged transactions. Wal > writes all rows into the memory buffer and then flushes new data to disk. > Wal memory consist of rotated pairs of xrow header array and encoded xrow > data buffer. > --- > src/box/CMakeLists.txt | 1 + > src/box/wal.c | 38 ++++-- > src/box/wal_mem.c | 273 +++++++++++++++++++++++++++++++++++++++++ > src/box/wal_mem.h | 166 +++++++++++++++++++++++++ > src/box/xlog.c | 77 +++++++++--- > src/box/xlog.h | 9 ++ > 6 files changed, 536 insertions(+), 28 deletions(-) > create mode 100644 src/box/wal_mem.c > create mode 100644 src/box/wal_mem.h > > diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt > index 9bba37bcb..bd31b07df 100644 > --- a/src/box/CMakeLists.txt > +++ b/src/box/CMakeLists.txt > @@ -125,6 +125,7 @@ add_library(box STATIC > bind.c > execute.c > wal.c > + wal_mem.c > call.c > merger.c > ${lua_sources} > diff --git a/src/box/wal.c b/src/box/wal.c > index a09ab7187..6cdb0db15 100644 > --- a/src/box/wal.c > +++ b/src/box/wal.c > @@ -44,6 +44,7 @@ > #include "coio_task.h" > #include "replication.h" > #include "gc.h" > +#include "wal_mem.h" > > enum { > /** > @@ -156,6 +157,8 @@ struct wal_writer > * Used for replication relays. > */ > struct rlist watchers; > + /** Wal memory buffer. 
*/ > + struct wal_mem wal_mem; > }; > > enum wal_msg_type { > @@ -936,6 +939,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base, > int64_t tsn = 0; > /** Assign LSN to all local rows. */ > for ( ; row < end; row++) { > + (*row)->tm = ev_now(loop()); > if ((*row)->replica_id == 0) { > (*row)->lsn = vclock_inc(vclock_diff, instance_id) + > vclock_get(base, instance_id); > @@ -1027,25 +1031,37 @@ wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg) > int rc; > struct journal_entry *entry; > struct stailq_entry *last_committed = NULL; > + wal_mem_svp(&writer->wal_mem, &writer->vclock); > stailq_foreach_entry(entry, &wal_msg->commit, fifo) { > wal_assign_lsn(&vclock_diff, &writer->vclock, > entry->rows, entry->rows + entry->n_rows); > entry->res = vclock_sum(&vclock_diff) + > vclock_sum(&writer->vclock); > - rc = xlog_write_entry(l, entry); > - if (rc < 0) > + if (wal_mem_write(&writer->wal_mem, entry->rows, > + entry->rows + entry->n_rows) < 0) { > + wal_mem_svp_reset(&writer->wal_mem); > goto done; > - if (rc > 0) { > - writer->checkpoint_wal_size += rc; > - last_committed = &entry->fifo; > - vclock_merge(&writer->vclock, &vclock_diff); > } > - /* rc == 0: the write is buffered in xlog_tx */ > } > - rc = xlog_flush(l); > - if (rc < 0) > - goto done; > > + struct iovec iov[SMALL_OBUF_IOV_MAX]; You implicitly assume that SMALL_OBUF_IOV_MAX iovec entries are always enough to hold a transaction. This looks error-prone. Please rework the API somehow, e.g. allocate iov on a region. > + int iovcnt; > + iovcnt = wal_mem_svp_data(&writer->wal_mem, iov); For obuf, ibuf, and region, one can create as many savepoints (svp) as one likes, whereas here that isn't the case. I think we should either make svp self-sufficient or rename those methods to something like _tx_begin/commit.
> + xlog_tx_begin(l); > + if (xlog_write_iov(l, iov, iovcnt, > + wal_mem_svp_row_count(&writer->wal_mem)) < 0) { > + xlog_tx_rollback(l); > + wal_mem_svp_reset(&writer->wal_mem); > + goto done; > + } > + rc = xlog_tx_commit(l); Before this patch we wrapped every tx transaction in xlog_tx_begin/commit. Now we wrap the whole wal transaction instead. Why change that? > + if (rc == 0) > + /* Data is buffered but not yet flushed. */ > + rc = xlog_flush(l); > + if (rc < 0) { > + wal_mem_svp_reset(&writer->wal_mem); > + goto done; > + } > writer->checkpoint_wal_size += rc; > last_committed = stailq_last(&wal_msg->commit); > vclock_merge(&writer->vclock, &vclock_diff); > @@ -1147,6 +1163,7 @@ wal_cord_f(va_list ap) > { > (void) ap; > struct wal_writer *writer = &wal_writer_singleton; > + wal_mem_create(&writer->wal_mem); > > /** Initialize eio in this thread */ > coio_enable(); > @@ -1195,6 +1212,7 @@ wal_cord_f(va_list ap) > xlog_close(&vy_log_writer.xlog, false); > > cpipe_destroy(&writer->tx_prio_pipe); > + wal_mem_destroy(&writer->wal_mem); > return 0; > } > > diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c > new file mode 100644 > index 000000000..fdfc6f93d > --- /dev/null > +++ b/src/box/wal_mem.c > @@ -0,0 +1,273 @@ > +/* > + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. > + * > + * Redistribution and use in source and binary forms, with or > + * without modification, are permitted provided that the following > + * conditions are met: > + * > + * 1. Redistributions of source code must retain the above > + * copyright notice, this list of conditions and the > + * following disclaimer. > + * > + * 2. Redistributions in binary form must reproduce the above > + * copyright notice, this list of conditions and the following > + * disclaimer in the documentation and/or other materials > + * provided with the distribution. 
> + * > + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED > + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL > + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, > + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR > + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF > + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF > + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF > + * SUCH DAMAGE. > + */ > + > +#include "wal_mem.h" > + > +#include "fiber.h" > +#include "errinj.h" > + > +enum { > + /* Initial size for rows storage. */ > + WAL_MEM_BUF_INITIAL_ROW_COUNT = 4096, > + /* Initial size for data storage. */ > + WAL_MEM_BUF_INITIAL_DATA_SIZE = 65536, > + /* How many rows we will place in one buffer. */ > + WAL_MEM_BUF_ROWS_LIMIT = 8192, > + /* How many data we will place in one buffer. */ > + WAL_MEM_BUF_DATA_LIMIT = 1 << 19, > +}; > + > +void > +wal_mem_create(struct wal_mem *wal_mem) I think we should name this object in such a way that it doesn't imply WAL. AFAICS it's just a ring buffer for storing rows, so we should name it appropriately. Maybe we should allow customizing all those options when creating a new object. Also, it would be nice to have a unit test for this object. > diff --git a/src/box/wal_mem.h b/src/box/wal_mem.h > new file mode 100644 > index 000000000..d26d00157 > --- /dev/null > +++ b/src/box/wal_mem.h > @@ -0,0 +1,166 @@ > +#ifndef TARANTOOL_WAL_MEM_H_INCLUDED > +#define TARANTOOL_WAL_MEM_H_INCLUDED > +/* > + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + * > + * Redistribution and use in source and binary forms, with or > + * without modification, are permitted provided that the following > + * conditions are met: > + * > + * 1. Redistributions of source code must retain the above > + * copyright notice, this list of conditions and the > + * following disclaimer. > + * > + * 2. Redistributions in binary form must reproduce the above > + * copyright notice, this list of conditions and the following > + * disclaimer in the documentation and/or other materials > + * provided with the distribution. > + * > + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED > + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL > + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, > + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR > + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF > + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF > + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF > + * SUCH DAMAGE. > + */ > +#include > + > +#include "small/ibuf.h" > +#include "small/obuf.h" > +#include "xrow.h" > +#include "vclock.h" > + > +enum { > + /* > + * Wal memory object contains some count of rotating data buffers. > + * Estimated decrease in amount of stored row is about > + * 1/(COUNT OF BUFFERS). However the bigger value makes rotation > + * more frequent, the decrease would be smoother and size of > + * a wal memory more stable. > + */ > + WAL_MEM_BUF_COUNT = 8, > +}; > + > +/* > + * A wal memory row descriptor which contains decoded xrow header and > + * encoded data pointer and size. > + */ > +struct wal_mem_buf_row { > + /* Decoded xrow header. 
*/ > + struct xrow_header xrow; > + /* Pointer to the xrow encoded raw data. */ > + void *data; > + /* xrow raw data size. */ > + size_t size; > +}; > + > +/* > + * Wal memory data buffer which contains > + * a vclock just before the first contained row, > + * an ibuf with row descriptors > + * an obuf with encoded data > + */ > +struct wal_mem_buf { > + /* vclock just before the first row. */ > + struct vclock vclock; > + /* A row descriptor array. */ > + struct ibuf rows; > + /* Data storage for encoded row data. */ > + struct obuf data; > +}; > + > +/* > + * Wal memory contains WAL_MEM_BUF_COUNT wal memory buffers which are > + * organized in a ring. In order to track Wal memory tracks the first and > + * the last used buffers indexes (generation) and those indexes are not wrapped > + * around the ring. Each rotation increases the last buffer index and > + * each buffer discard increases the first buffer index. To evaluate effective > + * index in an wal memory array a modulo operation (or mask) should be used. > + */ > +struct wal_mem { > + /* An index of the first used buffer. */ > + uint64_t first_buf_index; > + /* An index of the last used buffer. */ > + uint64_t last_buf_index; > + /* A memory buffer array. */ > + struct wal_mem_buf buf[WAL_MEM_BUF_COUNT]; > + /* The first row index written in the current transaction. */ > + uint32_t tx_first_row_index; > + /* The first row data svp written in the current transaction. */ > + struct obuf_svp tx_first_row_svp; > +}; > + > +/* Create a wal memory. */ > +void > +wal_mem_create(struct wal_mem *wal_mem); > + > +/* Destroy wal memory structure. */ > +void > +wal_mem_destroy(struct wal_mem *wal_mem); > + > +/* > + * Rotate a wal memory if required and save the current wal memory write > + * position. > + */ > +void > +wal_mem_svp(struct wal_mem *wal_mem, struct vclock *vclock); > + > +/* Retrieve data after last svp. 
*/ > +int > +wal_mem_svp_data(struct wal_mem *wal_mem, struct iovec *iovec); > + > +/* Truncate all the data written after the last svp. */ > +void > +wal_mem_svp_reset(struct wal_mem *wal_mem); > + > +/* Count of rows written since the last svp. */ > +static inline int > +wal_mem_svp_row_count(struct wal_mem *wal_mem) > +{ > + struct wal_mem_buf *mem_buf = wal_mem->buf + > + wal_mem->last_buf_index % WAL_MEM_BUF_COUNT; > + return ibuf_used(&mem_buf->rows) / sizeof(struct wal_mem_buf_row) - > + wal_mem->tx_first_row_index; > +} > + > +/* > + * Append xrow array to a wal memory. The array is placed into one > + * wal memory buffer and each row takes a continuous space in a data buffer. > + * continuously. > + * Return > + * 0 for Ok > + * -1 in case of error > + */ > +int > +wal_mem_write(struct wal_mem *wal_mem, struct xrow_header **begin, > + struct xrow_header **end); > + > +/* Wal memory cursor to track a position in a wal memory. */ > +struct wal_mem_cursor { > + /* Current memory buffer index. */ > + uint64_t buf_index; > + /* Current row index. */ > + uint32_t row_index; > +}; > + > +/* Create a wal memory cursor from the wal memory current position. */ > +int > +wal_mem_cursor_create(struct wal_mem *wal_mem, > + struct wal_mem_cursor *wal_mem_cursor, > + struct vclock *vclock); > + > +int > +wal_mem_cursor_next(struct wal_mem *wal_mem, > + struct wal_mem_cursor *wal_mem_cursor, > + struct xrow_header **row, > + void **data, > + size_t *size); What is returned on EOF? What happens if the buffer is rotated while there's a cursor open for it? A comment would be helpful here.