[tarantool-patches] [PATCH 4/7] Replication: wal memory buffer
    Georgy Kirichenko 
    georgy at tarantool.org
       
    Tue Aug 13 09:27:42 MSK 2019
    
    
  
Introduce a wal memory buffer which contains logged transactions. Wal
writes all rows into the memory buffer and then flushes new data to disk.
Wal memory consist of rotated pairs of xrow header array and encoded xrow
data buffer.
---
 src/box/CMakeLists.txt |   1 +
 src/box/wal.c          |  38 ++++--
 src/box/wal_mem.c      | 273 +++++++++++++++++++++++++++++++++++++++++
 src/box/wal_mem.h      | 166 +++++++++++++++++++++++++
 src/box/xlog.c         |  77 +++++++++---
 src/box/xlog.h         |   9 ++
 6 files changed, 536 insertions(+), 28 deletions(-)
 create mode 100644 src/box/wal_mem.c
 create mode 100644 src/box/wal_mem.h
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 9bba37bcb..bd31b07df 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -125,6 +125,7 @@ add_library(box STATIC
     bind.c
     execute.c
     wal.c
+    wal_mem.c
     call.c
     merger.c
     ${lua_sources}
diff --git a/src/box/wal.c b/src/box/wal.c
index a09ab7187..6cdb0db15 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -44,6 +44,7 @@
 #include "coio_task.h"
 #include "replication.h"
 #include "gc.h"
+#include "wal_mem.h"
 
 enum {
 	/**
@@ -156,6 +157,8 @@ struct wal_writer
 	 * Used for replication relays.
 	 */
 	struct rlist watchers;
+	/** Wal memory buffer. */
+	struct wal_mem wal_mem;
 };
 
 enum wal_msg_type {
@@ -936,6 +939,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 	int64_t tsn = 0;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
+		(*row)->tm = ev_now(loop());
 		if ((*row)->replica_id == 0) {
 			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
 				      vclock_get(base, instance_id);
@@ -1027,25 +1031,37 @@ wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
 	int rc;
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
+	wal_mem_svp(&writer->wal_mem, &writer->vclock);
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
 		wal_assign_lsn(&vclock_diff, &writer->vclock,
 			       entry->rows, entry->rows + entry->n_rows);
 		entry->res = vclock_sum(&vclock_diff) +
 			     vclock_sum(&writer->vclock);
-		rc = xlog_write_entry(l, entry);
-		if (rc < 0)
+		if (wal_mem_write(&writer->wal_mem, entry->rows,
+				  entry->rows + entry->n_rows) < 0) {
+			wal_mem_svp_reset(&writer->wal_mem);
 			goto done;
-		if (rc > 0) {
-			writer->checkpoint_wal_size += rc;
-			last_committed = &entry->fifo;
-			vclock_merge(&writer->vclock, &vclock_diff);
 		}
-		/* rc == 0: the write is buffered in xlog_tx */
 	}
-	rc = xlog_flush(l);
-	if (rc < 0)
-		goto done;
 
+	struct iovec iov[SMALL_OBUF_IOV_MAX];
+	int iovcnt;
+	iovcnt = wal_mem_svp_data(&writer->wal_mem, iov);
+	xlog_tx_begin(l);
+	if (xlog_write_iov(l, iov, iovcnt,
+			   wal_mem_svp_row_count(&writer->wal_mem)) < 0) {
+		xlog_tx_rollback(l);
+		wal_mem_svp_reset(&writer->wal_mem);
+		goto done;
+	}
+	rc = xlog_tx_commit(l);
+	if (rc == 0)
+		/* Data is buffered but not yet flushed. */
+		rc = xlog_flush(l);
+	if (rc < 0) {
+		wal_mem_svp_reset(&writer->wal_mem);
+		goto done;
+	}
 	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
 	vclock_merge(&writer->vclock, &vclock_diff);
@@ -1147,6 +1163,7 @@ wal_cord_f(va_list ap)
 {
 	(void) ap;
 	struct wal_writer *writer = &wal_writer_singleton;
+	wal_mem_create(&writer->wal_mem);
 
 	/** Initialize eio in this thread */
 	coio_enable();
@@ -1195,6 +1212,7 @@ wal_cord_f(va_list ap)
 		xlog_close(&vy_log_writer.xlog, false);
 
 	cpipe_destroy(&writer->tx_prio_pipe);
+	wal_mem_destroy(&writer->wal_mem);
 	return 0;
 }
 
diff --git a/src/box/wal_mem.c b/src/box/wal_mem.c
new file mode 100644
index 000000000..fdfc6f93d
--- /dev/null
+++ b/src/box/wal_mem.c
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "wal_mem.h"
+
+#include "fiber.h"
+#include "errinj.h"
+
+enum {
+	/* Initial size for rows storage. */
+	WAL_MEM_BUF_INITIAL_ROW_COUNT = 4096,
+	/* Initial size for data storage. */
+	WAL_MEM_BUF_INITIAL_DATA_SIZE = 65536,
+	/* How many rows we will place in one buffer. */
+	WAL_MEM_BUF_ROWS_LIMIT = 8192,
+	/* How many data we will place in one buffer. */
+	WAL_MEM_BUF_DATA_LIMIT = 1 << 19,
+};
+
+void
+wal_mem_create(struct wal_mem *wal_mem)
+{
+	int i;
+	for (i = 0; i < WAL_MEM_BUF_COUNT; ++i) {
+		ibuf_create(&wal_mem->buf[i].rows, &cord()->slabc,
+			    WAL_MEM_BUF_INITIAL_ROW_COUNT *
+			    sizeof(struct xrow_header));
+		obuf_create(&wal_mem->buf[i].data, &cord()->slabc,
+			    WAL_MEM_BUF_INITIAL_DATA_SIZE);
+	}
+	wal_mem->last_buf_index = 0;
+	wal_mem->first_buf_index = 0;
+}
+
+void
+wal_mem_destroy(struct wal_mem *wal_mem)
+{
+	int i;
+	for (i = 0; i < WAL_MEM_BUF_COUNT; ++i) {
+		ibuf_destroy(&wal_mem->buf[i].rows);
+		obuf_destroy(&wal_mem->buf[i].data);
+	}
+}
+
+/*
+ * Switch to the next buffer if required and discard outdated data.
+ */
+static struct wal_mem_buf *
+wal_mem_rotate(struct wal_mem *wal_mem)
+{
+	struct wal_mem_buf *mem_buf = wal_mem->buf +
+				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+	if (ibuf_used(&mem_buf->rows) < WAL_MEM_BUF_ROWS_LIMIT *
+					sizeof(struct xrow_header) &&
+	    obuf_size(&mem_buf->data) < WAL_MEM_BUF_DATA_LIMIT)
+		return mem_buf;
+	/* Switch to the next buffer (a target to append new data). */
+	++wal_mem->last_buf_index;
+	mem_buf = wal_mem->buf + wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+	if (wal_mem->last_buf_index - wal_mem->first_buf_index <
+	    WAL_MEM_BUF_COUNT) {
+		/* The buffer is unused, nothing to do. */
+		return mem_buf;
+	}
+	/* Discard data and adjust first buffer index. */
+	ibuf_reset(&mem_buf->rows);
+	obuf_reset(&mem_buf->data);
+	++wal_mem->first_buf_index;
+	return mem_buf;
+}
+
+void
+wal_mem_svp(struct wal_mem *wal_mem, struct vclock *vclock)
+{
+	struct wal_mem_buf *mem_buf = wal_mem_rotate(wal_mem);
+	/* Check if the current buffer is empty and setup vclock. */
+	if (ibuf_used(&mem_buf->rows) == 0)
+		vclock_copy(&mem_buf->vclock, vclock);
+	wal_mem->tx_first_row_index = ibuf_used(&mem_buf->rows) /
+				      sizeof(struct wal_mem_buf_row);
+	wal_mem->tx_first_row_svp = obuf_create_svp(&mem_buf->data);
+}
+
+/* Commit a wal memory transaction and build an iovec with encoded data. */
+int
+wal_mem_svp_data(struct wal_mem *wal_mem, struct iovec *iovec)
+{
+	struct wal_mem_buf *mem_buf = wal_mem->buf +
+				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+	if (wal_mem->tx_first_row_svp.used == obuf_size(&mem_buf->data))
+		return 0;
+
+	int iov_cnt = 1 + obuf_iovcnt(&mem_buf->data) -
+		      wal_mem->tx_first_row_svp.pos;
+	memcpy(iovec, mem_buf->data.iov + wal_mem->tx_first_row_svp.pos,
+	       sizeof(struct iovec) * iov_cnt);
+	iovec[0].iov_base += wal_mem->tx_first_row_svp.iov_len;
+	iovec[0].iov_len -= wal_mem->tx_first_row_svp.iov_len;
+	return iov_cnt;
+}
+
+/* Truncate all the data written in the current transaction. */
+void
+wal_mem_svp_reset(struct wal_mem *wal_mem)
+{
+	struct wal_mem_buf *mem_buf = wal_mem->buf +
+				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+	mem_buf->rows.wpos = mem_buf->rows.rpos + wal_mem->tx_first_row_index *
+						  sizeof(struct wal_mem_buf_row);
+	obuf_rollback_to_svp(&mem_buf->data, &wal_mem->tx_first_row_svp);
+}
+
+int
+wal_mem_write(struct wal_mem *wal_mem, struct xrow_header **begin,
+	      struct xrow_header **end)
+{
+	struct wal_mem_buf *mem_buf = wal_mem->buf +
+				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+
+	/* Save rollback values. */
+	size_t old_rows_size = ibuf_used(&mem_buf->rows);
+	struct obuf_svp data_svp = obuf_create_svp(&mem_buf->data);
+
+	/* Allocate space for row descriptors. */
+	struct wal_mem_buf_row *mem_row =
+		(struct wal_mem_buf_row *)
+		ibuf_alloc(&mem_buf->rows, (end - begin) *
+					   sizeof(struct wal_mem_buf_row));
+	if (mem_row == NULL) {
+		diag_set(OutOfMemory, (end - begin) *
+				      sizeof(struct wal_mem_buf_row),
+			 "region", "wal memory rows");
+		goto error;
+	}
+	/* Append rows. */
+	struct xrow_header **row;
+	for (row = begin; row < end; ++row, ++mem_row) {
+		struct errinj *inj = errinj(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT);
+		if (inj != NULL && inj->iparam == (*row)->lsn) {
+			(*row)->lsn = inj->iparam - 1;
+			say_warn("injected broken lsn: %lld",
+				 (long long) (*row)->lsn);
+		}
+
+		/* Reserve space. */
+		char *data = obuf_reserve(&mem_buf->data, xrow_approx_len(*row));
+		if (data == NULL) {
+			diag_set(OutOfMemory, xrow_approx_len(*row),
+				 "region", "wal memory data");
+			goto error;
+		}
+
+		struct iovec iov[XROW_BODY_IOVMAX];
+		/*
+		 * xrow_header_encode allocates fiber gc space only for row
+		 * header.
+		 */
+		int iov_cnt = xrow_header_encode(*row, 0, iov, 0);
+		if (iov_cnt < 0)
+			goto error;
+		/* Copy row header. */
+		data = obuf_alloc(&mem_buf->data, iov[0].iov_len);
+		memcpy(data, iov[0].iov_base, iov[0].iov_len);
+		/* Initialize row descriptor. */
+		mem_row->xrow = **row;
+		mem_row->data = data;
+		mem_row->size = iov[0].iov_len;
+		/* Write bodies and patch location. */
+		int i;
+		for (i = 1; i < iov_cnt; ++i) {
+			/* Append xrow bodies and patch xrow pointers. */
+			data = obuf_alloc(&mem_buf->data, iov[i].iov_len);
+			memcpy(data, iov[i].iov_base, iov[i].iov_len);
+			mem_row->xrow.body[i - 1].iov_base = data;
+			mem_row->size += iov[i].iov_len;
+		}
+	}
+	return 0;
+
+error:
+	/* Restore buffer state. */
+	mem_buf->rows.wpos = mem_buf->rows.rpos + old_rows_size;
+	obuf_rollback_to_svp(&mem_buf->data, &data_svp);
+	return -1;
+}
+
+int
+wal_mem_cursor_create(struct wal_mem *wal_mem,
+		      struct wal_mem_cursor *wal_mem_cursor,
+		      struct vclock *vclock)
+{
+	uint64_t buf_index;
+	for (buf_index = wal_mem->first_buf_index;
+	     buf_index <= wal_mem->last_buf_index;
+	     ++buf_index) {
+		struct wal_mem_buf *mem_buf = wal_mem->buf +
+				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+		int rc = vclock_compare(&mem_buf->vclock, vclock);
+		if (rc != 0 && rc != -1)
+			break;
+	}
+	if (buf_index == wal_mem->first_buf_index)
+		return -1;
+	wal_mem_cursor->buf_index = buf_index - 1;
+	wal_mem_cursor->row_index = 0;
+	return 0;
+}
+
+int
+wal_mem_cursor_next(struct wal_mem *wal_mem,
+		    struct wal_mem_cursor *wal_mem_cursor,
+		    struct xrow_header **row,
+		    void **data,
+		    size_t *size)
+{
+	if (wal_mem->first_buf_index > wal_mem_cursor->buf_index) {
+		/* Buffer was discarded. */
+		return -1;
+	}
+
+	struct wal_mem_buf *mem_buf;
+	size_t last_row_index;
+
+next_buffer:
+	mem_buf = wal_mem->buf +
+		  wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+	last_row_index = ibuf_used(&mem_buf->rows) /
+			 sizeof(struct wal_mem_buf_row);
+	if (last_row_index == wal_mem_cursor->row_index) {
+		/* No more rows in the current buffer. */
+		if (wal_mem->last_buf_index == wal_mem_cursor->buf_index)
+			/* No more rows in the memory. */
+			return 1;
+		wal_mem_cursor->row_index = 0;
+		++wal_mem_cursor->buf_index;
+		goto next_buffer;
+	}
+	struct wal_mem_buf_row *buf_row =
+		(struct wal_mem_buf_row *)mem_buf->rows.rpos +
+		wal_mem_cursor->row_index;
+	*row = &buf_row->xrow;
+	*data = buf_row->data;
+	*size = buf_row->size;
+	return 0;
+}
diff --git a/src/box/wal_mem.h b/src/box/wal_mem.h
new file mode 100644
index 000000000..d26d00157
--- /dev/null
+++ b/src/box/wal_mem.h
@@ -0,0 +1,166 @@
+#ifndef TARANTOOL_WAL_MEM_H_INCLUDED
+#define TARANTOOL_WAL_MEM_H_INCLUDED
+/*
+ * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdint.h>
+
+#include "small/ibuf.h"
+#include "small/obuf.h"
+#include "xrow.h"
+#include "vclock.h"
+
+enum {
+	/*
+	 * Wal memory object contains some count of rotating data buffers.
+	 * Estimated decrease in amount of stored row is about
+	 * 1/(COUNT OF BUFFERS). However the bigger value makes rotation
+	 * more frequent, the decrease would be smoother and size of
+	 * a wal memory more stable.
+	 */
+	WAL_MEM_BUF_COUNT = 8,
+};
+
+/*
+ * A wal memory row descriptor which contains decoded xrow header and
+ * encoded data pointer and size.
+ */
+struct wal_mem_buf_row {
+	/* Decoded xrow header. */
+	struct xrow_header xrow;
+	/* Pointer to the xrow encoded raw data. */
+	void *data;
+	/* xrow raw data size. */
+	size_t size;
+};
+
+/*
+ * Wal memory data buffer which contains
+ *  a vclock just before the first contained row,
+ *  an ibuf with row descriptors
+ *  an obuf with encoded data
+ */
+struct wal_mem_buf {
+	/* vclock just before the first row. */
+	struct vclock vclock;
+	/* A row descriptor array. */
+	struct ibuf rows;
+	/* Data storage for encoded row data. */
+	struct obuf data;
+};
+
+/*
+ * Wal memory contains WAL_MEM_BUF_COUNT wal memory buffers which are
+ * organized in a ring. In order to track Wal memory tracks the first and
+ * the last used buffers indexes (generation) and those indexes are not wrapped
+ * around the ring. Each rotation increases the last buffer index and
+ * each buffer discard increases the first buffer index. To evaluate effective
+ * index in an wal memory array a modulo operation (or mask) should be used.
+ */
+struct wal_mem {
+	/* An index of the first used buffer. */
+	uint64_t first_buf_index;
+	/* An index of the last used buffer. */
+	uint64_t last_buf_index;
+	/* A memory buffer array. */
+	struct wal_mem_buf buf[WAL_MEM_BUF_COUNT];
+	/* The first row index written in the current transaction. */
+	uint32_t tx_first_row_index;
+	/* The first row data svp written in the current transaction. */
+	struct obuf_svp tx_first_row_svp;
+};
+
+/* Create a wal memory. */
+void
+wal_mem_create(struct wal_mem *wal_mem);
+
+/* Destroy wal memory structure. */
+void
+wal_mem_destroy(struct wal_mem *wal_mem);
+
+/*
+ * Rotate a wal memory if required and save the current wal memory write
+ * position.
+ */
+void
+wal_mem_svp(struct wal_mem *wal_mem, struct vclock *vclock);
+
+/* Retrieve data after last svp. */
+int
+wal_mem_svp_data(struct wal_mem *wal_mem, struct iovec *iovec);
+
+/* Truncate all the data written after the last svp. */
+void
+wal_mem_svp_reset(struct wal_mem *wal_mem);
+
+/* Count of rows written since the last svp. */
+static inline int
+wal_mem_svp_row_count(struct wal_mem *wal_mem)
+{
+	struct wal_mem_buf *mem_buf = wal_mem->buf +
+				      wal_mem->last_buf_index % WAL_MEM_BUF_COUNT;
+	return ibuf_used(&mem_buf->rows) / sizeof(struct wal_mem_buf_row) -
+	       wal_mem->tx_first_row_index;
+}
+
+/*
+ * Append xrow array to a wal memory. The array is placed into one
+ * wal memory buffer and each row takes a continuous space in a data buffer.
+ * continuously.
+ * Return
+ *  0 for Ok
+ *  -1 in case of error
+ */
+int
+wal_mem_write(struct wal_mem *wal_mem, struct xrow_header **begin,
+	      struct xrow_header **end);
+
+/* Wal memory cursor to track a position in a wal memory. */
+struct wal_mem_cursor {
+	/* Current memory buffer index. */
+	uint64_t buf_index;
+	/* Current row index. */
+	uint32_t row_index;
+};
+
+/* Create a wal memory cursor from the wal memory current position. */
+int
+wal_mem_cursor_create(struct wal_mem *wal_mem,
+		      struct wal_mem_cursor *wal_mem_cursor,
+		      struct vclock *vclock);
+
+int
+wal_mem_cursor_next(struct wal_mem *wal_mem,
+		    struct wal_mem_cursor *wal_mem_cursor,
+		    struct xrow_header **row,
+		    void **data,
+		    size_t *size);
+
+#endif /* TARANTOOL_WAL_MEM_H_INCLUDED */
diff --git a/src/box/xlog.c b/src/box/xlog.c
index 54a7b5246..363f93ef5 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -1267,14 +1267,8 @@ xlog_tx_write(struct xlog *log)
 	return written;
 }
 
-/*
- * Add a row to a log and possibly flush the log.
- *
- * @retval  -1 error, check diag.
- * @retval >=0 the number of bytes written to buffer.
- */
-ssize_t
-xlog_write_row(struct xlog *log, const struct xrow_header *packet)
+static int
+xlog_write_prepare(struct xlog *log)
 {
 	/*
 	 * Automatically reserve space for a fixheader when adding
@@ -1288,17 +1282,17 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 			return -1;
 		}
 	}
+	return 0;
+}
 
+/*
+ * Append iov to a log internal buffer.
+ */
+static ssize_t
+xlog_writev(struct xlog *log, struct iovec *iov, int iovcnt)
+{
 	struct obuf_svp svp = obuf_create_svp(&log->obuf);
-	size_t page_offset = obuf_size(&log->obuf);
-	/** encode row into iovec */
-	struct iovec iov[XROW_IOVMAX];
-	/** don't write sync to the disk */
-	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
-	if (iovcnt < 0) {
-		obuf_rollback_to_svp(&log->obuf, &svp);
-		return -1;
-	}
+	size_t old_size = obuf_size(&log->obuf);
 	for (int i = 0; i < iovcnt; ++i) {
 		struct errinj *inj = errinj(ERRINJ_WAL_WRITE_PARTIAL,
 					    ERRINJ_INT);
@@ -1317,10 +1311,33 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 			return -1;
 		}
 	}
+	return obuf_size(&log->obuf) - old_size;
+}
+
+/*
+ * Add a row to a log and possibly flush the log.
+ *
+ * @retval  -1 error, check diag.
+ * @retval >=0 the number of bytes written to buffer.
+ */
+ssize_t
+xlog_write_row(struct xlog *log, const struct xrow_header *packet)
+{
+	if (xlog_write_prepare(log) != 0)
+		return -1;
+
+	/** encode row into iovec */
+	struct iovec iov[XROW_IOVMAX];
+	/** don't write sync to the disk */
+	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
+	if (iovcnt < 0)
+		return -1;
 	assert(iovcnt <= XROW_IOVMAX);
+	ssize_t row_size = xlog_writev(log, iov, iovcnt);
+	if (row_size < 0)
+		return -1;
 	log->tx_rows++;
 
-	size_t row_size = obuf_size(&log->obuf) - page_offset;
 	if (log->is_autocommit &&
 	    obuf_size(&log->obuf) >= XLOG_TX_AUTOCOMMIT_THRESHOLD &&
 	    xlog_tx_write(log) < 0)
@@ -1329,6 +1346,30 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
 	return row_size;
 }
 
+/*
+ * Add an io vector to a log and possibly flush the log.
+ *
+ * @retval  -1 error, check diag.
+ * @retval >=0 the number of bytes written to buffer.
+ */
+ssize_t
+xlog_write_iov(struct xlog *log, struct iovec *iov, int iovcnt, int rows)
+{
+	if (xlog_write_prepare(log) != 0)
+		return -1;
+
+	ssize_t row_size = xlog_writev(log, iov, iovcnt);
+	if (row_size < 0)
+		return -1;
+	log->tx_rows += rows;
+
+	if (log->is_autocommit &&
+	    obuf_size(&log->obuf) >= XLOG_TX_AUTOCOMMIT_THRESHOLD &&
+	    xlog_tx_write(log) < 0)
+		return -1;
+
+	return row_size;
+}
 /**
  * Begin a multi-statement xlog transaction. All xrow objects
  * of a single transaction share the same header and checksum
diff --git a/src/box/xlog.h b/src/box/xlog.h
index 6a71e0cbd..713e14e2b 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -501,6 +501,15 @@ xlog_fallocate(struct xlog *log, size_t size);
 ssize_t
 xlog_write_row(struct xlog *log, const struct xrow_header *packet);
 
+/**
+ * Write a io vector to xlog,
+ *
+ * @retval count of writen bytes
+ * @retval -1 for error
+ */
+ssize_t
+xlog_write_iov(struct xlog *xlog, struct iovec *iov, int iovcnt, int rows);
+
 /**
  * Prevent xlog row buffer offloading, should be use
  * at transaction start to write transaction in one xlog tx
-- 
2.22.0
    
    
More information about the Tarantool-patches
mailing list