From: Vladislav Shpilevoy
Date: Tue, 22 Oct 2019 01:06:38 +0200
Subject: Re: [Tarantool-patches] [tarantool-patches] [PATCH v3 2/4] wal: xrow buffer structure
To: tarantool-patches@freelists.org, Georgy Kirichenko, tarantool-patches@dev.tarantool.org

Thanks for the patch!

See 20 comments below.

On 09/10/2019 18:45, Georgy Kirichenko wrote:
> Introduce a xrow buffer to store encoded xrows in memory after
> transaction was finished. Wal uses an xrow buffer object in
> order to encode transactions and then writes encoded data
> to a log file so encoded data still lives in memory for
> some time after a transaction is finished and cleared by an engine.
> Xrow buffer consist of not more than XROW_BUF_CHUNK_COUNT rotating
> chunks organized in a ring. Rotation thresholds and
> XROW_BUF_CHUNK_COUNT are empiric values and were hardcoded.
>
> Part of #3794
> ---
>  src/box/CMakeLists.txt |   1 +
>  src/box/wal.c          |  47 ++++++-
>  src/box/xlog.c         |  57 ++++++---
>  src/box/xlog.h         |  16 ++-
>  src/box/xrow_buf.c     | 285 +++++++++++++++++++++++++++++++++++++++++
>  src/box/xrow_buf.h     | 160 +++++++++++++++++++++++
>  6 files changed, 540 insertions(+), 26 deletions(-)
>  create mode 100644 src/box/xrow_buf.c
>  create mode 100644 src/box/xrow_buf.h
>
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 9219d6779..63e0c728d 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -156,6 +157,14 @@ struct wal_writer
>  	 * Used for replication relays.
>  	 */
>  	struct rlist watchers;
> +	/**
> +	 * In-memory WAl write buffer used to encode transaction rows and

1. Is the lowercase 'l' a typo? You probably meant a capital 'L'.

> +	 * write them to an xlog file. An in-memory buffer allows us to
> +	 * preserve xrows after transaction processing was finished.
> +	 * This buffer will be used by replication to fetch xrows from memory
> +	 * without xlog files access.
> +	 */
> +	struct xrow_buf xrow_buf;
>  };
>
>  struct wal_msg {
> @@ -933,6 +942,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>  	int64_t tsn = 0;
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
> +		(*row)->tm = ev_now(loop());

2. Why do you need it? You said that your patch is pure refactoring
to keep xlogs in memory for a while, but this looks like a
functional change.

>  		if ((*row)->replica_id == 0) {
>  			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
>  				      vclock_get(base, instance_id);
> @@ -1016,25 +1026,52 @@ wal_write_to_disk(struct cmsg *msg)
>  	int rc;
>  	struct journal_entry *entry;
>  	struct stailq_entry *last_committed = NULL;
> +	/* Start a wal memory buffer transaction. */
>  	xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock);
>  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
>  		wal_assign_lsn(&vclock_diff, &writer->vclock,
>  			       entry->rows, entry->rows + entry->n_rows);
>  		entry->res = vclock_sum(&vclock_diff) +
>  			vclock_sum(&writer->vclock);
> -		rc = xlog_write_entry(l, entry);
> -		if (rc < 0)
> +		struct iovec *iov;
> +		int iovcnt = xrow_buf_write(&writer->xrow_buf, entry->rows,
> +					    entry->rows + entry->n_rows, &iov);
> +		if (iovcnt < 0) {
> +			xrow_buf_tx_rollback(&writer->xrow_buf);
> +			goto done;
> +		}
> +		xlog_tx_begin(l);
> +		if (xlog_write_iov(l, iov, iovcnt, entry->n_rows) < 0) {
> +			xlog_tx_rollback(l);
> +			xrow_buf_tx_rollback(&writer->xrow_buf);
> +			goto done;
> +		}
> +		rc = xlog_tx_commit(l);

3. Sorry, Kostja is right (about not understanding how these
transactions are related). These simultaneous xlog/xrow_buf
begin/commit calls are really confusing. Previously, xlog
transactions were encapsulated inside xlog_write_entry(). Could you
please do something similar here to hide them? For example, add a
function xlog_write_iov_tx() that would internally call
xlog_tx_begin()/rollback(). It would really help.

> +		if (rc < 0) {
> +			/* Failed write. */
> +			xrow_buf_tx_rollback(&writer->xrow_buf);
>  			goto done;
> +		}
>  		if (rc > 0) {
> +			/*
> +			 * Data flushed to disk, start a new memory
> +			 * buffer transaction
> +			 */
>  			writer->checkpoint_wal_size += rc;
>  			last_committed = &entry->fifo;
>  			vclock_merge(&writer->vclock, &vclock_diff);
> +			xrow_buf_tx_commit(&writer->xrow_buf);
> +			xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock);
>  		}
>  		/* rc == 0: the write is buffered in xlog_tx */
>  	}
> diff --git a/src/box/xlog.c b/src/box/xlog.c
> index 8254cce20..2fcf7f0df 100644
> --- a/src/box/xlog.c
> +++ b/src/box/xlog.c
> @@ -1275,14 +1275,8 @@ xlog_tx_write(struct xlog *log)
>  	return written;
>  }
>
> -/*
> - * Add a row to a log and possibly flush the log.
> - *
> - * @retval -1 error, check diag.
> - * @retval >=0 the number of bytes written to buffer.
> - */
> -ssize_t
> -xlog_write_row(struct xlog *log, const struct xrow_header *packet)
> +static int
> +xlog_write_prepare(struct xlog *log)

4. Why do you need this as a separate function? It looks like an
artifact of the previous version, where it was used in two places.
Now you can inline it back into xlog_write_iov().

>  {
>  	/*
>  	 * Automatically reserve space for a fixheader when adding
> @@ -1296,17 +1290,20 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
>  			return -1;
>  		}
>  	}
> +	return 0;
> +}
>
> -	struct obuf_svp svp = obuf_create_svp(&log->obuf);
> -	size_t page_offset = obuf_size(&log->obuf);
> -	/** encode row into iovec */
> -	struct iovec iov[XROW_IOVMAX];
> -	/** don't write sync to the disk */
> -	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
> -	if (iovcnt < 0) {
> -		obuf_rollback_to_svp(&log->obuf, &svp);
> +/*

5. Please use /** for comments outside of function bodies. We use the
doxygen comment format, and /** is part of its syntax. Here and in all
other places, and in other commits.

> + * Write an xrow containing iovec to a xlog.
> + */
> +ssize_t
> +xlog_write_iov(struct xlog *log, struct iovec *iov, int iovcnt, int row_count)
> +{
> @@ -1325,16 +1322,34 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
> +/*
> + * Add a row to a log and possibly flush the log.
> + *
> + * @retval -1 error, check diag.
> + * @retval >=0 the number of bytes written to buffer.
> + */
> +ssize_t
> +xlog_write_row(struct xlog *log, const struct xrow_header *packet)
> +{
> +	/** encode row into an iovec */
> +	struct iovec iov[XROW_IOVMAX];
> +	/** don't write sync to the disk */

6. I understand that this is copy-paste from the old code, but please
start sentences with a capital letter, and use the /* comment starter
inside function bodies. Here and in all other places, and in other
commits.
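Back to comment 3, here is a rough sketch of what I mean. It assumes
xlog_write_iov_tx() keeps the contract of the old xlog_write_entry():
begin an xlog transaction, write, roll back on error, otherwise return
whatever xlog_tx_commit() returns. The struct xlog and the
xlog_tx_begin()/xlog_tx_rollback()/xlog_tx_commit()/xlog_write_iov()
bodies here are toy stand-ins that only count bytes, so the sketch
compiles on its own; they are not the real xlog.c code.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/uio.h>

/* Toy stand-in for struct xlog: only counts pending and flushed bytes. */
struct xlog {
	size_t tx_bytes;
	size_t flushed;
	int fail_write; /* Toy knob to make xlog_write_iov() fail. */
};

/* Toy stand-ins for the real xlog transaction functions. */
static void
xlog_tx_begin(struct xlog *log)
{
	log->tx_bytes = 0;
}

static void
xlog_tx_rollback(struct xlog *log)
{
	log->tx_bytes = 0;
}

/* Toy: always "flushes" and returns the number of flushed bytes. */
static ssize_t
xlog_tx_commit(struct xlog *log)
{
	ssize_t written = (ssize_t)log->tx_bytes;
	log->flushed += log->tx_bytes;
	log->tx_bytes = 0;
	return written;
}

static ssize_t
xlog_write_iov(struct xlog *log, struct iovec *iov, int iovcnt, int row_count)
{
	(void)row_count;
	if (log->fail_write)
		return -1;
	size_t size = 0;
	for (int i = 0; i < iovcnt; i++)
		size += iov[i].iov_len;
	log->tx_bytes += size;
	return (ssize_t)size;
}

/**
 * Hypothetical wrapper: write one encoded journal entry inside its
 * own xlog transaction, so callers never touch xlog_tx_*() directly.
 *
 * @retval -1 error, the xlog transaction is rolled back.
 * @retval >=0 result of xlog_tx_commit().
 */
static ssize_t
xlog_write_iov_tx(struct xlog *log, struct iovec *iov, int iovcnt,
		  int row_count)
{
	assert(iovcnt > 0 && row_count > 0);
	xlog_tx_begin(log);
	if (xlog_write_iov(log, iov, iovcnt, row_count) < 0) {
		xlog_tx_rollback(log);
		return -1;
	}
	return xlog_tx_commit(log);
}
```

With such a wrapper the loop in wal_write_to_disk() would only call
rc = xlog_write_iov_tx(l, iov, iovcnt, entry->n_rows) and roll back the
xrow buffer when rc < 0, so only one kind of transaction is visible
there.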
> +	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
> +	if (iovcnt < 0)
> +		return -1;
> +	assert(iovcnt <= XROW_IOVMAX);
> +	return xlog_write_iov(log, iov, iovcnt, 1);
>  }
>
>  /**
> diff --git a/src/box/xlog.h b/src/box/xlog.h
> index a48b05fc4..aa3c14d84 100644
> --- a/src/box/xlog.h
> +++ b/src/box/xlog.h
> @@ -493,7 +493,7 @@ ssize_t
>  xlog_fallocate(struct xlog *log, size_t size);
>
>  /**
> - * Write a row to xlog,
> + * Write a row into a xlog,

7. Stray change. Please discard it.

>   *
>   * @retval count of writen bytes
>   * @retval -1 for error
> diff --git a/src/box/xrow_buf.c b/src/box/xrow_buf.c
> new file mode 100644
> index 000000000..e4455e01a
> --- /dev/null
> +++ b/src/box/xrow_buf.c
> @@ -0,0 +1,285 @@
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
IN NO EVENT SHALL > + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, > + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR > + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF > + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF > + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF > + * SUCH DAMAGE. > + */ > + > +#include "xrow_buf.h" > +#include "fiber.h" > +#include "errinj.h" > + > +/* Xrow buffer chunk options (empirical values). */ > +enum { > + /* Chunk row info array capacity increment */ > + XROW_BUF_CHUNK_CAPACITY_INCREMENT = 16384, > + /* Initial size for raw data storage. */ > + XROW_BUF_CHUNK_INITIAL_DATA_SIZE = 65536, > + /* How many rows we will place in one buffer. */ > + XROW_BUF_CHUNK_ROW_COUNT_THRESHOLD = 8192, > + /* How many data we will place in one buffer. */ > + XROW_BUF_CHUNK_DATA_SIZE_THRESHOLD = 1 << 19, > +}; > + > + > +/* > + * Save the current xrow buffer chunk state wich consists of two 8. wich -> which. > + * values index and position where the next row header and raw data > + * would be placed. This state is used to track the next > + * transaction starting boundary. > + */ > +static inline void > +xrow_buf_save_state(struct xrow_buf *xrow_buf) > +{ > + struct xrow_buf_chunk *chunk = xrow_buf->chunk + > + xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT; > + /* Save the current xrow buffer state. */ > + xrow_buf->tx_first_row_index = chunk->row_count; > + xrow_buf->tx_first_row_svp = obuf_create_svp(&chunk->data); > +} > + > +void > +xrow_buf_destroy(struct xrow_buf *xrow_buf) > +{ > + for (int i = 0; i < XROW_BUF_CHUNK_COUNT; ++i) { > + if (xrow_buf->chunk[i].row_info_capacity > 0) > + slab_put(&cord()->slabc, > + slab_from_data(xrow_buf->chunk[i].row_info)); 9. 
Please, use {} for code blocks consisting of more than 1 line. > + obuf_destroy(&xrow_buf->chunk[i].data); > + } > +} > + > +void > +xrow_buf_tx_begin(struct xrow_buf *xrow_buf, const struct vclock *vclock) > +{ > + /* > + * Xrow buffer fits a transaction in one chunk and does not 10. A verb is missing. 'Does not do chunk rotation', or 'Does not rotate a chunk'. > + * chunk rotation while transaction is in progress. So check > + * current chunk thresholds and rotate if required. > + */ > + struct xrow_buf_chunk *chunk = xrow_buf_rotate(xrow_buf); > + /* > + * Check if the current chunk is empty and a vclock for > + * the chunk should be set. > + */ > + if (chunk->row_count == 0) > + vclock_copy(&chunk->vclock, vclock); > +} > + > +int > +xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin, > + struct xrow_header **end, > + struct iovec **iovec) > +{ > + struct xrow_buf_chunk *chunk = xrow_buf->chunk + > + xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT; > + > + /* Save a data buffer svp to restore the buffer in case of an error. */ > + struct obuf_svp data_svp = obuf_create_svp(&chunk->data); > + > + size_t row_count = chunk->row_count + (end - begin); > + /* Allocate space for new row information members if required. */ > + if (row_count > chunk->row_info_capacity) { > + /* Round allocation up to XROW_BUF_CHUNK_CAPACITY_INCREMENT. 
*/ > + uint32_t capacity = XROW_BUF_CHUNK_CAPACITY_INCREMENT * > + ((row_count + > + XROW_BUF_CHUNK_CAPACITY_INCREMENT - 1) / > + XROW_BUF_CHUNK_CAPACITY_INCREMENT); > + > + struct slab *row_info_slab = > + slab_get(&cord()->slabc, > + sizeof(struct xrow_buf_row_info) * capacity); > + if (row_info_slab == NULL) { > + diag_set(OutOfMemory, capacity * > + sizeof(struct xrow_buf_row_info), > + "region", "row info array"); > + goto error; > + } > + if (chunk->row_info_capacity > 0) { > + memcpy(slab_data(row_info_slab), chunk->row_info, > + sizeof(struct xrow_buf_row_info) * > + chunk->row_count); > + slab_put(&cord()->slabc, > + slab_from_data(chunk->row_info)); 11. Oh God, what is that? It is even more intricate and complex than the previous version with ibuf. Again - why not to use a normal struct xrow_buf_row_info* array on the simple heap? Have you done benchmarks showing that this slab struggling gives any perf increase? > + } > + chunk->row_info = slab_data(row_info_slab); > + chunk->row_info_capacity = capacity; > + } > diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h > new file mode 100644 > index 000000000..266cc0f76 > --- /dev/null > +++ b/src/box/xrow_buf.h > @@ -0,0 +1,160 @@ > +#ifndef TARANTOOL_XROW_BUF_H_INCLUDED > +#define TARANTOOL_XROW_BUF_H_INCLUDED > +/* > + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. > + * > + * Redistribution and use in source and binary forms, with or > + * without modification, are permitted provided that the following > + * conditions are met: > + * > + * 1. Redistributions of source code must retain the above > + * copyright notice, this list of conditions and the > + * following disclaimer. > + * > + * 2. Redistributions in binary form must reproduce the above > + * copyright notice, this list of conditions and the following > + * disclaimer in the documentation and/or other materials > + * provided with the distribution. 
> + *
> + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include
>
> +#include "small/obuf.h"
> +#include "small/rlist.h"

12. Why do you need rlist.h?

> +#include "xrow.h"
> +#include "vclock.h"
> +
> +enum {
> +	/*8

13. 8 -> *.

> +	 * Xrow buffer contains some count of rotating data chunks.
> +	 * Every rotation has an estimated decrease in amount of
> +	 * stored rows at 1/(COUNT OF CHUNKS). However the bigger
> +	 * value makes rotation more frequent, the decrease would be
> +	 * smoother and size of a xrow buffer more stable.
> +	 */
> +	XROW_BUF_CHUNK_COUNT = 16,
> +};
> +
> +/**
> + * Xrow_info structure used to describe a row stored in a xrow
> + * buffer. Xrow info contains an xrow_header structure, pointer
> + * and size of the row_header encoded representation. Xrow header
> + * allows to filter rows by replica_id, lsn or replication group
> + * while encoded representation could be used to write xrow 

14. Trailing whitespace at the end of the line above.

> + * without any further encoding.
> + */
> +struct xrow_buf_row_info {
> +	/** Stored row header. */
> +	struct xrow_header xrow;
> +	/** Pointer to row encoded raw data. */
> +	void *data;
> +	/** Row encoded raw data size. */
> +	size_t size;
> +};
> +
> +/**
> + * Xrow buffer data chunk structure is used to store a continuous
> + * sequence of xrow headers written to a xrow buffer. Xrow buffer data
> + * chunk contains a vclock of the last row just before the first row
> + * stored in the chunk, count of rows, its encoded raw data, and array of

15. "vclock of the last row just before the first row"? Sorry, I
didn't understand. Could you please rephrase?

> + * stored row info. Vclock is used to track stored vclock lower boundary.
> + */
> +struct xrow_buf_chunk {
> +	/** Vclock just before the first row in this chunk. */
> +	struct vclock vclock;
> +	/** Count of stored rows. */
> +	size_t row_count;
> +	/** Stored rows information array. */
> +	struct xrow_buf_row_info *row_info;
> +	/** Capacity of stored rows information array. */
> +	size_t row_info_capacity;
> +	/** Data storage for encoded rows data. */
> +	struct obuf data;
> +};
> +
> +/**
> + * Xrow buffer enables to encode and store some continuous sequence
> + * of xrows (both headers and binary encoded representation).
> + * Storage organized as a range of globally indexed chunks. New rows
> + * are appended to the last one chunk (the youngest one). If the last
> + * chunk is already full then a new chunk will be used. Xrow_buffer
> + * maintains not more than XROW_BUF_CHUNK_COUNT chunks so when the buffer
> + * is already full then a first one chunk should be discarded before a
> + * new one could be used. All chunks are organized in a ring which is
> + * XROW_BUF_CHUNK_COUNT the size so a chunk in-ring index could be
> + * evaluated from the chunk global index with the modulo operation.
> + */
> +struct xrow_buf {
> +	/** Global index of the first used chunk (the oldest one). */
> +	size_t first_chunk_index;
> +	/** Global index of the last used chunk (the youngest one). */
> +	size_t last_chunk_index;
> +	/** Ring -array containing chunks . */

16. "-array"? Something is missing here.
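To make the ring from the struct xrow_buf comment and comment 11 more
concrete, here is a toy model, not the patch's code. Global chunk
indexes only grow, and the in-ring slot is index % CHUNK_COUNT; when
the ring is full, the oldest chunk is dropped and its slot is reused.
The row info array lives on the plain heap and grows with realloc(),
rounded up to CAPACITY_INCREMENT the same way the patch rounds slab
sizes. CHUNK_COUNT, CHUNK_ROW_THRESHOLD, CAPACITY_INCREMENT, struct
ring and the ring_*() functions are hypothetical names with tiny values
for illustration. The encoded data obuf and the chunk vclock are
omitted, and rotation is checked on every append instead of in
xrow_buf_tx_begin() to keep the sketch short.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

enum {
	CHUNK_COUNT = 4,         /* Hypothetical; the patch uses 16. */
	CHUNK_ROW_THRESHOLD = 3, /* Hypothetical rotation threshold. */
	CAPACITY_INCREMENT = 2,  /* Hypothetical growth step. */
};

struct row_info {
	int64_t lsn;
};

struct chunk {
	size_t row_count;
	size_t row_info_capacity;
	/* Plain heap array, grown with realloc(). */
	struct row_info *row_info;
};

struct ring {
	/* Global indexes; the in-ring slot is index % CHUNK_COUNT. */
	size_t first_chunk_index;
	size_t last_chunk_index;
	struct chunk chunk[CHUNK_COUNT];
};

static struct chunk *
ring_last(struct ring *r)
{
	return &r->chunk[r->last_chunk_index % CHUNK_COUNT];
}

/* Move to a new chunk if the last one is full, dropping the oldest. */
static struct chunk *
ring_rotate(struct ring *r)
{
	assert(r->last_chunk_index - r->first_chunk_index < CHUNK_COUNT);
	struct chunk *last = ring_last(r);
	if (last->row_count < CHUNK_ROW_THRESHOLD)
		return last;
	if (r->last_chunk_index - r->first_chunk_index + 1 == CHUNK_COUNT) {
		/* The ring is full: discard the oldest chunk. */
		r->first_chunk_index++;
	}
	r->last_chunk_index++;
	last = ring_last(r);
	/* The slot may be reused; keep its array, forget its rows. */
	last->row_count = 0;
	return last;
}

/* Append a row, rounding the array capacity up to CAPACITY_INCREMENT. */
static int
ring_append(struct ring *r, int64_t lsn)
{
	struct chunk *c = ring_rotate(r);
	size_t need = c->row_count + 1;
	if (need > c->row_info_capacity) {
		size_t capacity = CAPACITY_INCREMENT *
			((need + CAPACITY_INCREMENT - 1) / CAPACITY_INCREMENT);
		struct row_info *p = realloc(c->row_info, capacity * sizeof(*p));
		if (p == NULL)
			return -1;
		c->row_info = p;
		c->row_info_capacity = capacity;
	}
	c->row_info[c->row_count++].lsn = lsn;
	return 0;
}

static void
ring_destroy(struct ring *r)
{
	for (int i = 0; i < CHUNK_COUNT; i++)
		free(r->chunk[i].row_info);
}
```

With these toy limits, appending 13 rows fills the chunks with global
indexes 0 to 3 and then wraps: chunk 0 is dropped and slot 0 is reused
for global index 4. The realloc() version keeps the growth logic to a
few lines; whether the slab variant is measurably faster is exactly
what comment 11 asks to benchmark.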
> +	struct xrow_buf_chunk chunk[XROW_BUF_CHUNK_COUNT];
> +	/**
> +	 * A xrow_buf transaction is recorded in one chunk only.
> +	 * When transaction is started current row count and data
> +	 * buffer svp from the current chunk (which is the last one)
> +	 * are remembered in order to be able to restore the chunk
> +	 * state in case of rollback.
> +	 */
> +	struct {

17. Why do you need these two members inside an anonymous struct?

> +		/** The current transaction first row index. */
> +		uint32_t tx_first_row_index;

18. I would rather call them saved_row_count and saved_obuf. You never
use `tx_first_row_index` as an index of anything: you only assign it
from row_count and later restore row_count from it, in the next
commits too. These two attributes are just a savepoint, and nothing
more. Take obuf_svp, for example. It has 3 simple members, named the
same as the original members of obuf: pos, iov_len, used. They are not
named 'obuf_first_alloc_pos', 'obuf_first_iov_len',
'obuf_first_alloc_pos_used'. The simpler the name, the easier it is to
understand what it stands for. For the savepoint, just take the
original names as is, maybe with a simple prefix like 'saved_'.

> +		/** The current transaction encoded data start svp. */
> +		struct obuf_svp tx_first_row_svp;
> +	};
> +};
> +
> +/** Create a wal memory. */
> +void
> +xrow_buf_create(struct xrow_buf *xrow_buf);
> +
> +/** Destroy wal memory structure. */
> +void
> +xrow_buf_destroy(struct xrow_buf *xrow_buf);
> +
> +/**
> + * Begin a xrow buffer transaction. This function may rotate the
> + * last one data chunk and use the vclock parameter as a new chunk
> + * starting vclock.
> + */
> +void
> +xrow_buf_tx_begin(struct xrow_buf *xrow_buf, const struct vclock *vclock);
> +
> +/** Discard all the data was written after the last transaction. */

19. 'the data written', without 'was'.

> +void
> +xrow_buf_tx_rollback(struct xrow_buf *xrow_buf);
> +
> +/** Commit a xrow buffer transaction. */
> +void
> +xrow_buf_tx_commit(struct xrow_buf *xrow_buf);
> +
> +/**
> + * Append an xrow array to a wal memory. The array is placed into
> + * one xrow buffer data chunk and each row takes a continuous
> + * space in a data buffer. Raw encoded data is placed onto
> + * gc-allocated iovec array.
> + *
> + * @retval count of written iovec members for success
> + * @retval -1 in case of error
> + */
> +int
> +xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
> +	       struct xrow_header **end,
> +	       struct iovec **iovec);

20. The `iovec` and `end` arguments fit in one line.

> +
> +#endif /* TARANTOOL_XROW_BUF_H_INCLUDED */
>
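For comments 17 and 18, a minimal sketch of the savepoint as two plain
members named after what they save. The obuf and obuf_svp here are
simplified stand-ins (one flat buffer with a single `used` counter) for
the small/obuf.h types, not their real definitions, and
xrow_buf_sketch with its sketch_*() functions is a hypothetical
reduction of xrow_buf to the savepoint logic only.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Simplified stand-ins for small/obuf.h: one flat buffer, no iovecs. */
struct obuf {
	char data[256];
	size_t used;
};

struct obuf_svp {
	size_t used;
};

static struct obuf_svp
obuf_create_svp(struct obuf *buf)
{
	struct obuf_svp svp = {buf->used};
	return svp;
}

static void
obuf_rollback_to_svp(struct obuf *buf, struct obuf_svp *svp)
{
	assert(svp->used <= buf->used);
	buf->used = svp->used;
}

static int
obuf_append(struct obuf *buf, const void *src, size_t size)
{
	if (buf->used + size > sizeof(buf->data))
		return -1;
	memcpy(buf->data + buf->used, src, size);
	buf->used += size;
	return 0;
}

struct chunk {
	size_t row_count;
	struct obuf data;
};

struct xrow_buf_sketch {
	struct chunk chunk;
	/* Plain members, no anonymous struct, named after what they save. */
	size_t saved_row_count;
	struct obuf_svp saved_obuf;
};

static void
sketch_tx_begin(struct xrow_buf_sketch *buf)
{
	buf->saved_row_count = buf->chunk.row_count;
	buf->saved_obuf = obuf_create_svp(&buf->chunk.data);
}

static void
sketch_tx_rollback(struct xrow_buf_sketch *buf)
{
	buf->chunk.row_count = buf->saved_row_count;
	obuf_rollback_to_svp(&buf->chunk.data, &buf->saved_obuf);
}

static int
sketch_write_row(struct xrow_buf_sketch *buf, const char *row)
{
	if (obuf_append(&buf->chunk.data, row, strlen(row)) != 0)
		return -1;
	buf->chunk.row_count++;
	return 0;
}
```

Writing "row1" in one transaction and two more rows in the next, then
rolling back, restores row_count to 1 and data.used to 4: saving and
restoring is all these members do, so "saved_" names describe them
fully.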