[tarantool-patches] [PATCH v3 2/4] wal: xrow buffer structure

Georgy Kirichenko georgy at tarantool.org
Wed Oct 9 19:45:44 MSK 2019


Introduce an xrow buffer to store encoded xrows in memory after a
transaction is finished. The WAL uses an xrow buffer object in
order to encode transactions and then write the encoded data
to a log file, so the encoded data still lives in memory for
some time after a transaction is finished and cleared by an engine.
An xrow buffer consists of not more than XROW_BUF_CHUNK_COUNT
rotating chunks organized in a ring. The rotation thresholds and
XROW_BUF_CHUNK_COUNT are empirical values and are hardcoded.

Part of #3794
---
 src/box/CMakeLists.txt |   1 +
 src/box/wal.c          |  47 ++++++-
 src/box/xlog.c         |  57 ++++++---
 src/box/xlog.h         |  16 ++-
 src/box/xrow_buf.c     | 285 +++++++++++++++++++++++++++++++++++++++++
 src/box/xrow_buf.h     | 160 +++++++++++++++++++++++
 6 files changed, 540 insertions(+), 26 deletions(-)
 create mode 100644 src/box/xrow_buf.c
 create mode 100644 src/box/xrow_buf.h

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 9bba37bcb..2dc48e6b5 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -125,6 +125,7 @@ add_library(box STATIC
     bind.c
     execute.c
     wal.c
+    xrow_buf.c
     call.c
     merger.c
     ${lua_sources}
diff --git a/src/box/wal.c b/src/box/wal.c
index 9219d6779..63e0c728d 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -43,6 +43,7 @@
 #include "cbus.h"
 #include "coio_task.h"
 #include "replication.h"
+#include "xrow_buf.h"
 
 enum {
 	/**
@@ -156,6 +157,14 @@ struct wal_writer
 	 * Used for replication relays.
 	 */
 	struct rlist watchers;
+	/**
+	 * In-memory WAL write buffer used to encode transaction rows and
+	 * write them to an xlog file. The in-memory buffer allows us to
+	 * preserve xrows after transaction processing has finished.
+	 * This buffer will be used by replication to fetch xrows from memory
+	 * without accessing xlog files.
+	 */
+	struct xrow_buf xrow_buf;
 };
 
 struct wal_msg {
@@ -933,6 +942,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 	int64_t tsn = 0;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
+		(*row)->tm = ev_now(loop());
 		if ((*row)->replica_id == 0) {
 			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
 				      vclock_get(base, instance_id);
@@ -1016,25 +1026,52 @@ wal_write_to_disk(struct cmsg *msg)
 	int rc;
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
+	/* Start a wal memory buffer transaction. */
+	xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock);
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
 		wal_assign_lsn(&vclock_diff, &writer->vclock,
 			       entry->rows, entry->rows + entry->n_rows);
 		entry->res = vclock_sum(&vclock_diff) +
 			     vclock_sum(&writer->vclock);
-		rc = xlog_write_entry(l, entry);
-		if (rc < 0)
+		struct iovec *iov;
+		int iovcnt = xrow_buf_write(&writer->xrow_buf, entry->rows,
+					    entry->rows + entry->n_rows, &iov);
+		if (iovcnt < 0) {
+			xrow_buf_tx_rollback(&writer->xrow_buf);
+			goto done;
+		}
+		xlog_tx_begin(l);
+		if (xlog_write_iov(l, iov, iovcnt, entry->n_rows) < 0) {
+			xlog_tx_rollback(l);
+			xrow_buf_tx_rollback(&writer->xrow_buf);
+			goto done;
+		}
+		rc = xlog_tx_commit(l);
+		if (rc < 0) {
+			/* Failed write. */
+			xrow_buf_tx_rollback(&writer->xrow_buf);
 			goto done;
+		}
 		if (rc > 0) {
+			/*
+			 * Data flushed to disk, start a new memory
+			 * buffer transaction
+			 */
 			writer->checkpoint_wal_size += rc;
 			last_committed = &entry->fifo;
 			vclock_merge(&writer->vclock, &vclock_diff);
+			xrow_buf_tx_commit(&writer->xrow_buf);
+			xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock);
 		}
 		/* rc == 0: the write is buffered in xlog_tx */
 	}
+
 	rc = xlog_flush(l);
-	if (rc < 0)
+	if (rc < 0) {
+		xrow_buf_tx_rollback(&writer->xrow_buf);
 		goto done;
-
+	}
+	xrow_buf_tx_commit(&writer->xrow_buf);
 	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
 	vclock_merge(&writer->vclock, &vclock_diff);
@@ -1102,6 +1139,7 @@ wal_writer_f(va_list ap)
 {
 	(void) ap;
 	struct wal_writer *writer = &wal_writer_singleton;
+	xrow_buf_create(&writer->xrow_buf);
 
 	/** Initialize eio in this thread */
 	coio_enable();
@@ -1141,6 +1179,7 @@ wal_writer_f(va_list ap)
 		xlog_close(&vy_log_writer.xlog, false);
 
 	cpipe_destroy(&writer->tx_prio_pipe);
+	xrow_buf_destroy(&writer->xrow_buf);
 	return 0;
 }
 
diff --git a/src/box/xlog.c b/src/box/xlog.c
index 8254cce20..2fcf7f0df 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -1275,14 +1275,8 @@ xlog_tx_write(struct xlog *log)
 	return written;
 }
 
-/*
- * Add a row to a log and possibly flush the log.
- *
- * @retval  -1 error, check diag.
- * @retval >=0 the number of bytes written to buffer.
- */
-ssize_t
-xlog_write_row(struct xlog *log, const struct xrow_header *packet)
+static int
+xlog_write_prepare(struct xlog *log)
 {
 	/*
 	 * Automatically reserve space for a fixheader when adding
@@ -1296,17 +1290,20 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 			return -1;
 		}
 	}
+	return 0;
+}
 
-	struct obuf_svp svp = obuf_create_svp(&log->obuf);
-	size_t page_offset = obuf_size(&log->obuf);
-	/** encode row into iovec */
-	struct iovec iov[XROW_IOVMAX];
-	/** don't write sync to the disk */
-	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
-	if (iovcnt < 0) {
-		obuf_rollback_to_svp(&log->obuf, &svp);
+/*
+ * Write an xrow containing iovec to a xlog.
+ */
+ssize_t
+xlog_write_iov(struct xlog *log, struct iovec *iov, int iovcnt, int row_count)
+{
+	if (xlog_write_prepare(log) != 0)
 		return -1;
-	}
+
+	struct obuf_svp svp = obuf_create_svp(&log->obuf);
+	size_t old_size = obuf_size(&log->obuf);
 	for (int i = 0; i < iovcnt; ++i) {
 		struct errinj *inj = errinj(ERRINJ_WAL_WRITE_PARTIAL,
 					    ERRINJ_INT);
@@ -1325,16 +1322,34 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 			return -1;
 		}
 	}
-	assert(iovcnt <= XROW_IOVMAX);
-	log->tx_rows++;
+	log->tx_rows += row_count;
 
-	size_t row_size = obuf_size(&log->obuf) - page_offset;
+	ssize_t written = obuf_size(&log->obuf) - old_size;
 	if (log->is_autocommit &&
 	    obuf_size(&log->obuf) >= XLOG_TX_AUTOCOMMIT_THRESHOLD &&
 	    xlog_tx_write(log) < 0)
 		return -1;
 
-	return row_size;
+	return written;
+}
+
+/*
+ * Add a row to a log and possibly flush the log.
+ *
+ * @retval  -1 error, check diag.
+ * @retval >=0 the number of bytes written to buffer.
+ */
+ssize_t
+xlog_write_row(struct xlog *log, const struct xrow_header *packet)
+{
+	/** encode row into an iovec */
+	struct iovec iov[XROW_IOVMAX];
+	/** don't write sync to the disk */
+	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
+	if (iovcnt < 0)
+		return -1;
+	assert(iovcnt <= XROW_IOVMAX);
+	return xlog_write_iov(log, iov, iovcnt, 1);
 }
 
 /**
diff --git a/src/box/xlog.h b/src/box/xlog.h
index a48b05fc4..aa3c14d84 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -493,7 +493,7 @@ ssize_t
 xlog_fallocate(struct xlog *log, size_t size);
 
 /**
- * Write a row to xlog, 
+ * Write a row into an xlog.
  *
  * @retval count of writen bytes
  * @retval -1 for error
@@ -501,6 +501,20 @@ xlog_fallocate(struct xlog *log, size_t size);
 ssize_t
 xlog_write_row(struct xlog *log, const struct xrow_header *packet);
 
+/**
+ * Write an iovec with encoded rows into an xlog.
+ *
+ * @param xlog an xlog file to write into
+ * @param iov an iovec with encoded rows data
+ * @param iovcnt count of iovec members
+ * @param row_count count of encoded rows
+ *
+ * @retval count of written bytes
+ * @retval -1 for error
+ */
+ssize_t
+xlog_write_iov(struct xlog *xlog, struct iovec *iov, int iovcnt, int row_count);
+
 /**
  * Prevent xlog row buffer offloading, should be use
  * at transaction start to write transaction in one xlog tx
diff --git a/src/box/xrow_buf.c b/src/box/xrow_buf.c
new file mode 100644
index 000000000..e4455e01a
--- /dev/null
+++ b/src/box/xrow_buf.c
@@ -0,0 +1,285 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "xrow_buf.h"
+#include "fiber.h"
+#include "errinj.h"
+
+/* Xrow buffer chunk options (empirical values). */
+enum {
+	/* Chunk row info array capacity increment. */
+	XROW_BUF_CHUNK_CAPACITY_INCREMENT = 16384,
+	/* Initial size for raw data storage. */
+	XROW_BUF_CHUNK_INITIAL_DATA_SIZE = 65536,
+	/* How many rows one chunk may hold before rotation. */
+	XROW_BUF_CHUNK_ROW_COUNT_THRESHOLD = 8192,
+	/* How much data one chunk may hold before rotation. */
+	XROW_BUF_CHUNK_DATA_SIZE_THRESHOLD = 1 << 19,
+};
+
+
+/*
+ * Save the current xrow buffer chunk state, which consists of two
+ * values: the index and the position where the next row header and
+ * raw data would be placed. This state is used to track the next
+ * transaction starting boundary.
+ */
+static inline void
+xrow_buf_save_state(struct xrow_buf *xrow_buf)
+{
+	size_t ring_index = xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT;
+	struct xrow_buf_chunk *chunk = &xrow_buf->chunk[ring_index];
+	/* Remember where the next transaction starts writing. */
+	xrow_buf->tx_first_row_index = chunk->row_count;
+	xrow_buf->tx_first_row_svp = obuf_create_svp(&chunk->data);
+}
+
+void
+xrow_buf_create(struct xrow_buf *xrow_buf)
+{
+	/* Start with every chunk empty and no row info storage allocated. */
+	for (int i = 0; i < XROW_BUF_CHUNK_COUNT; ++i) {
+		struct xrow_buf_chunk *chunk = &xrow_buf->chunk[i];
+		chunk->row_info = NULL;
+		chunk->row_info_capacity = 0;
+		chunk->row_count = 0;
+		obuf_create(&chunk->data, &cord()->slabc,
+			    XROW_BUF_CHUNK_INITIAL_DATA_SIZE);
+	}
+	xrow_buf->first_chunk_index = 0;
+	xrow_buf->last_chunk_index = 0;
+	xrow_buf_save_state(xrow_buf);
+}
+
+void
+xrow_buf_destroy(struct xrow_buf *xrow_buf)
+{
+	for (int i = 0; i < XROW_BUF_CHUNK_COUNT; ++i) {
+		struct xrow_buf_chunk *chunk = &xrow_buf->chunk[i];
+		/* row_info is slab-backed only once it has been allocated. */
+		if (chunk->row_info_capacity > 0)
+			slab_put(&cord()->slabc,
+				 slab_from_data(chunk->row_info));
+		obuf_destroy(&chunk->data);
+	}
+}
+
+/*
+ * If the current chunk data limits are reached then this function
+ * switches the xrow buffer to the next chunk. If there are no free
+ * chunks in the xrow buffer ring then the oldest one is truncated
+ * and then reused to store new data.
+ */
+static struct xrow_buf_chunk *
+xrow_buf_rotate(struct xrow_buf *xrow_buf)
+{
+	struct xrow_buf_chunk *chunk = xrow_buf->chunk +
+		xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT;
+	/*
+	 * If the current chunk is still below both the row count and
+	 * the data size thresholds, keep appending to it.
+	 */
+	if (chunk->row_count < XROW_BUF_CHUNK_ROW_COUNT_THRESHOLD &&
+	    obuf_size(&chunk->data) < XROW_BUF_CHUNK_DATA_SIZE_THRESHOLD)
+		return chunk;
+
+	/*
+	 * A threshold was reached: advance the last chunk global
+	 * index and map it onto the ring to fetch the next chunk.
+	 */
+	++xrow_buf->last_chunk_index;
+	chunk = xrow_buf->chunk + xrow_buf->last_chunk_index %
+				  XROW_BUF_CHUNK_COUNT;
+	/*
+	 * When the ring is full the slot we advanced onto still
+	 * holds the oldest chunk's rows - discard them so the slot
+	 * can be reused, and advance the first chunk index to keep
+	 * the [first, last] window at most XROW_BUF_CHUNK_COUNT wide.
+	 */
+	if (xrow_buf->last_chunk_index - xrow_buf->first_chunk_index >=
+	    XROW_BUF_CHUNK_COUNT) {
+		chunk->row_count = 0;
+		obuf_reset(&chunk->data);
+		++xrow_buf->first_chunk_index;
+	}
+	/*
+	 * The current chunk changed, so the saved transaction start
+	 * state must be re-anchored to the new chunk.
+	 */
+	xrow_buf_save_state(xrow_buf);
+	return chunk;
+}
+
+void
+xrow_buf_tx_begin(struct xrow_buf *xrow_buf, const struct vclock *vclock)
+{
+	/*
+	 * A transaction never spans two chunks and no rotation
+	 * happens while a transaction is in progress, so check the
+	 * current chunk thresholds and rotate right now if needed.
+	 */
+	struct xrow_buf_chunk *chunk = xrow_buf_rotate(xrow_buf);
+	if (chunk->row_count != 0)
+		return;
+	/* First transaction in this chunk: pin its starting vclock. */
+	vclock_copy(&chunk->vclock, vclock);
+}
+
+void
+xrow_buf_tx_rollback(struct xrow_buf *xrow_buf)
+{
+	size_t ring_index = xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT;
+	struct xrow_buf_chunk *chunk = &xrow_buf->chunk[ring_index];
+	/* Drop everything written since the transaction started. */
+	chunk->row_count = xrow_buf->tx_first_row_index;
+	obuf_rollback_to_svp(&chunk->data, &xrow_buf->tx_first_row_svp);
+}
+
+void
+xrow_buf_tx_commit(struct xrow_buf *xrow_buf)
+{
+	/*
+	 * Advance the saved state: everything written since
+	 * tx_begin becomes permanent and the next transaction
+	 * starts right after it.
+	 */
+	xrow_buf_save_state(xrow_buf);
+}
+
+/*
+ * Append the [begin, end) xrow array to the current chunk: store
+ * each row's header plus its encoded representation contiguously
+ * in the chunk data buffer, and return a gc-allocated iovec
+ * pointing at the data just written.
+ */
+int
+xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
+	       struct xrow_header **end,
+	       struct iovec **iovec)
+{
+	struct xrow_buf_chunk *chunk = xrow_buf->chunk +
+		xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT;
+
+	/* Save a data buffer svp to restore the buffer in case of an error. */
+	struct obuf_svp data_svp = obuf_create_svp(&chunk->data);
+
+	size_t row_count = chunk->row_count + (end - begin);
+	/* Grow the row information array if required. */
+	if (row_count > chunk->row_info_capacity) {
+		/* Round allocation up to XROW_BUF_CHUNK_CAPACITY_INCREMENT. */
+		uint32_t capacity = XROW_BUF_CHUNK_CAPACITY_INCREMENT *
+				    ((row_count +
+				      XROW_BUF_CHUNK_CAPACITY_INCREMENT - 1) /
+				     XROW_BUF_CHUNK_CAPACITY_INCREMENT);
+
+		struct slab *row_info_slab =
+			slab_get(&cord()->slabc,
+				 sizeof(struct xrow_buf_row_info) * capacity);
+		if (row_info_slab == NULL) {
+			diag_set(OutOfMemory, capacity *
+					      sizeof(struct xrow_buf_row_info),
+				 "slab", "row info array");
+			goto error;
+		}
+		/* Carry the already stored row info over to the new slab. */
+		if (chunk->row_info_capacity > 0) {
+			memcpy(slab_data(row_info_slab), chunk->row_info,
+			       sizeof(struct xrow_buf_row_info) *
+			       chunk->row_count);
+			slab_put(&cord()->slabc,
+				 slab_from_data(chunk->row_info));
+		}
+		chunk->row_info = slab_data(row_info_slab);
+		chunk->row_info_capacity = capacity;
+	}
+
+	/* Encode rows. */
+	for (struct xrow_header **row = begin; row < end; ++row) {
+		struct errinj *inj = errinj(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT);
+		if (inj != NULL && inj->iparam == (*row)->lsn) {
+			(*row)->lsn = inj->iparam - 1;
+			say_warn("injected broken lsn: %lld",
+				 (long long) (*row)->lsn);
+		}
+
+		/*
+		 * Reserve space for the whole encoded row up front so
+		 * that the obuf_alloc() calls below stay within the
+		 * reservation: xrow_approx_len() is an upper bound of
+		 * the encoded row size.
+		 */
+		char *data = obuf_reserve(&chunk->data, xrow_approx_len(*row));
+		if (data == NULL) {
+			diag_set(OutOfMemory, xrow_approx_len(*row),
+				 "obuf", "wal memory data");
+			goto error;
+		}
+
+		/*
+		 * The xrow header is encoded onto a gc memory region
+		 * and iov[0] points to it; row bodies follow in the
+		 * subsequent iovec members. xrow_header_encode() may
+		 * fill one header member plus up to XROW_BODY_IOVMAX
+		 * body members, i.e. XROW_IOVMAX entries total, so
+		 * the array must be XROW_IOVMAX long (an
+		 * XROW_BODY_IOVMAX-sized array would overflow; cf.
+		 * xlog_write_row()).
+		 */
+		struct iovec iov[XROW_IOVMAX];
+		int iov_cnt = xrow_header_encode(*row, 0, iov, 0);
+		if (iov_cnt < 0)
+			goto error;
+
+		/*
+		 * Copy the encoded header and then the bodies into
+		 * the chunk data buffer, header first.
+		 */
+		data = obuf_alloc(&chunk->data, iov[0].iov_len);
+		memcpy(data, iov[0].iov_base, iov[0].iov_len);
+		/*
+		 * Initialize row info from the xrow header and the
+		 * row header encoded data location.
+		 */
+		struct xrow_buf_row_info *row_info =
+			chunk->row_info + chunk->row_count + (row - begin);
+		row_info->xrow = **row;
+		row_info->data = data;
+		row_info->size = iov[0].iov_len;
+
+		for (int i = 1; i < iov_cnt; ++i) {
+			data = obuf_alloc(&chunk->data, iov[i].iov_len);
+			memcpy(data, iov[i].iov_base, iov[i].iov_len);
+			/*
+			 * Adjust the stored row body location as we
+			 * just copied it to the chunk data buffer.
+			 */
+			row_info->xrow.body[i - 1].iov_base = data;
+			row_info->size += iov[i].iov_len;
+		}
+	}
+
+	/* Return an iovec which points to the encoded data. */
+	int iov_cnt = 1 + obuf_iovcnt(&chunk->data) - data_svp.pos;
+	*iovec = region_alloc(&fiber()->gc, sizeof(struct iovec) * iov_cnt);
+	if (*iovec == NULL) {
+		diag_set(OutOfMemory, sizeof(struct iovec) * iov_cnt,
+			 "region", "xrow_buf iovec");
+		goto error;
+	}
+	memcpy(*iovec, chunk->data.iov + data_svp.pos,
+	       sizeof(struct iovec) * iov_cnt);
+	/* Adjust the first iovec member to the data starting location. */
+	(*iovec)[0].iov_base += data_svp.iov_len;
+	(*iovec)[0].iov_len -= data_svp.iov_len;
+
+	/* Commit the new rows into the chunk bookkeeping. */
+	chunk->row_count = row_count;
+	return iov_cnt;
+
+error:
+	/* Restore the data buffer state as if nothing was written. */
+	obuf_rollback_to_svp(&chunk->data, &data_svp);
+	return -1;
+}
+
diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h
new file mode 100644
index 000000000..266cc0f76
--- /dev/null
+++ b/src/box/xrow_buf.h
@@ -0,0 +1,160 @@
+#ifndef TARANTOOL_XROW_BUF_H_INCLUDED
+#define TARANTOOL_XROW_BUF_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdint.h>
+
+#include "small/obuf.h"
+#include "small/rlist.h"
+#include "xrow.h"
+#include "vclock.h"
+
+enum {
+	/**
+	 * Number of rotating data chunks in an xrow buffer ring.
+	 * Every rotation discards roughly 1/XROW_BUF_CHUNK_COUNT of
+	 * the stored rows, so a bigger value makes rotation more
+	 * frequent but the decrease smoother and the xrow buffer
+	 * size more stable.
+	 */
+	XROW_BUF_CHUNK_COUNT = 16,
+};
+
+/**
+ * Describes one row stored in an xrow buffer: the xrow_header
+ * itself plus the location and size of the row's encoded
+ * representation. The header allows filtering rows by replica_id,
+ * lsn or replication group, while the encoded representation can
+ * be written out without any further encoding.
+ */
+struct xrow_buf_row_info {
+	/** Stored row header. */
+	struct xrow_header xrow;
+	/** Pointer to the row's encoded raw data inside the chunk. */
+	void *data;
+	/** Size in bytes of the encoded raw data. */
+	size_t size;
+};
+
+/**
+ * A chunk stores a continuous sequence of xrows written to an
+ * xrow buffer: the vclock taken just before the first stored row
+ * (the stored vclock lower boundary), the count of stored rows,
+ * their encoded raw data and an array of per-row information.
+ */
+struct xrow_buf_chunk {
+	/** Vclock just before the first row in this chunk. */
+	struct vclock vclock;
+	/** Count of stored rows. */
+	size_t row_count;
+	/** Stored rows information array. */
+	struct xrow_buf_row_info *row_info;
+	/** Capacity of stored rows information array. */
+	size_t row_info_capacity;
+	/** Data storage for encoded rows data. */
+	struct obuf data;
+};
+
+/**
+ * An xrow buffer encodes and stores a continuous sequence of
+ * xrows (both headers and binary encoded representation).
+ * Storage is organized as a range of globally indexed chunks.
+ * New rows are appended to the last chunk (the youngest one);
+ * once it is full a new chunk is used. The buffer maintains not
+ * more than XROW_BUF_CHUNK_COUNT chunks, so when it is full the
+ * first (oldest) chunk is discarded before a new one can be
+ * used. Chunks form a ring of XROW_BUF_CHUNK_COUNT entries: a
+ * chunk's in-ring index is its global index modulo
+ * XROW_BUF_CHUNK_COUNT.
+ */
+struct xrow_buf {
+	/** Global index of the first used chunk (the oldest one). */
+	size_t first_chunk_index;
+	/** Global index of the last used chunk (the youngest one). */
+	size_t last_chunk_index;
+	/** Ring array containing the chunks. */
+	struct xrow_buf_chunk chunk[XROW_BUF_CHUNK_COUNT];
+	/**
+	 * An xrow_buf transaction is recorded in one chunk only.
+	 * When a transaction is started, the current row count and
+	 * the data buffer svp of the current (last) chunk are
+	 * remembered in order to be able to restore the chunk
+	 * state in case of rollback.
+	 */
+	struct {
+		/**
+		 * The current transaction first row index.
+		 * NOTE(review): uint32_t while row_count is size_t -
+		 * fine while XROW_BUF_CHUNK_ROW_COUNT_THRESHOLD is
+		 * small, but worth confirming.
+		 */
+		uint32_t tx_first_row_index;
+		/** The current transaction encoded data start svp. */
+		struct obuf_svp tx_first_row_svp;
+	};
+};
+
+/** Create an xrow buffer. */
+void
+xrow_buf_create(struct xrow_buf *xrow_buf);
+
+/** Destroy an xrow buffer and release its memory. */
+void
+xrow_buf_destroy(struct xrow_buf *xrow_buf);
+
+/**
+ * Begin a xrow buffer transaction. This function may rotate the
+ * last one data chunk and use the vclock parameter as a new chunk
+ * starting vclock.
+ */
+void
+xrow_buf_tx_begin(struct xrow_buf *xrow_buf, const struct vclock *vclock);
+
+/** Discard all the data written since the last committed transaction. */
+void
+xrow_buf_tx_rollback(struct xrow_buf *xrow_buf);
+
+/** Commit a xrow buffer transaction. */
+void
+xrow_buf_tx_commit(struct xrow_buf *xrow_buf);
+
+/**
+ * Append an xrow array to an xrow buffer. The array is placed into
+ * one xrow buffer data chunk and each row takes a continuous
+ * space in the data buffer. The locations of the raw encoded data
+ * are returned via a gc-allocated iovec array.
+ *
+ * @retval count of written iovec members for success
+ * @retval -1 in case of error
+ */
+int
+xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
+	       struct xrow_header **end,
+	       struct iovec **iovec);
+
+#endif /* TARANTOOL_XROW_BUF_H_INCLUDED */
-- 
2.23.0





More information about the Tarantool-patches mailing list