From: Georgy Kirichenko
Date: Wed, 12 Feb 2020 12:39:18 +0300
Subject: [Tarantool-patches] [PATCH v4 09/11] wal: xrow memory buffer and cursor
To: tarantool-patches@dev.tarantool.org

* xrow buffer structure
Introduce an xrow buffer which stores encoded xrows in memory after
a transaction is finished. WAL uses an xrow buffer object to encode
transactions and then writes the encoded data to a log file. An xrow
buffer consists of at most XROW_BUF_CHUNK_COUNT rotating chunks
organized in a ring. Rotation thresholds and XROW_BUF_CHUNK_COUNT are
empirical values for now.

* xrow buffer cursor
This structure allows finding the first xrow buffer row that is not
covered by a given vclock and then fetching rows one by one, moving
forward to the last appended row. An xrow buffer cursor is essential
for replication from memory: a relay will use it to fetch all logged
rows stored in WAL memory (implemented as an xrow buffer) starting
from a given position, and then follow all new changes.
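The ring-of-chunks layout described above can be sketched in plain C. This is a simplified model, not the patch code: a vclock is reduced to a single LSN (one replica), each chunk stores bare LSNs instead of encoded rows, rotation is checked per row rather than at transaction begin, and the names `demo_buf`, `demo_chunk`, `DEMO_CHUNK_COUNT` and `DEMO_CHUNK_ROWS` are hypothetical, with limits far smaller than the real ones.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, scaled-down limits (the patch uses 16 chunks and
 * 8192 rows or 512 KiB of data per chunk). */
enum { DEMO_CHUNK_COUNT = 4, DEMO_CHUNK_ROWS = 2 };

struct demo_chunk {
	int64_t start_lsn;              /* stands in for chunk->vclock */
	size_t row_count;
	int64_t lsn[DEMO_CHUNK_ROWS];   /* stands in for row_info[] */
};

struct demo_buf {
	size_t first_chunk_index;       /* oldest live chunk, global index */
	size_t last_chunk_index;        /* youngest chunk, global index */
	struct demo_chunk chunk[DEMO_CHUNK_COUNT];
};

/* A global chunk index maps to a ring slot with a modulo. */
static struct demo_chunk *
demo_chunk_at(struct demo_buf *buf, size_t global_index)
{
	return &buf->chunk[global_index % DEMO_CHUNK_COUNT];
}

static void
demo_buf_create(struct demo_buf *buf)
{
	buf->first_chunk_index = 0;
	buf->last_chunk_index = 0;
	for (int i = 0; i < DEMO_CHUNK_COUNT; i++)
		buf->chunk[i].row_count = 0;
}

/*
 * Append one row. When the current chunk is full, move to the next
 * global index; if the ring has no free slot, that slot still holds
 * the oldest chunk, which is discarded and reused.
 */
static void
demo_buf_append(struct demo_buf *buf, int64_t lsn)
{
	struct demo_chunk *chunk = demo_chunk_at(buf, buf->last_chunk_index);
	if (chunk->row_count >= DEMO_CHUNK_ROWS) {
		++buf->last_chunk_index;
		chunk = demo_chunk_at(buf, buf->last_chunk_index);
		if (buf->last_chunk_index - buf->first_chunk_index >=
		    DEMO_CHUNK_COUNT) {
			chunk->row_count = 0;
			++buf->first_chunk_index;
		}
	}
	/* An empty chunk remembers the position just before its first row. */
	if (chunk->row_count == 0)
		chunk->start_lsn = lsn - 1;
	chunk->lsn[chunk->row_count++] = lsn;
}
```

With 4 chunks of 2 rows, appending LSNs 1 through 10 fills global chunks 0 to 3 and then reuses slot 0 for global chunk 4, so the oldest live chunk becomes global chunk 1, which starts right after LSN 2.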
Part of #3974 #980 --- src/box/CMakeLists.txt | 1 + src/box/xrow_buf.c | 374 +++++++++++++++++++++++++++++++++++++++++ src/box/xrow_buf.h | 198 ++++++++++++++++++++++ 3 files changed, 573 insertions(+) create mode 100644 src/box/xrow_buf.c create mode 100644 src/box/xrow_buf.h diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index 32f922dd7..303ad9c6e 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -137,6 +137,7 @@ add_library(box STATIC sql_stmt_cache.c wal.c mclock.c + xrow_buf.c call.c merger.c ${lua_sources} diff --git a/src/box/xrow_buf.c b/src/box/xrow_buf.c new file mode 100644 index 000000000..4c3c87fcf --- /dev/null +++ b/src/box/xrow_buf.c @@ -0,0 +1,374 @@ +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include "xrow_buf.h" +#include "fiber.h" + +/* Xrow buffer chunk options (empirical values). */ +enum { + /* Chunk row info array capacity increment. */ + XROW_BUF_CHUNK_CAPACITY_INCREMENT = 16384, + /* Initial size for raw data storage. */ + XROW_BUF_CHUNK_INITIAL_DATA_SIZE = 65536, + /* Rotate a chunk once it holds this many rows. */ + XROW_BUF_CHUNK_ROW_COUNT_THRESHOLD = 8192, + /* Rotate a chunk once it holds this many bytes of data. */ + XROW_BUF_CHUNK_DATA_SIZE_THRESHOLD = 1 << 19, +}; + + +/* + * Save the current xrow buffer chunk state, which consists of + * two values: the row index and the data position at which the + * next row header and raw data will be placed. This state marks + * the starting boundary of the next transaction. + */ +static inline void +xrow_buf_save_state(struct xrow_buf *xrow_buf) +{ + struct xrow_buf_chunk *chunk = xrow_buf->chunk + + xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT; + /* Save the current xrow buffer state.
*/ + xrow_buf->tx_first_row_index = chunk->row_count; + xrow_buf->tx_first_row_svp = obuf_create_svp(&chunk->data); +} + +void +xrow_buf_create(struct xrow_buf *xrow_buf) +{ + for (int i = 0; i < XROW_BUF_CHUNK_COUNT; ++i) { + xrow_buf->chunk[i].row_info = NULL; + xrow_buf->chunk[i].row_info_capacity = 0; + xrow_buf->chunk[i].row_count = 0; + obuf_create(&xrow_buf->chunk[i].data, &cord()->slabc, + XROW_BUF_CHUNK_INITIAL_DATA_SIZE); + } + xrow_buf->last_chunk_index = 0; + xrow_buf->first_chunk_index = 0; + xrow_buf_save_state(xrow_buf); +} + +void +xrow_buf_destroy(struct xrow_buf *xrow_buf) +{ + for (int i = 0; i < XROW_BUF_CHUNK_COUNT; ++i) { + if (xrow_buf->chunk[i].row_info_capacity > 0) + slab_put(&cord()->slabc, + slab_from_data(xrow_buf->chunk[i].row_info)); + obuf_destroy(&xrow_buf->chunk[i].data); + } +} + +/* + * If the current chunk has reached its row count or data size + * limit, this function switches the xrow buffer to the next + * chunk. If there are no free chunks left in the ring, the + * oldest chunk is truncated and then reused to store new + * data. + */ +static struct xrow_buf_chunk * +xrow_buf_rotate(struct xrow_buf *xrow_buf) +{ + struct xrow_buf_chunk *chunk = xrow_buf->chunk + + xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT; + /* Check whether the current chunk can still accept new data. */ + if (chunk->row_count < XROW_BUF_CHUNK_ROW_COUNT_THRESHOLD && + obuf_size(&chunk->data) < XROW_BUF_CHUNK_DATA_SIZE_THRESHOLD) + return chunk; + + /* + * Increase the last chunk index and fetch the + * corresponding chunk from the ring buffer. + */ + ++xrow_buf->last_chunk_index; + chunk = xrow_buf->chunk + xrow_buf->last_chunk_index % + XROW_BUF_CHUNK_COUNT; + /* + * If the ring is full, the next chunk is the oldest one, + * so discard its data and advance the first chunk index.
+ */ + if (xrow_buf->last_chunk_index - xrow_buf->first_chunk_index >= + XROW_BUF_CHUNK_COUNT) { + chunk->row_count = 0; + obuf_reset(&chunk->data); + ++xrow_buf->first_chunk_index; + } + /* + * The current chunk has changed, so save + * the new transaction boundary. + */ + xrow_buf_save_state(xrow_buf); + return chunk; +} + +void +xrow_buf_tx_begin(struct xrow_buf *xrow_buf, const struct vclock *vclock) +{ + /* + * The xrow buffer places a whole transaction in one chunk + * and never rotates chunks while a transaction is in + * progress, so check the thresholds and rotate here. + */ + struct xrow_buf_chunk *chunk = xrow_buf_rotate(xrow_buf); + /* + * If the current chunk is empty, remember the vclock + * preceding its first row. + */ + if (chunk->row_count == 0) + vclock_copy(&chunk->vclock, vclock); +} + +void +xrow_buf_tx_rollback(struct xrow_buf *xrow_buf) +{ + struct xrow_buf_chunk *chunk = xrow_buf->chunk + + xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT; + chunk->row_count = xrow_buf->tx_first_row_index; + obuf_rollback_to_svp(&chunk->data, &xrow_buf->tx_first_row_svp); +} + +void +xrow_buf_tx_commit(struct xrow_buf *xrow_buf) +{ + /* Save the current xrow buffer state. */ + xrow_buf_save_state(xrow_buf); +} + +int +xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin, + struct xrow_header **end, struct iovec **iovec) +{ + struct xrow_buf_chunk *chunk = xrow_buf->chunk + + xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT; + + /* Save a data buffer svp to restore the buffer in case of an error. */ + struct obuf_svp data_svp = obuf_create_svp(&chunk->data); + + size_t row_count = chunk->row_count + (end - begin); + /* Grow the row info array if required. */ + if (row_count > chunk->row_info_capacity) { + /* Round allocation up to XROW_BUF_CHUNK_CAPACITY_INCREMENT.
*/ + uint32_t capacity = XROW_BUF_CHUNK_CAPACITY_INCREMENT * + ((row_count + + XROW_BUF_CHUNK_CAPACITY_INCREMENT - 1) / + XROW_BUF_CHUNK_CAPACITY_INCREMENT); + + struct slab *row_info_slab = + slab_get(&cord()->slabc, + sizeof(struct xrow_buf_row_info) * capacity); + if (row_info_slab == NULL) { + diag_set(OutOfMemory, capacity * + sizeof(struct xrow_buf_row_info), + "slab", "row info array"); + goto error; + } + if (chunk->row_info_capacity > 0) { + memcpy(slab_data(row_info_slab), chunk->row_info, + sizeof(struct xrow_buf_row_info) * + chunk->row_count); + slab_put(&cord()->slabc, + slab_from_data(chunk->row_info)); + } + chunk->row_info = slab_data(row_info_slab); + chunk->row_info_capacity = capacity; + } + + /* Encode rows. */ + for (struct xrow_header **row = begin; row < end; ++row) { + /* Reserve space for raw encoded data. */ + char *data = obuf_reserve(&chunk->data, xrow_approx_len(*row)); + if (data == NULL) { + diag_set(OutOfMemory, xrow_approx_len(*row), + "obuf", "wal memory data"); + goto error; + } + + /* + * The xrow header itself is encoded onto the fiber gc + * region, and the first member of the resulting iovec + * points to it. Row bodies are attached to the + * resulting iovec as subsequent members. + */ + struct iovec iov[XROW_BODY_IOVMAX]; + int iov_cnt = xrow_header_encode(*row, 0, iov, 0); + if (iov_cnt < 0) + goto error; + + /* + * Now that the xrow header is encoded, copy it + * into the chunk data buffer first, followed by + * the row bodies. + */ + data = obuf_alloc(&chunk->data, iov[0].iov_len); + memcpy(data, iov[0].iov_base, iov[0].iov_len); + /* + * Initialize the row info from the xrow header and + * the location of its encoded data.
*/ + struct xrow_buf_row_info *row_info = + chunk->row_info + chunk->row_count + (row - begin); + row_info->xrow = **row; + row_info->data = data; + row_info->size = iov[0].iov_len; + + for (int i = 1; i < iov_cnt; ++i) { + data = obuf_alloc(&chunk->data, iov[i].iov_len); + memcpy(data, iov[i].iov_base, iov[i].iov_len); + /* + * Point the stored row body at the copy + * just placed in the chunk data buffer. + */ + row_info->xrow.body[i - 1].iov_base = data; + row_info->size += iov[i].iov_len; + } + } + + /* Return an iovec which points to the encoded data. */ + int iov_cnt = 1 + obuf_iovcnt(&chunk->data) - data_svp.pos; + *iovec = region_alloc(&fiber()->gc, sizeof(struct iovec) * iov_cnt); + if (*iovec == NULL) { + diag_set(OutOfMemory, sizeof(struct iovec) * iov_cnt, + "region", "xrow_buf iovec"); + goto error; + } + memcpy(*iovec, chunk->data.iov + data_svp.pos, + sizeof(struct iovec) * iov_cnt); + /* Adjust the first iovec member to start at the encoded data. */ + (*iovec)[0].iov_base += data_svp.iov_len; + (*iovec)[0].iov_len -= data_svp.iov_len; + + /* Update chunk row count. */ + chunk->row_count = row_count; + return iov_cnt; + +error: + /* Restore data buffer state. */ + obuf_rollback_to_svp(&chunk->data, &data_svp); + return -1; +} + +/* + * Return the index of the first row in a chunk that is + * not covered by the given vclock. + */ +static int +xrow_buf_chunk_locate_vclock(struct xrow_buf_chunk *chunk, + struct vclock *vclock) +{ + for (uint32_t row_index = 0; row_index < chunk->row_count; + ++row_index) { + struct xrow_header *row = &chunk->row_info[row_index].xrow; + if (vclock_get(vclock, row->replica_id) < row->lsn) + return row_index; + } + /* + * All rows are covered by the given vclock, so return + * the index just after the last row. + */ + return chunk->row_count; +} + +int +xrow_buf_cursor_create(struct xrow_buf *xrow_buf, + struct xrow_buf_cursor *xrow_buf_cursor, + struct vclock *vclock) +{ + /* Check whether the buffer still has the requested data.
*/ + struct xrow_buf_chunk *chunk = + xrow_buf->chunk + xrow_buf->first_chunk_index % + XROW_BUF_CHUNK_COUNT; + int rc = vclock_compare(&chunk->vclock, vclock); + if (rc > 0 || rc == VCLOCK_ORDER_UNDEFINED) { + /* The requested data was discarded. */ + return -1; + } + uint32_t index = xrow_buf->first_chunk_index; + while (index < xrow_buf->last_chunk_index) { + chunk = xrow_buf->chunk + (index + 1) % XROW_BUF_CHUNK_COUNT; + rc = vclock_compare(&chunk->vclock, vclock); + if (rc > 0 || rc == VCLOCK_ORDER_UNDEFINED) { + /* The next chunk starts after the requested vclock. */ + break; + } + ++index; + } + chunk = xrow_buf->chunk + index % XROW_BUF_CHUNK_COUNT; + xrow_buf_cursor->chunk_index = index; + xrow_buf_cursor->row_index = xrow_buf_chunk_locate_vclock(chunk, vclock); + return 0; +} + +int +xrow_buf_cursor_next(struct xrow_buf *xrow_buf, + struct xrow_buf_cursor *xrow_buf_cursor, + struct xrow_header **row, void **data, size_t *size) +{ + if (xrow_buf->first_chunk_index > xrow_buf_cursor->chunk_index) { + /* The cursor's current chunk was discarded by the buffer. */ + return -1; + } + + struct xrow_buf_chunk *chunk; +next_chunk: + chunk = xrow_buf->chunk + xrow_buf_cursor->chunk_index % + XROW_BUF_CHUNK_COUNT; + size_t chunk_row_count = chunk->row_count; + if (chunk_row_count == xrow_buf_cursor->row_index) { + /* + * No more rows in the current chunk, and there are two + * possibilities: + * 1. The cursor's current chunk is the last one, so there + * are no more rows for the cursor. + * 2. There is a chunk after the current one, + * so we can switch to it. + */ + if (xrow_buf->last_chunk_index == + xrow_buf_cursor->chunk_index) { + /* + * The current chunk is the last one: + * no more rows in the buffer. + */ + return 1; + } + /* Switch to the next chunk. */ + xrow_buf_cursor->row_index = 0; + ++xrow_buf_cursor->chunk_index; + goto next_chunk; + } + /* Return the row header, data pointer and data size.
*/ + struct xrow_buf_row_info *row_info = chunk->row_info + + xrow_buf_cursor->row_index; + *row = &row_info->xrow; + *data = row_info->data; + *size = row_info->size; + ++xrow_buf_cursor->row_index; + return 0; +} diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h new file mode 100644 index 000000000..c5f624d45 --- /dev/null +++ b/src/box/xrow_buf.h @@ -0,0 +1,198 @@ +#ifndef TARANTOOL_XROW_BUF_H_INCLUDED +#define TARANTOOL_XROW_BUF_H_INCLUDED +/* + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include + +#include "small/obuf.h" +#include "small/rlist.h" +#include "xrow.h" +#include "vclock.h" + +enum { + /* + * The xrow buffer contains a fixed number of rotating data chunks.
+ * Each rotation discards roughly 1/XROW_BUF_CHUNK_COUNT of + * the stored rows. A bigger value makes rotations more + * frequent, but each one discards less, so the xrow buffer + * size stays more stable. + */ + XROW_BUF_CHUNK_COUNT = 16, +}; + +/** + * The xrow_buf_row_info structure describes a row stored in an + * xrow buffer. It contains an xrow_header structure plus a pointer + * to and the size of the row's encoded representation. The xrow + * header allows filtering rows by replica_id, lsn or replication + * group, while the encoded representation can be used to write + * the row without any further encoding. + */ +struct xrow_buf_row_info { + /** Stored row header. */ + struct xrow_header xrow; + /** Pointer to row encoded raw data. */ + void *data; + /** Row encoded raw data size. */ + size_t size; +}; + +/** + * An xrow buffer data chunk stores a continuous sequence of + * rows written to an xrow buffer. A chunk contains the vclock + * just before its first row, the count of rows, their encoded raw + * data, and an array of stored row info. The vclock tracks the + * lower boundary of the rows stored in the chunk. + */ +struct xrow_buf_chunk { + /** Vclock just before the first row in this chunk. */ + struct vclock vclock; + /** Count of stored rows. */ + size_t row_count; + /** Stored rows information array. */ + struct xrow_buf_row_info *row_info; + /** Capacity of stored rows information array. */ + size_t row_info_capacity; + /** Data storage for encoded rows data. */ + struct obuf data; +}; + +/** + * The xrow buffer encodes and stores a continuous sequence of + * xrows (both headers and their binary encoded representation). + * Storage is organized as a range of globally indexed chunks. New + * rows are appended to the last (youngest) chunk. If the last + * chunk is already full, a new chunk is used.
The xrow buffer + * maintains at most XROW_BUF_CHUNK_COUNT chunks, so when the buffer + * is already full, the oldest chunk is discarded before a + * new one can be used. All chunks are organized in a ring of + * XROW_BUF_CHUNK_COUNT slots, so a chunk's in-ring index is + * its global index modulo XROW_BUF_CHUNK_COUNT. + */ +struct xrow_buf { + /** Global index of the first used chunk (the oldest one). */ + size_t first_chunk_index; + /** Global index of the last used chunk (the youngest one). */ + size_t last_chunk_index; + /** Ring array of chunks. */ + struct xrow_buf_chunk chunk[XROW_BUF_CHUNK_COUNT]; + /** + * An xrow_buf transaction is recorded in one chunk only. + * When a transaction starts, the current row count and the + * data buffer svp of the current (last) chunk are + * remembered so that the chunk state can be restored + * on rollback. + */ + struct { + /** Index of the current transaction's first row. */ + uint32_t tx_first_row_index; + /** Svp at the start of the current transaction's data. */ + struct obuf_svp tx_first_row_svp; + }; +}; + +/** Create an xrow buffer. */ +void +xrow_buf_create(struct xrow_buf *xrow_buf); + +/** Destroy an xrow buffer. */ +void +xrow_buf_destroy(struct xrow_buf *xrow_buf); + +/** + * Begin an xrow buffer transaction. This function may rotate the + * last data chunk and use the vclock parameter as the new chunk's + * starting vclock. + */ +void +xrow_buf_tx_begin(struct xrow_buf *xrow_buf, const struct vclock *vclock); + +/** Discard all data written since the current transaction began. */ +void +xrow_buf_tx_rollback(struct xrow_buf *xrow_buf); + +/** Commit an xrow buffer transaction. */ +void +xrow_buf_tx_commit(struct xrow_buf *xrow_buf); + +/** + * Append an xrow array to an xrow buffer. The array is placed into + * one xrow buffer data chunk and each row occupies a continuous + * space in the data buffer.
The resulting iovec array, allocated + * on the fiber gc region, points to the encoded data. + * + * @retval count of written iovec members on success + * @retval -1 in case of error + */ +int +xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin, + struct xrow_header **end, + struct iovec **iovec); + +/** + * An xrow buffer cursor is used to find a position in a buffer + * and then fetch rows one by one from that position toward + * the last row appended to the buffer. + */ +struct xrow_buf_cursor { + /** Current chunk global index. */ + uint32_t chunk_index; + /** Row index in the current chunk. */ + uint32_t row_index; +}; + +/** + * Create an xrow buffer cursor and set its position to + * the first row not covered by the passed vclock. + * + * @retval 0 cursor was created + * @retval -1 if rows after the vclock were already discarded + */ +int +xrow_buf_cursor_create(struct xrow_buf *xrow_buf, + struct xrow_buf_cursor *xrow_buf_cursor, + struct vclock *vclock); + +/** + * Fetch the next row from an xrow buffer cursor and return the row + * header and encoded data pointers as well as the encoded data size + * in the corresponding parameters. + * + * @retval 0 in case of success + * @retval 1 if there are no more rows in the buffer + * @retval -1 if this cursor's position was discarded by the xrow buffer + */ +int +xrow_buf_cursor_next(struct xrow_buf *xrow_buf, + struct xrow_buf_cursor *xrow_buf_cursor, + struct xrow_header **row, void **data, size_t *size); + +#endif /* TARANTOOL_XROW_BUF_H_INCLUDED */ -- 2.25.0
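The cursor contract documented in `xrow_buf.h` (0 for a fetched row, 1 when no more rows are available yet, -1 when the cursor position was discarded) can be illustrated with a small self-contained model. This is only a sketch under simplifying assumptions: a single replica, so a vclock collapses to one LSN; chunks hold bare LSNs instead of encoded rows; and `cur_buf`, `cur_cursor`, `cur_cursor_create()` and `cur_cursor_next()` are hypothetical names that mirror, but are not, the patch's `xrow_buf_cursor_create()` and `xrow_buf_cursor_next()`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical single-replica model: a vclock is reduced to one LSN. */
enum { CUR_CHUNK_COUNT = 4, CUR_CHUNK_ROWS = 2 };

struct cur_chunk {
	int64_t start_lsn;              /* LSN just before the first row */
	size_t row_count;
	int64_t lsn[CUR_CHUNK_ROWS];
};

struct cur_buf {
	size_t first_chunk_index;       /* oldest live chunk, global index */
	size_t last_chunk_index;        /* youngest chunk, global index */
	struct cur_chunk chunk[CUR_CHUNK_COUNT];
};

struct cur_cursor {
	size_t chunk_index;             /* global chunk index */
	size_t row_index;               /* row index inside that chunk */
};

static struct cur_chunk *
cur_chunk_at(struct cur_buf *buf, size_t index)
{
	return &buf->chunk[index % CUR_CHUNK_COUNT];
}

/*
 * Position the cursor on the first row with lsn > vclock_lsn.
 * Returns -1 if the oldest live chunk already starts after it.
 */
static int
cur_cursor_create(struct cur_buf *buf, struct cur_cursor *cur,
		  int64_t vclock_lsn)
{
	if (cur_chunk_at(buf, buf->first_chunk_index)->start_lsn > vclock_lsn)
		return -1;
	size_t index = buf->first_chunk_index;
	/* Advance while the next chunk starts at or before vclock_lsn. */
	while (index < buf->last_chunk_index &&
	       cur_chunk_at(buf, index + 1)->start_lsn <= vclock_lsn)
		++index;
	struct cur_chunk *chunk = cur_chunk_at(buf, index);
	size_t row = 0;
	/* Skip rows already covered by vclock_lsn. */
	while (row < chunk->row_count && chunk->lsn[row] <= vclock_lsn)
		++row;
	cur->chunk_index = index;
	cur->row_index = row;
	return 0;
}

/* 0: *lsn is set; 1: no more rows yet; -1: the chunk was discarded. */
static int
cur_cursor_next(struct cur_buf *buf, struct cur_cursor *cur, int64_t *lsn)
{
	if (buf->first_chunk_index > cur->chunk_index)
		return -1;
	struct cur_chunk *chunk = cur_chunk_at(buf, cur->chunk_index);
	while (cur->row_index == chunk->row_count) {
		if (cur->chunk_index == buf->last_chunk_index)
			return 1;
		/* Move on to the next, younger chunk. */
		++cur->chunk_index;
		cur->row_index = 0;
		chunk = cur_chunk_at(buf, cur->chunk_index);
	}
	*lsn = chunk->lsn[cur->row_index++];
	return 0;
}
```

In this model, a consumer keeps calling `cur_cursor_next()` until it returns 1 and retries after new rows are appended; a return of -1 means the rows it needs are no longer held in memory.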