[tarantool-patches] [PATCH v2 2/4] wal: wal memory buffer

Georgy Kirichenko georgy at tarantool.org
Wed Sep 18 12:36:09 MSK 2019


Introduce a wal memory buffer to store logged transactions. The patch
changes wal writer behaviour: now wal encodes rows into the memory buffer
first and then writes encoded data from memory buffer to disk.
Wal memory is implemented as an xrow buffer which consists of rotated data chunks.
Each data chunk includes an ibuf with xrow header array and an obuf with
encoded xrow data. Also data chunk has a vclock value as the lower boundary of
stored rows.

Prerequisites: 3794
---
 src/box/CMakeLists.txt |   1 +
 src/box/wal.c          |  38 ++++++-
 src/box/xlog.c         |  77 +++++++++++----
 src/box/xlog.h         |   9 ++
 src/box/xrow_buf.c     | 220 +++++++++++++++++++++++++++++++++++++++++
 src/box/xrow_buf.h     | 138 ++++++++++++++++++++++++++
 6 files changed, 461 insertions(+), 22 deletions(-)
 create mode 100644 src/box/xrow_buf.c
 create mode 100644 src/box/xrow_buf.h

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 9bba37bcb..2dc48e6b5 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -125,6 +125,7 @@ add_library(box STATIC
     bind.c
     execute.c
     wal.c
+    xrow_buf.c
     call.c
     merger.c
     ${lua_sources}
diff --git a/src/box/wal.c b/src/box/wal.c
index 9219d6779..e77bd1ae1 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -43,6 +43,7 @@
 #include "cbus.h"
 #include "coio_task.h"
 #include "replication.h"
+#include "xrow_buf.h"
 
 enum {
 	/**
@@ -156,6 +157,8 @@ struct wal_writer
 	 * Used for replication relays.
 	 */
 	struct rlist watchers;
+	/** Xrow buffer. */
+	struct xrow_buf xrow_buf;
 };
 
 struct wal_msg {
@@ -933,6 +936,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 	int64_t tsn = 0;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
+		(*row)->tm = ev_now(loop());
 		if ((*row)->replica_id == 0) {
 			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
 				      vclock_get(base, instance_id);
@@ -1016,25 +1020,49 @@ wal_write_to_disk(struct cmsg *msg)
 	int rc;
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
+	xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock);
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
 		wal_assign_lsn(&vclock_diff, &writer->vclock,
 			       entry->rows, entry->rows + entry->n_rows);
 		entry->res = vclock_sum(&vclock_diff) +
 			     vclock_sum(&writer->vclock);
-		rc = xlog_write_entry(l, entry);
-		if (rc < 0)
+		struct iovec iov[SMALL_OBUF_IOV_MAX];
+		int iovcnt = xrow_buf_write(&writer->xrow_buf, entry->rows,
+					    entry->rows + entry->n_rows, iov);
+		if (iovcnt < 0) {
+			xrow_buf_tx_rollback(&writer->xrow_buf);
+			goto done;
+		}
+		xlog_tx_begin(l);
+		if (xlog_write_iov(l, iov, iovcnt, entry->n_rows) < 0) {
+			xlog_tx_rollback(l);
+			xrow_buf_tx_rollback(&writer->xrow_buf);
+			goto done;
+		}
+		rc = xlog_tx_commit(l);
+		if (rc < 0) {
+			/* Failed write. */
+			xrow_buf_tx_rollback(&writer->xrow_buf);
 			goto done;
+		}
 		if (rc > 0) {
+			/*
+			 * Data flushed to disk, start a new memory
+			 * buffer transaction
+			 */
 			writer->checkpoint_wal_size += rc;
 			last_committed = &entry->fifo;
 			vclock_merge(&writer->vclock, &vclock_diff);
+			xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock);
 		}
 		/* rc == 0: the write is buffered in xlog_tx */
 	}
+
 	rc = xlog_flush(l);
-	if (rc < 0)
+	if (rc < 0) {
+		xrow_buf_tx_rollback(&writer->xrow_buf);
 		goto done;
-
+	}
 	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
 	vclock_merge(&writer->vclock, &vclock_diff);
@@ -1102,6 +1130,7 @@ wal_writer_f(va_list ap)
 {
 	(void) ap;
 	struct wal_writer *writer = &wal_writer_singleton;
+	xrow_buf_create(&writer->xrow_buf);
 
 	/** Initialize eio in this thread */
 	coio_enable();
@@ -1141,6 +1170,7 @@ wal_writer_f(va_list ap)
 		xlog_close(&vy_log_writer.xlog, false);
 
 	cpipe_destroy(&writer->tx_prio_pipe);
+	xrow_buf_destroy(&writer->xrow_buf);
 	return 0;
 }
 
diff --git a/src/box/xlog.c b/src/box/xlog.c
index 8254cce20..838b3e56d 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -1275,14 +1275,8 @@ xlog_tx_write(struct xlog *log)
 	return written;
 }
 
-/*
- * Add a row to a log and possibly flush the log.
- *
- * @retval  -1 error, check diag.
- * @retval >=0 the number of bytes written to buffer.
- */
-ssize_t
-xlog_write_row(struct xlog *log, const struct xrow_header *packet)
+static int
+xlog_write_prepare(struct xlog *log)
 {
 	/*
 	 * Automatically reserve space for a fixheader when adding
@@ -1296,17 +1290,17 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 			return -1;
 		}
 	}
+	return 0;
+}
 
+/*
+ * Append iov to a log internal buffer.
+ */
+static ssize_t
+xlog_writev(struct xlog *log, struct iovec *iov, int iovcnt)
+{
 	struct obuf_svp svp = obuf_create_svp(&log->obuf);
-	size_t page_offset = obuf_size(&log->obuf);
-	/** encode row into iovec */
-	struct iovec iov[XROW_IOVMAX];
-	/** don't write sync to the disk */
-	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
-	if (iovcnt < 0) {
-		obuf_rollback_to_svp(&log->obuf, &svp);
-		return -1;
-	}
+	size_t old_size = obuf_size(&log->obuf);
 	for (int i = 0; i < iovcnt; ++i) {
 		struct errinj *inj = errinj(ERRINJ_WAL_WRITE_PARTIAL,
 					    ERRINJ_INT);
@@ -1325,10 +1319,33 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 			return -1;
 		}
 	}
+	return obuf_size(&log->obuf) - old_size;
+}
+
+/*
+ * Add a row to a log and possibly flush the log.
+ *
+ * @retval  -1 error, check diag.
+ * @retval >=0 the number of bytes written to buffer.
+ */
+ssize_t
+xlog_write_row(struct xlog *log, const struct xrow_header *packet)
+{
+	if (xlog_write_prepare(log) != 0)
+		return -1;
+
+	/** encode row into iovec */
+	struct iovec iov[XROW_IOVMAX];
+	/** don't write sync to the disk */
+	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
+	if (iovcnt < 0)
+		return -1;
 	assert(iovcnt <= XROW_IOVMAX);
+	ssize_t row_size = xlog_writev(log, iov, iovcnt);
+	if (row_size < 0)
+		return -1;
 	log->tx_rows++;
 
-	size_t row_size = obuf_size(&log->obuf) - page_offset;
 	if (log->is_autocommit &&
 	    obuf_size(&log->obuf) >= XLOG_TX_AUTOCOMMIT_THRESHOLD &&
 	    xlog_tx_write(log) < 0)
@@ -1337,6 +1354,30 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 	return row_size;
 }
 
+/*
+ * Add an io vector to a log and possibly flush the log.
+ *
+ * @retval  -1 error, check diag.
+ * @retval >=0 the number of bytes written to buffer.
+ */
+ssize_t
+xlog_write_iov(struct xlog *log, struct iovec *iov, int iovcnt, int rows)
+{
+	if (xlog_write_prepare(log) != 0)
+		return -1;
+
+	ssize_t row_size = xlog_writev(log, iov, iovcnt);
+	if (row_size < 0)
+		return -1;
+	log->tx_rows += rows;
+
+	if (log->is_autocommit &&
+	    obuf_size(&log->obuf) >= XLOG_TX_AUTOCOMMIT_THRESHOLD &&
+	    xlog_tx_write(log) < 0)
+		return -1;
+
+	return row_size;
+}
 /**
  * Begin a multi-statement xlog transaction. All xrow objects
  * of a single transaction share the same header and checksum
diff --git a/src/box/xlog.h b/src/box/xlog.h
index a48b05fc4..784a7834b 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -501,6 +501,15 @@ xlog_fallocate(struct xlog *log, size_t size);
 ssize_t
 xlog_write_row(struct xlog *log, const struct xrow_header *packet);
 
+/**
+ * Write an io vector to xlog.
+ *
+ * @retval count of written bytes
+ * @retval -1 for error
+ */
+ssize_t
+xlog_write_iov(struct xlog *xlog, struct iovec *iov, int iovcnt, int rows);
+
 /**
  * Prevent xlog row buffer offloading, should be use
  * at transaction start to write transaction in one xlog tx
diff --git a/src/box/xrow_buf.c b/src/box/xrow_buf.c
new file mode 100644
index 000000000..d690ac4b7
--- /dev/null
+++ b/src/box/xrow_buf.c
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "xrow_buf.h"
+
+#include "trigger.h"
+#include "fiber.h"
+#include "errinj.h"
+
+/* Xrow buffer chunk options. */
+enum {
+	/* Initial size for xrow headers storage. */
+	XROW_BUF_CHUNK_INITIAL_ROW_COUNT = 4096,
+	/* Initial size for raw data storage. */
+	XROW_BUF_CHUNK_INITIAL_DATA_SIZE = 65536,
+	/* How many rows we will place in one buffer. */
+	XROW_BUF_CHUNK_ROW_LIMIT = 8192,
+	/* How much data we will place in one buffer. */
+	XROW_BUF_CHUNK_DATA_LIMIT = 1 << 19,
+};
+
+void
+xrow_buf_create(struct xrow_buf *xrow_buf)
+{
+	int i;
+	for (i = 0; i < XROW_BUF_CHUNK_COUNT; ++i) {
+		ibuf_create(&xrow_buf->chunk[i].rows, &cord()->slabc,
+			    XROW_BUF_CHUNK_INITIAL_ROW_COUNT *
+			    sizeof(struct xrow_header));
+		obuf_create(&xrow_buf->chunk[i].data, &cord()->slabc,
+			    XROW_BUF_CHUNK_INITIAL_DATA_SIZE);
+	}
+	xrow_buf->last_chunk_gen = 0;
+	xrow_buf->first_chunk_gen = 0;
+	rlist_create(&xrow_buf->on_rotate);
+}
+
+void
+xrow_buf_destroy(struct xrow_buf *xrow_buf)
+{
+	int i;
+	for (i = 0; i < XROW_BUF_CHUNK_COUNT; ++i) {
+		ibuf_destroy(&xrow_buf->chunk[i].rows);
+		obuf_destroy(&xrow_buf->chunk[i].data);
+	}
+}
+
+/*
+ * If the current chunk data limits were reached this function
+ * rotates a xrow buffer to the next chunk. If there is no free chunks then
+ * the oldest one is going to be truncated and then reused as the youngest one.
+ */
+static struct xrow_buf_chunk *
+xrow_buf_rotate(struct xrow_buf *xrow_buf, struct vclock *vclock)
+{
+	struct xrow_buf_chunk *chunk = xrow_buf->chunk +
+		xrow_buf->last_chunk_gen % XROW_BUF_CHUNK_COUNT;
+	/* Check whether the current chunk can accept new data. NOTE(review): the limit uses sizeof(struct xrow_header), but the rows ibuf stores struct xrow_buf_row_info entries (see xrow_buf_write) — confirm the intended element size. */
+	if (ibuf_used(&chunk->rows) < XROW_BUF_CHUNK_ROW_LIMIT *
+				      sizeof(struct xrow_header) &&
+	    obuf_size(&chunk->data) < XROW_BUF_CHUNK_DATA_LIMIT)
+		return chunk;
+
+	/*
+	 * Get the next chunk to accept new data
+	 * and increase last chunk generation.
+	 */
+	++xrow_buf->last_chunk_gen;
+	chunk = xrow_buf->chunk + xrow_buf->last_chunk_gen %
+				  XROW_BUF_CHUNK_COUNT;
+	/* Check there is an unused chunk. */
+	if (xrow_buf->last_chunk_gen - xrow_buf->first_chunk_gen >=
+	    XROW_BUF_CHUNK_COUNT) {
+		/* Discard the chunk data and adjust first chunk generation. */
+		ibuf_reset(&chunk->rows);
+		obuf_reset(&chunk->data);
+		++xrow_buf->first_chunk_gen;
+	}
+
+	trigger_run(&xrow_buf->on_rotate, vclock);
+	return chunk;
+}
+
+void
+xrow_buf_tx_begin(struct xrow_buf *xrow_buf, struct vclock *vclock)
+{
+	/*
+	 * Xrow buffer stores the whole transaction in one chunk to be
+	 * sure that all transaction data could be stored in one obuf.
+	 * So it is a proper time to make a rotation if the current chunk is already
+	 * full.
+	 */
+	struct xrow_buf_chunk *chunk = xrow_buf_rotate(xrow_buf, vclock);
+	/* Save the current xrow buffer state. */
+	xrow_buf->tx_first_row_index = ibuf_used(&chunk->rows) /
+				      sizeof(struct xrow_buf_row_info);
+	xrow_buf->tx_first_row_svp = obuf_create_svp(&chunk->data);
+}
+
+void
+xrow_buf_tx_rollback(struct xrow_buf *xrow_buf)
+{
+	struct xrow_buf_chunk *chunk = xrow_buf->chunk +
+		xrow_buf->last_chunk_gen % XROW_BUF_CHUNK_COUNT;
+	chunk->rows.wpos = chunk->rows.rpos + xrow_buf->tx_first_row_index *
+					      sizeof(struct xrow_buf_row_info);
+	obuf_rollback_to_svp(&chunk->data, &xrow_buf->tx_first_row_svp);
+}
+
+int
+xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
+	       struct xrow_header **end,
+	       struct iovec *iovec)
+{
+	struct xrow_buf_chunk *chunk = xrow_buf->chunk +
+		xrow_buf->last_chunk_gen % XROW_BUF_CHUNK_COUNT;
+
+	/* Save current position to restore the state in case of an error. */
+	size_t old_rows_size = ibuf_used(&chunk->rows);
+	struct obuf_svp data_svp = obuf_create_svp(&chunk->data);
+
+	/* Allocate space for row descriptors. */
+	struct xrow_buf_row_info *mem_info =
+		(struct xrow_buf_row_info *)
+		ibuf_alloc(&chunk->rows, (end - begin) *
+					   sizeof(struct xrow_buf_row_info));
+	if (mem_info == NULL) {
+		diag_set(OutOfMemory, (end - begin) *
+				      sizeof(struct xrow_buf_row_info),
+			 "region", "wal memory rows");
+		goto error;
+	}
+
+	/* Write rows. */
+	struct xrow_header **row;
+	for (row = begin; row < end; ++row, ++mem_info) {
+		struct errinj *inj = errinj(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT);
+		if (inj != NULL && inj->iparam == (*row)->lsn) {
+			(*row)->lsn = inj->iparam - 1;
+			say_warn("injected broken lsn: %lld",
+				 (long long) (*row)->lsn);
+		}
+
+		/* Reserve space for xrow encoded data. */
+		char *data = obuf_reserve(&chunk->data, xrow_approx_len(*row));
+		if (data == NULL) {
+			diag_set(OutOfMemory, xrow_approx_len(*row),
+				 "region", "wal memory data");
+			goto error;
+		}
+
+		struct iovec iov[XROW_BODY_IOVMAX];
+		/*
+		 * xrow_header_encode allocates fiber gc space only for row
+		 * header.
+		 */
+		int iov_cnt = xrow_header_encode(*row, 0, iov, 0);
+		if (iov_cnt < 0)
+			goto error;
+		/* Copy row header. NOTE(review): obuf_alloc result is used unchecked — presumably the obuf_reserve(xrow_approx_len) above is an upper bound; please confirm. */
+		data = obuf_alloc(&chunk->data, iov[0].iov_len);
+		memcpy(data, iov[0].iov_base, iov[0].iov_len);
+		/* Initialize row descriptor. */
+		mem_info->xrow = **row;
+		mem_info->data = data;
+		mem_info->size = iov[0].iov_len;
+		/* Copy bodies and patch their locations. */
+		int i;
+		for (i = 1; i < iov_cnt; ++i) {
+			data = obuf_alloc(&chunk->data, iov[i].iov_len);
+			memcpy(data, iov[i].iov_base, iov[i].iov_len);
+			mem_info->xrow.body[i - 1].iov_base = data;
+			mem_info->size += iov[i].iov_len;
+		}
+	}
+
+	/* Copy all obuf lines containing the written data. */
+	int iov_cnt = 1 + obuf_iovcnt(&chunk->data) - data_svp.pos;
+	memcpy(iovec, chunk->data.iov + data_svp.pos,
+	       sizeof(struct iovec) * iov_cnt);
+	/* Adjust the first iovec member's starting pointer and length. */
+	iovec[0].iov_base += data_svp.iov_len;
+	iovec[0].iov_len -= data_svp.iov_len;
+	return iov_cnt;
+
+error:
+	/* Restore buffer state. */
+	chunk->rows.wpos = chunk->rows.rpos + old_rows_size;
+	obuf_rollback_to_svp(&chunk->data, &data_svp);
+	return -1;
+}
+
diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h
new file mode 100644
index 000000000..76f9bc6f8
--- /dev/null
+++ b/src/box/xrow_buf.h
@@ -0,0 +1,138 @@
+#ifndef TARANTOOL_XROW_BUF_H_INCLUDED
+#define TARANTOOL_XROW_BUF_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdint.h>
+
+#include "small/ibuf.h"
+#include "small/obuf.h"
+#include "small/rlist.h"
+#include "xrow.h"
+#include "vclock.h"
+
+enum {
+	/*
+	 * Xrow memory object contains some count of rotating data chunks.
+	 * Every rotation discards roughly 1/(COUNT OF CHUNKS) of the
+	 * stored rows. A bigger chunk count makes rotation more frequent,
+	 * so the decrease in the amount of stored rows is smoother and
+	 * the wal memory size is more stable.
+	 */
+	XROW_BUF_CHUNK_COUNT = 8,
+};
+
+/*
+ * Each stored xrow is described by structure which contains decoded xrow
+ * header and encoded data pointer and size.
+ */
+struct xrow_buf_row_info {
+	/* Decoded xrow header. */
+	struct xrow_header xrow;
+	/* Pointer to the xrow encoded raw data. */
+	void *data;
+	/* xrow raw data size. */
+	size_t size;
+};
+
+/*
+ * Xrow memory data chunk info contains
+ *  a vclock just before the first stored row,
+ *  an ibuf with row descriptors
+ *  an obuf with encoded data
+ */
+struct xrow_buf_chunk {
+	/* vclock just before the first row. */
+	struct vclock vclock;
+	/* A row descriptor array. */
+	struct ibuf rows;
+	/* Data storage for encoded row data. */
+	struct obuf data;
+};
+
+/*
+ * Xrow memory contains XROW_BUF_CHUNK_COUNT chunks in a ring. Xrow memory
+ * tracks the first and the last used chunk generation. The generation is a
+ * monotonic sequence increasing every time when a chunk rotated on the top of
+ * a xrow memory. So each xrow memory rotation increases the last used chunk
+ * generation and each chunk discard increases the first used chunk generation.
+ * To evaluate an effective chunk index from the generation a modulo operation
+ * (or mask) should be used.
+ */
+struct xrow_buf {
+	/* A generation of the first used chunk. */
+	uint32_t first_chunk_gen;
+	/* A generation of the last used chunk. */
+	uint32_t last_chunk_gen;
+	/* A chunks array. */
+	struct xrow_buf_chunk chunk[XROW_BUF_CHUNK_COUNT];
+	/* The first row index written in the current transaction. */
+	uint32_t tx_first_row_index;
+	/* The first row data svp written in the current transaction. */
+	struct obuf_svp tx_first_row_svp;
+	/* A trigger to fire on rotation event. */
+	struct rlist on_rotate;
+};
+
+/* Create a wal memory. */
+void
+xrow_buf_create(struct xrow_buf *xrow_buf);
+
+/* Destroy wal memory structure. */
+void
+xrow_buf_destroy(struct xrow_buf *xrow_buf);
+
+/*
+ * Start a xrow buffer transaction.
+ * This function may rotate the xrow buffer and use the vclock as a top chunk
+ * starting vclock.
+ */
+void
+xrow_buf_tx_begin(struct xrow_buf *xrow_buf, struct vclock *vclock);
+
+/* Discard all the data written after the last transaction. */
+void
+xrow_buf_tx_rollback(struct xrow_buf *xrow_buf);
+
+/*
+ * Append an xrow array to a wal memory. The array is placed into one
+ * wal memory chunk and each row takes a contiguous space in the chunk's
+ * data buffer. The iovecs describing the encoded data are placed to the
+ * out parameter.
+ * Return
+ *  count of written iovec in case of Ok
+ *  -1 in case of error
+ */
+int
+xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
+	       struct xrow_header **end,
+	       struct iovec *iovec);
+
+#endif /* TARANTOOL_XROW_BUF_H_INCLUDED */
-- 
2.23.0





More information about the Tarantool-patches mailing list