[tarantool-patches] [PATCH v3 3/4] wal: xrow buffer cursor

Georgy Kirichenko georgy at tarantool.org
Wed Oct 9 19:45:45 MSK 2019


This structure makes it possible to find an xrow buffer row that is less
than a given vclock and then to fetch rows one by one from that row forward,
up to the last appended one. An xrow buffer cursor is essential for
replication from memory: a relay has to be able to fetch all logged rows
stored in the WAL memory (implemented as an xrow buffer) starting from a
given position and then follow all new changes.

Part of #3794
---
 src/box/xrow_buf.c | 97 ++++++++++++++++++++++++++++++++++++++++++++++
 src/box/xrow_buf.h | 38 ++++++++++++++++++
 2 files changed, 135 insertions(+)

diff --git a/src/box/xrow_buf.c b/src/box/xrow_buf.c
index e4455e01a..a5bf1efed 100644
--- a/src/box/xrow_buf.c
+++ b/src/box/xrow_buf.c
@@ -283,3 +283,100 @@ error:
 	return -1;
 }
 
+/*
+ * Return the index of the first row in a chunk whose lsn is greater than
+ * the given vclock component for the row's replica id.
+ */
+static int
+xrow_buf_chunk_locate_vclock(struct xrow_buf_chunk *chunk,
+			     struct vclock *vclock)
+{
+	for (uint32_t row_index = 0; row_index < chunk->row_count;
+	     ++row_index) {
+		struct xrow_header *row = &chunk->row_info[row_index].xrow;
+		if (vclock_get(vclock, row->replica_id) < row->lsn)
+			return row_index;
+	}
+	/*
+	 * No row in the chunk has an lsn greater than the given vclock,
+	 * so return the index just past the last row (the row count).
+	 */
+	return chunk->row_count;
+}
+
+int
+xrow_buf_cursor_create(struct xrow_buf *xrow_buf,
+		       struct xrow_buf_cursor *xrow_buf_cursor,
+		       struct vclock *vclock)
+{
+	/* Check whether the buffer still holds the requested data. */
+	struct xrow_buf_chunk *chunk =
+			xrow_buf->chunk + xrow_buf->first_chunk_index %
+					  XROW_BUF_CHUNK_COUNT;
+	int rc = vclock_compare(&chunk->vclock, vclock);
+	if (rc >= 0 || rc == VCLOCK_ORDER_UNDEFINED) {
+		/* The requested data was already discarded. */
+		return -1;
+	}
+	uint32_t index = xrow_buf->first_chunk_index;
+	while (index < xrow_buf->last_chunk_index) {
+		chunk = xrow_buf->chunk + (index + 1) % XROW_BUF_CHUNK_COUNT;
+		int rc = vclock_compare(&chunk->vclock, vclock);
+		if (rc > 0 || rc == VCLOCK_ORDER_UNDEFINED) {
+			/* Next chunk vclock is not less or equal: stop. */
+			break;
+		}
+		++index;
+	}
+	chunk = xrow_buf->chunk + (index) % XROW_BUF_CHUNK_COUNT;
+	xrow_buf_cursor->chunk_index = index;
+	xrow_buf_cursor->row_index = xrow_buf_chunk_locate_vclock(chunk, vclock);
+	return 0;
+}
+
+int
+xrow_buf_cursor_next(struct xrow_buf *xrow_buf,
+		     struct xrow_buf_cursor *xrow_buf_cursor,
+		     struct xrow_header **row, void **data, size_t *size)
+{
+	if (xrow_buf->first_chunk_index > xrow_buf_cursor->chunk_index) {
+		/* The cursor's current chunk was discarded by the buffer. */
+		return -1;
+	}
+
+	struct xrow_buf_chunk *chunk;
+next_chunk:
+	chunk = xrow_buf->chunk + xrow_buf_cursor->chunk_index %
+				  XROW_BUF_CHUNK_COUNT;
+	size_t chunk_row_count = chunk->row_count;
+	if (chunk_row_count == xrow_buf_cursor->row_index) {
+		/*
+		 * No more rows in the current chunk, so there are two
+		 * possibilities:
+		 *  1. The cursor's current chunk is the last one and
+		 * there are no more rows for the cursor.
+		 *  2. There is a chunk after the current one
+		 * so we can switch to it.
+		 */
+		if (xrow_buf->last_chunk_index ==
+		    xrow_buf_cursor->chunk_index) {
+			/*
+			 * The current chunk is the last one -
+			 * no more rows in the buffer.
+			 */
+			return 1;
+		}
+		/* Switch to the first row of the next chunk. */
+		xrow_buf_cursor->row_index = 0;
+		++xrow_buf_cursor->chunk_index;
+		goto next_chunk;
+	}
+	/* Return the row header, data pointer and size, then advance. */
+	struct xrow_buf_row_info *row_info = chunk->row_info +
+					     xrow_buf_cursor->row_index;
+	*row = &row_info->xrow;
+	*data = row_info->data;
+	*size = row_info->size;
+	++xrow_buf_cursor->row_index;
+	return 0;
+}
diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h
index 266cc0f76..c2de2b5a5 100644
--- a/src/box/xrow_buf.h
+++ b/src/box/xrow_buf.h
@@ -157,4 +157,42 @@ xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
 	       struct xrow_header **end,
 	       struct iovec **iovec);
 
+/**
+ * Xrow buffer cursor used to search for a position in a buffer
+ * and then fetch rows one by one from that position toward the
+ * last row appended to the buffer.
+ */
+struct xrow_buf_cursor {
+	/** Current chunk global index. */
+	uint32_t chunk_index;
+	/** Row index in the current chunk. */
+	uint32_t row_index;
+};
+
+/**
+ * Create an xrow buffer cursor and set its position to
+ * the first row after the passed vclock value.
+ *
+ * @retval 0 cursor was created
+ * @retval -1 if the requested data was discarded
+ */
+int
+xrow_buf_cursor_create(struct xrow_buf *xrow_buf,
+		       struct xrow_buf_cursor *xrow_buf_cursor,
+		       struct vclock *vclock);
+
+/**
+ * Fetch the next row from an xrow buffer cursor and return the row
+ * header and encoded data pointers as well as the encoded data size
+ * in the corresponding parameters.
+ *
+ * @retval 0 in case of success
+ * @retval 1 if there are no more rows in the buffer
+ * @retval -1 if the cursor position was discarded by the xrow buffer
+ */
+int
+xrow_buf_cursor_next(struct xrow_buf *xrow_buf,
+		     struct xrow_buf_cursor *xrow_buf_cursor,
+		     struct xrow_header **row, void **data, size_t *size);
+
 #endif /* TARANTOOL_XROW_BUF_H_INCLUDED */
-- 
2.23.0





More information about the Tarantool-patches mailing list