[tarantool-patches] [PATCH v2 3/4] wal: xrow buffer cursor

Georgy Kirichenko georgy at tarantool.org
Wed Sep 18 12:36:10 MSK 2019


This structure makes it possible to find a row in an xrow buffer that
precedes a given vclock and then to fetch rows one by one, from that
position forward up to the most recently appended row.
An xrow buffer cursor is essential for in-memory replication: a relay
must be able to fetch all log rows stored in the WAL memory (implemented
as an xrow buffer) starting from a given position and then follow all
new changes.

Prerequisites: #3794
---
 src/box/xrow_buf.c | 59 ++++++++++++++++++++++++++++++++++++++++++++++
 src/box/xrow_buf.h | 22 +++++++++++++++++
 2 files changed, 81 insertions(+)

diff --git a/src/box/xrow_buf.c b/src/box/xrow_buf.c
index d690ac4b7..c73e3acb2 100644
--- a/src/box/xrow_buf.c
+++ b/src/box/xrow_buf.c
@@ -218,3 +218,62 @@ error:
 	return -1;
 }
 
+/* Put the cursor at the last leading chunk whose vclock is <= vclock. */
+int
+xrow_buf_cursor_create(struct xrow_buf *xrow_buf,
+		      struct xrow_buf_cursor *xrow_buf_cursor,
+		      struct vclock *vclock)
+{
+	uint64_t gen = xrow_buf->first_chunk_gen;
+	for (; gen <= xrow_buf->last_chunk_gen; ++gen) {
+		struct xrow_buf_chunk *chunk =
+			&xrow_buf->chunk[gen % XROW_BUF_CHUNK_COUNT];
+		int order = vclock_compare(&chunk->vclock, vclock);
+		if (!(order == 0 || order == -1))
+			break;
+	}
+	/* Not even the first chunk qualifies: nothing to start from. */
+	if (gen == xrow_buf->first_chunk_gen)
+		return -1;
+	xrow_buf_cursor->chunk_gen = gen - 1;
+	xrow_buf_cursor->row_index = 0;
+	return 0;
+}
+
+/*
+ * Fetch the row at the cursor and advance. Return 0 on success, 1 when there
+ * are no more rows, -1 when the cursor's chunk was discarded from the buffer.
+ */
+int
+xrow_buf_cursor_next(struct xrow_buf *xrow_buf,
+		    struct xrow_buf_cursor *xrow_buf_cursor,
+		    struct xrow_header **row, void **data, size_t *size)
+{
+	if (xrow_buf_cursor->chunk_gen < xrow_buf->first_chunk_gen)
+		return -1;
+
+	struct xrow_buf_chunk *chunk;
+	/* Walk forward until a chunk with a not yet fetched row is found. */
+	for (;;) {
+		chunk = &xrow_buf->chunk[xrow_buf_cursor->chunk_gen %
+					 XROW_BUF_CHUNK_COUNT];
+		size_t row_count = ibuf_used(&chunk->rows) /
+				   sizeof(struct xrow_buf_row_info);
+		if (xrow_buf_cursor->row_index != row_count)
+			break;
+		/* The chunk is exhausted; stop if it is the newest one. */
+		if (xrow_buf_cursor->chunk_gen == xrow_buf->last_chunk_gen)
+			return 1;
+		++xrow_buf_cursor->chunk_gen;
+		xrow_buf_cursor->row_index = 0;
+	}
+	/* Row infos form an array starting at the ibuf read position. */
+	struct xrow_buf_row_info *infos =
+		(struct xrow_buf_row_info *)chunk->rows.rpos;
+	struct xrow_buf_row_info *info = infos + xrow_buf_cursor->row_index;
+	*row = &info->xrow;
+	*data = info->data;
+	*size = info->size;
+	xrow_buf_cursor->row_index++;
+	return 0;
+}
diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h
index 76f9bc6f8..c9baed360 100644
--- a/src/box/xrow_buf.h
+++ b/src/box/xrow_buf.h
@@ -135,4 +135,26 @@ xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
 	       struct xrow_header **end,
 	       struct iovec *iovec);
 
+/* A read position in a xrow buffer: chunk generation and row index. */
+struct xrow_buf_cursor {
+	/* Current chunk generation; 64-bit to match xrow_buf_cursor_create(). */
+	uint64_t chunk_gen;
+	/* Index of the next row to fetch from that chunk. */
+	uint32_t row_index;
+};
+
+/* Set cursor to the last chunk at or before vclock; 0 if found, -1 if not. */
+int
+xrow_buf_cursor_create(struct xrow_buf *xrow_buf,
+		       struct xrow_buf_cursor *xrow_buf_cursor,
+		       struct vclock *vclock);
+
+/* Fetch the next row: 0 - fetched, 1 - no more rows, -1 - buffer discarded. */
+int
+xrow_buf_cursor_next(struct xrow_buf *xrow_buf,
+		     struct xrow_buf_cursor *xrow_buf_cursor,
+		     struct xrow_header **row,
+		     void **data,
+		     size_t *size);
+
 #endif /* TARANTOOL_XROW_BUF_H_INCLUDED */
-- 
2.23.0





More information about the Tarantool-patches mailing list