[tarantool-patches] [PATCH v3 3/4] wal: xrow buffer cursor
Georgy Kirichenko
georgy at tarantool.org
Wed Oct 9 19:45:45 MSK 2019
This structure enables to find a xrow buffer row less than given vclock
and then fetch row by row from the xrow forwards to the last appended one.
A xrow buffer cursor is essential to allow the from memory replication
because of a relay which required to be able to fetch all logged rows,
stored in a wal memory (implemented as xrow buffer), from given position
and then follow all new changes.
Part of #3794
---
src/box/xrow_buf.c | 97 ++++++++++++++++++++++++++++++++++++++++++++++
src/box/xrow_buf.h | 38 ++++++++++++++++++
2 files changed, 135 insertions(+)
diff --git a/src/box/xrow_buf.c b/src/box/xrow_buf.c
index e4455e01a..a5bf1efed 100644
--- a/src/box/xrow_buf.c
+++ b/src/box/xrow_buf.c
@@ -283,3 +283,100 @@ error:
return -1;
}
+/*
+ * Returns an index of the first row after given vclock
+ * in a chunk.
+ */
+static int
+xrow_buf_chunk_locate_vclock(struct xrow_buf_chunk *chunk,
+ struct vclock *vclock)
+{
+ for (uint32_t row_index = 0; row_index < chunk->row_count;
+ ++row_index) {
+ struct xrow_header *row = &chunk->row_info[row_index].xrow;
+ if (vclock_get(vclock, row->replica_id) < row->lsn)
+ return row_index;
+ }
+ /*
+ * We did not find any row with vclock not less than
+ * given one so return an index just after the last one.
+ */
+ return chunk->row_count;
+}
+
+int
+xrow_buf_cursor_create(struct xrow_buf *xrow_buf,
+ struct xrow_buf_cursor *xrow_buf_cursor,
+ struct vclock *vclock)
+{
+ /* Check if a buffer has requested data. */
+ struct xrow_buf_chunk *chunk =
+ xrow_buf->chunk + xrow_buf->first_chunk_index %
+ XROW_BUF_CHUNK_COUNT;
+ int rc = vclock_compare(&chunk->vclock, vclock);
+ if (rc >= 0 || rc == VCLOCK_ORDER_UNDEFINED) {
+ /* The requested data was discarded. */
+ return -1;
+ }
+ uint32_t index = xrow_buf->first_chunk_index;
+ while (index < xrow_buf->last_chunk_index) {
+ chunk = xrow_buf->chunk + (index + 1) % XROW_BUF_CHUNK_COUNT;
+ int rc = vclock_compare(&chunk->vclock, vclock);
+ if (rc > 0 || rc == VCLOCK_ORDER_UNDEFINED) {
+ /* Next chunk has younger rows than requested vclock. */
+ break;
+ }
+ ++index;
+ }
+ chunk = xrow_buf->chunk + (index) % XROW_BUF_CHUNK_COUNT;
+ xrow_buf_cursor->chunk_index = index;
+ xrow_buf_cursor->row_index = xrow_buf_chunk_locate_vclock(chunk, vclock);
+ return 0;
+}
+
+int
+xrow_buf_cursor_next(struct xrow_buf *xrow_buf,
+ struct xrow_buf_cursor *xrow_buf_cursor,
+ struct xrow_header **row, void **data, size_t *size)
+{
+ if (xrow_buf->first_chunk_index > xrow_buf_cursor->chunk_index) {
+ /* A cursor current chunk was discarded by a buffer. */
+ return -1;
+ }
+
+ struct xrow_buf_chunk *chunk;
+next_chunk:
+ chunk = xrow_buf->chunk + xrow_buf_cursor->chunk_index %
+ XROW_BUF_CHUNK_COUNT;
+ size_t chunk_row_count = chunk->row_count;
+ if (chunk_row_count == xrow_buf_cursor->row_index) {
+ /*
+ * No more rows in a buffer but there are two
+ * possibilities:
+ * 1. A cursor current chunk is the last one and there is
+ * no more rows in the cursor.
+ * 2. There is a chunk after the current one
+ * so we can switch to it.
+ * */
+ if (xrow_buf->last_chunk_index ==
+ xrow_buf_cursor->chunk_index) {
+ /*
+ * The current chunk is the last one -
+ * no more rows in a buffer.
+ */
+ return 1;
+ }
+ /* Switch to the next chunk. */
+ xrow_buf_cursor->row_index = 0;
+ ++xrow_buf_cursor->chunk_index;
+ goto next_chunk;
+ }
+ /* Return row header and data pointers and data size. */
+ struct xrow_buf_row_info *row_info = chunk->row_info +
+ xrow_buf_cursor->row_index;
+ *row = &row_info->xrow;
+ *data = row_info->data;
+ *size = row_info->size;
+ ++xrow_buf_cursor->row_index;
+ return 0;
+}
diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h
index 266cc0f76..c2de2b5a5 100644
--- a/src/box/xrow_buf.h
+++ b/src/box/xrow_buf.h
@@ -157,4 +157,42 @@ xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
struct xrow_header **end,
struct iovec **iovec);
+/**
+ * Xrow buffer cursor used to search a position in a buffer
+ * and then fetch rows one by one from the postion toward the
+ * buffer last append row.
+ */
+struct xrow_buf_cursor {
+ /** Current chunk global index. */
+ uint32_t chunk_index;
+ /** Row index in the current chunk. */
+ uint32_t row_index;
+};
+
+/**
+ * Create a xrow buffer cursor and set it's position to
+ * the first row after passed vclock value.
+ *
+ * @retval 0 cursor was created
+ * @retval -1 if a vclock was discarded
+ */
+int
+xrow_buf_cursor_create(struct xrow_buf *xrow_buf,
+ struct xrow_buf_cursor *xrow_buf_cursor,
+ struct vclock *vclock);
+
+/**
+ * Fetch next row from a xrow buffer cursor and return the row
+ * header and encoded data pointers as well as encoded data size
+ * in the corresponding parameters.
+ *
+ * @retval 0 in case of success
+ * @retval 1 if there is no more rows in a buffer
+ * @retval -1 if this cursor postion was discarded by xrow buffer
+ */
+int
+xrow_buf_cursor_next(struct xrow_buf *xrow_buf,
+ struct xrow_buf_cursor *xrow_buf_cursor,
+ struct xrow_header **row, void **data, size_t *size);
+
#endif /* TARANTOOL_XROW_BUF_H_INCLUDED */
--
2.23.0
More information about the Tarantool-patches
mailing list