Tarantool development patches archive
* Re: [Tarantool-patches] [tarantool-patches] [PATCH v3 0/4] From memory wal replication
       [not found] <cover.1570639218.git.georgy@tarantool.org>
@ 2019-10-21 23:06 ` Vladislav Shpilevoy
       [not found] ` <99ef4fdb53bfd4fb1c47f2cf2ebb0f333761506e.1570639218.git.georgy@tarantool.org>
                   ` (3 subsequent siblings)
  4 siblings, 0 replies; 5+ messages in thread
From: Vladislav Shpilevoy @ 2019-10-21 23:06 UTC (permalink / raw)
  To: tarantool-patches, georgy, tarantool-patches, Konstantin Osipov

Hi! Thanks for the patchset!

- I see that Travis is still red on your branch:
https://travis-ci.org/tarantool/tarantool/jobs/595713541.

- Please rebase your branch on the latest master. It
is very outdated. That will probably also help to fix Travis.

On 09/10/2019 18:45, Georgy Kirichenko wrote:
> This patchset introduces from-wal-memory-replication and consist of 4
> patches.
> 
> The first patch purges distinct relay facilities to advanced gc consumer
> state. Also it changes gc behaviour - now gc compares consumers using
> lsns corresponding to local instance id instead of vclock signature.
> Howevers this changes gc behavior, its look like we could restore the
> gc behavior after relay would be migrated to the wal thread. The whole
> patch is essential because in relay switched to memory relaying there is
> no more file boundaries and on_close_log trigger invocation now.
> 
> The second patch introduces xrow buffer structure to be used by wal
> while logging. Wal encodes rows to a memory buffer (xrow_buffer
> structure) and then writes encoded data to a xlog file.
> 
> The next patch implements xrow_buf_cursor to have an ability to found
> a position in the buffer by vclock and then fetch all rows one-by-one
> towards the last one.
> 
> The last patch makes a relay able to read from wal memory using
> a xrow_buffer cursor. When relay went out of wal memory window it turns
> back to file mode. After last known xlog file is relayed then relay
> tries to return to wal memory mode.
> 
> Changes in v3:
>  - A lot of commentary fixes and renaming according to review
>  - Slight refactoring
>  - On-stack heartbeat message issue fixed
> 
> Changes in v2:
>  - Get rid of gc refactoring
>  - Rename wal_memory to xrow_buffer according to review
>  - Commentaries
>  - Bug fixes
> 
> Branch: http://github.com/tarantool/tarantool/tree/g.kirichenko/gh-3794-memory-replication
> Issue: https://github.com/tarantool/tarantool/issues/3794
> 
> Georgy Kirichenko (4):
>   relay: adjust gc state on relay status update
>   wal: xrow buffer structure
>   wal: xrow buffer cursor
>   replication: use wal memory buffer to fetch rows
> 
>  src/box/CMakeLists.txt                        |   1 +
>  src/box/gc.c                                  |   7 +-
>  src/box/relay.cc                              | 286 +++++--------
>  src/box/wal.c                                 | 205 +++++++++-
>  src/box/wal.h                                 |  60 +++
>  src/box/xlog.c                                |  57 ++-
>  src/box/xlog.h                                |  16 +-
>  src/box/xrow_buf.c                            | 382 ++++++++++++++++++
>  src/box/xrow_buf.h                            | 198 +++++++++
>  src/lib/core/cbus.c                           |   4 +
>  src/lib/core/errinj.h                         |   1 +
>  test/box/errinj.result                        |   2 +
>  test/replication/force_recovery.result        |   8 +
>  test/replication/force_recovery.test.lua      |   2 +
>  test/replication/replica_rejoin.result        |   8 +
>  test/replication/replica_rejoin.test.lua      |   2 +
>  .../show_error_on_disconnect.result           |   8 +
>  .../show_error_on_disconnect.test.lua         |   2 +
>  test/replication/suite.ini                    |   2 +-
>  test/xlog/panic_on_wal_error.result           |  12 +
>  test/xlog/panic_on_wal_error.test.lua         |   3 +
>  test/xlog/suite.ini                           |   2 +-
>  22 files changed, 1045 insertions(+), 223 deletions(-)
>  create mode 100644 src/box/xrow_buf.c
>  create mode 100644 src/box/xrow_buf.h
> 


* Re: [Tarantool-patches] [tarantool-patches] [PATCH v3 1/4] relay: adjust gc state on relay status update
       [not found] ` <99ef4fdb53bfd4fb1c47f2cf2ebb0f333761506e.1570639218.git.georgy@tarantool.org>
@ 2019-10-21 23:06   ` Vladislav Shpilevoy
  0 siblings, 0 replies; 5+ messages in thread
From: Vladislav Shpilevoy @ 2019-10-21 23:06 UTC (permalink / raw)
  To: tarantool-patches, Georgy Kirichenko, tarantool-patches

Thanks for the patch!

On 09/10/2019 18:45, Georgy Kirichenko wrote:
> Don't use on_close_log trigger to track xlog file boundaries. As we
> intend implement in-memory replication relay could have no more xlog
> file operations and couldn't rely on previous trigger invocations. Now
> the consumer state is advanced together with relay vclock. After
> parallel applier implementation relay wouldn't receive an ACK packet for
> each transaction (because an applier groups them) so it should not be
> too expensive to advance gc on each relay vclock update.
> 
> Note: this changes cluster gc behavior - an instance gc will hold
> only it's locally generated transaction. Also it is only a
> temporary solution until relay processing would be merged with
> a wal writer context when wal will process relay ACK requests
> as well as log writing and redundancy evaluating.

Sorry, I still don't see why you need to keep only local
changes. This commit does not change GC behaviour - it breaks
the GC.

Also, I asked you to write a test showing how non-local changes
are removed even if they have not been relayed to other instances
yet. Since you are planning to return the old behaviour, you
would need a test anyway, showing that it is restored.


* Re: [Tarantool-patches] [tarantool-patches] [PATCH v3 2/4] wal: xrow buffer structure
       [not found] ` <e45d765c21500974a7d6676ea1455a9e617f30c9.1570639218.git.georgy@tarantool.org>
@ 2019-10-21 23:06   ` Vladislav Shpilevoy
  0 siblings, 0 replies; 5+ messages in thread
From: Vladislav Shpilevoy @ 2019-10-21 23:06 UTC (permalink / raw)
  To: tarantool-patches, Georgy Kirichenko, tarantool-patches

Thanks for the patch!

See 20 comments below.

On 09/10/2019 18:45, Georgy Kirichenko wrote:
> Introduce a xrow buffer to store encoded xrows in memory after
> transaction was finished. Wal uses an xrow buffer object in
> order to encode transactions and then writes encoded data
> to a log file so encoded data still lives in memory for
> some time after a transaction is finished and cleared by an engine.
> Xrow buffer consist of not more than XROW_BUF_CHUNK_COUNT rotating
> chunks organized in a ring. Rotation thresholds and
> XROW_BUF_CHUNK_COUNT are empiric values and were hardcoded.
> 
> Part of #3794
> ---
>  src/box/CMakeLists.txt |   1 +
>  src/box/wal.c          |  47 ++++++-
>  src/box/xlog.c         |  57 ++++++---
>  src/box/xlog.h         |  16 ++-
>  src/box/xrow_buf.c     | 285 +++++++++++++++++++++++++++++++++++++++++
>  src/box/xrow_buf.h     | 160 +++++++++++++++++++++++
>  6 files changed, 540 insertions(+), 26 deletions(-)
>  create mode 100644 src/box/xrow_buf.c
>  create mode 100644 src/box/xrow_buf.h
> 
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 9219d6779..63e0c728d 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -156,6 +157,14 @@ struct wal_writer
>  	 * Used for replication relays.
>  	 */
>  	struct rlist watchers;
> +	/**
> +	 * In-memory WAl write buffer used to encode transaction rows and

1. Is 'l' a typo? Probably you meant 'L', a capital letter?

> +	 * write them to an xlog file. An in-memory buffer allows us to
> +	 * preserve xrows after transaction processing was finished.
> +	 * This buffer will be used by replication to fetch xrows from memory
> +	 * without xlog files access.
> +	 */
> +	struct xrow_buf xrow_buf;
>  };
>  
>  struct wal_msg {
> @@ -933,6 +942,7 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>  	int64_t tsn = 0;
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
> +		(*row)->tm = ev_now(loop());

2. Why do you need it? You said that your patch is a pure
refactoring about keeping xlogs in memory for a while, but
this looks like a functional change.

>  		if ((*row)->replica_id == 0) {
>  			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
>  				      vclock_get(base, instance_id);
> @@ -1016,25 +1026,52 @@ wal_write_to_disk(struct cmsg *msg)
>  	int rc;
>  	struct journal_entry *entry;
>  	struct stailq_entry *last_committed = NULL;
> +	/* Start a wal memory buffer transaction. */
> +	xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock);
>  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
>  		wal_assign_lsn(&vclock_diff, &writer->vclock,
>  			       entry->rows, entry->rows + entry->n_rows);
>  		entry->res = vclock_sum(&vclock_diff) +
>  			     vclock_sum(&writer->vclock);
> -		rc = xlog_write_entry(l, entry);
> -		if (rc < 0)
> +		struct iovec *iov;
> +		int iovcnt = xrow_buf_write(&writer->xrow_buf, entry->rows,
> +					    entry->rows + entry->n_rows, &iov);
> +		if (iovcnt < 0) {
> +			xrow_buf_tx_rollback(&writer->xrow_buf);
> +			goto done;
> +		}
> +		xlog_tx_begin(l);
> +		if (xlog_write_iov(l, iov, iovcnt, entry->n_rows) < 0) {
> +			xlog_tx_rollback(l);
> +			xrow_buf_tx_rollback(&writer->xrow_buf);
> +			goto done;
> +		}
> +		rc = xlog_tx_commit(l);

3. Sorry, Kostja is right (in not understanding how these
transactions are related). These simultaneous xlog/xbuf
begin/commit calls are really confusing. Previously xlog
transactions were encapsulated inside xlog_write_entry. Could
you please do something similar here to hide them?

For example, add a function xlog_write_iov_tx(), which would
internally call xlog_tx_begin()/rollback(). It would really
help.
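
To illustrate the shape I have in mind, here is a minimal
sketch. It is not based on the real struct xlog: struct toy_xlog
and all toy_* functions are made-up stand-ins, and the toy commit
always flushes, while the real xlog_tx_commit() may return 0 when
the write is only buffered. The point is only that the caller
sees a single call and a single return code.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/uio.h>

/* Toy stand-in for struct xlog (hypothetical, not the real one). */
struct toy_xlog {
	/* Non-zero while a transaction is open. */
	int in_tx;
	/* Test knob: make the next write fail. */
	int fail_write;
	/* Bytes buffered by the open transaction. */
	ssize_t buffered;
	/* Bytes committed so far. */
	ssize_t flushed;
};

static void
toy_xlog_tx_begin(struct toy_xlog *l)
{
	assert(!l->in_tx);
	l->in_tx = 1;
	l->buffered = 0;
}

static void
toy_xlog_tx_rollback(struct toy_xlog *l)
{
	l->in_tx = 0;
	l->buffered = 0;
}

/* Unlike the real xlog, the toy commit always flushes. */
static ssize_t
toy_xlog_tx_commit(struct toy_xlog *l)
{
	ssize_t rc = l->buffered;
	l->flushed += rc;
	l->in_tx = 0;
	l->buffered = 0;
	return rc;
}

static ssize_t
toy_xlog_write_iov(struct toy_xlog *l, const struct iovec *iov, int iovcnt)
{
	if (l->fail_write)
		return -1;
	ssize_t total = 0;
	for (int i = 0; i < iovcnt; i++)
		total += (ssize_t)iov[i].iov_len;
	l->buffered += total;
	return total;
}

/**
 * Hypothetical wrapper: begin a transaction, write the iovec,
 * then commit, or roll back if the write failed.
 *
 * @retval  -1 error, the transaction is rolled back.
 * @retval >=0 the number of bytes committed.
 */
static ssize_t
toy_xlog_write_iov_tx(struct toy_xlog *l, const struct iovec *iov, int iovcnt)
{
	toy_xlog_tx_begin(l);
	if (toy_xlog_write_iov(l, iov, iovcnt) < 0) {
		toy_xlog_tx_rollback(l);
		return -1;
	}
	return toy_xlog_tx_commit(l);
}
```

With something like this, wal_write_to_disk() would only need to
pair one xlog call with the xrow_buf rollback/commit.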

> +		if (rc < 0) {
> +			/* Failed write. */
> +			xrow_buf_tx_rollback(&writer->xrow_buf);
>  			goto done;
> +		}
>  		if (rc > 0) {
> +			/*
> +			 * Data flushed to disk, start a new memory
> +			 * buffer transaction
> +			 */
>  			writer->checkpoint_wal_size += rc;
>  			last_committed = &entry->fifo;
>  			vclock_merge(&writer->vclock, &vclock_diff);
> +			xrow_buf_tx_commit(&writer->xrow_buf);
> +			xrow_buf_tx_begin(&writer->xrow_buf, &writer->vclock);
>  		}
>  		/* rc == 0: the write is buffered in xlog_tx */
>  	}
> diff --git a/src/box/xlog.c b/src/box/xlog.c
> index 8254cce20..2fcf7f0df 100644
> --- a/src/box/xlog.c
> +++ b/src/box/xlog.c
> @@ -1275,14 +1275,8 @@ xlog_tx_write(struct xlog *log)
>  	return written;
>  }
>  
> -/*
> - * Add a row to a log and possibly flush the log.
> - *
> - * @retval  -1 error, check diag.
> - * @retval >=0 the number of bytes written to buffer.
> - */
> -ssize_t
> -xlog_write_row(struct xlog *log, const struct xrow_header *packet)
> +static int
> +xlog_write_prepare(struct xlog *log)

4. Why do you need this as a separate function? It looks
like an artifact from the previous version, when it was used
in two places. Now you can inline it back into xlog_write_iov().

>  {
>  	/*
>  	 * Automatically reserve space for a fixheader when adding
> @@ -1296,17 +1290,20 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
>  			return -1;
>  		}
>  	}
> +	return 0;
> +}
>  
> -	struct obuf_svp svp = obuf_create_svp(&log->obuf);
> -	size_t page_offset = obuf_size(&log->obuf);
> -	/** encode row into iovec */
> -	struct iovec iov[XROW_IOVMAX];
> -	/** don't write sync to the disk */
> -	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
> -	if (iovcnt < 0) {
> -		obuf_rollback_to_svp(&log->obuf, &svp);
> +/*

5. Please use /** for comments outside function bodies.
We use the doxygen comment format, and this is its syntax.
Here and in all other places, and in other commits.

> + * Write an xrow containing iovec to a xlog.
> + */
> +ssize_t
> +xlog_write_iov(struct xlog *log, struct iovec *iov, int iovcnt, int row_count)
> +{
> @@ -1325,16 +1322,34 @@ xlog_write_row(struct xlog *log, const struct xrow_header *packet)
> +/*
> + * Add a row to a log and possibly flush the log.
> + *
> + * @retval  -1 error, check diag.
> + * @retval >=0 the number of bytes written to buffer.
> + */
> +ssize_t
> +xlog_write_row(struct xlog *log, const struct xrow_header *packet)
> +{
> +	/** encode row into an iovec */
> +	struct iovec iov[XROW_IOVMAX];
> +	/** don't write sync to the disk */

6. I understand that this is copy-pasted from the
old code, but please start sentences with a capital
letter, and use the /* comment starter inside function
bodies. Here and in all other places, and in other commits.

> +	int iovcnt = xrow_header_encode(packet, 0, iov, 0);
> +	if (iovcnt < 0)
> +		return -1;
> +	assert(iovcnt <= XROW_IOVMAX);
> +	return xlog_write_iov(log, iov, iovcnt, 1);
>  }
>  
>  /**
> diff --git a/src/box/xlog.h b/src/box/xlog.h
> index a48b05fc4..aa3c14d84 100644
> --- a/src/box/xlog.h
> +++ b/src/box/xlog.h
> @@ -493,7 +493,7 @@ ssize_t
>  xlog_fallocate(struct xlog *log, size_t size);
>  
>  /**
> - * Write a row to xlog, 
> + * Write a row into a xlog,

7. Stray change. Please, discard.

>   *
>   * @retval count of writen bytes
>   * @retval -1 for error
> diff --git a/src/box/xrow_buf.c b/src/box/xrow_buf.c
> new file mode 100644
> index 000000000..e4455e01a
> --- /dev/null
> +++ b/src/box/xrow_buf.c
> @@ -0,0 +1,285 @@
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +
> +#include "xrow_buf.h"
> +#include "fiber.h"
> +#include "errinj.h"
> +
> +/* Xrow buffer chunk options (empirical values). */
> +enum {
> +	/* Chunk row info array capacity increment */
> +	XROW_BUF_CHUNK_CAPACITY_INCREMENT = 16384,
> +	/* Initial size for raw data storage. */
> +	XROW_BUF_CHUNK_INITIAL_DATA_SIZE = 65536,
> +	/* How many rows we will place in one buffer. */
> +	XROW_BUF_CHUNK_ROW_COUNT_THRESHOLD = 8192,
> +	/* How many data we will place in one buffer. */
> +	XROW_BUF_CHUNK_DATA_SIZE_THRESHOLD = 1 << 19,
> +};
> +
> +
> +/*
> + * Save the current xrow buffer chunk state wich consists of two

8. wich -> which.

> + * values index and position where the next row header and raw data
> + * would be placed. This state is used to track the next
> + * transaction starting boundary.
> + */
> +static inline void
> +xrow_buf_save_state(struct xrow_buf *xrow_buf)
> +{
> +	struct xrow_buf_chunk *chunk = xrow_buf->chunk +
> +		xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT;
> +	/* Save the current xrow buffer state. */
> +	xrow_buf->tx_first_row_index = chunk->row_count;
> +	xrow_buf->tx_first_row_svp = obuf_create_svp(&chunk->data);
> +}
> +
> +void
> +xrow_buf_destroy(struct xrow_buf *xrow_buf)
> +{
> +	for (int i = 0; i < XROW_BUF_CHUNK_COUNT; ++i) {
> +		if (xrow_buf->chunk[i].row_info_capacity > 0)
> +			slab_put(&cord()->slabc,
> +				 slab_from_data(xrow_buf->chunk[i].row_info));

9. Please, use {} for code blocks consisting of more than
1 line.

> +		obuf_destroy(&xrow_buf->chunk[i].data);
> +	}
> +}
> +
> +void
> +xrow_buf_tx_begin(struct xrow_buf *xrow_buf, const struct vclock *vclock)
> +{
> +	/*
> +	 * Xrow buffer fits a transaction in one chunk and does not

10. A verb is missing. 'Does not do chunk rotation', or
'Does not rotate a chunk'.

> +	 * chunk rotation while transaction is in progress. So check
> +	 * current chunk thresholds and rotate if required.
> +	 */
> +	struct xrow_buf_chunk *chunk = xrow_buf_rotate(xrow_buf);
> +	/*
> +	 * Check if the current chunk is empty and a vclock for
> +	 * the chunk should be set.
> +	 */
> +	if (chunk->row_count == 0)
> +		vclock_copy(&chunk->vclock, vclock);
> +}
> +
> +int
> +xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
> +	       struct xrow_header **end,
> +	       struct iovec **iovec)
> +{
> +	struct xrow_buf_chunk *chunk = xrow_buf->chunk +
> +		xrow_buf->last_chunk_index % XROW_BUF_CHUNK_COUNT;
> +
> +	/* Save a data buffer svp to restore the buffer in case of an error. */
> +	struct obuf_svp data_svp = obuf_create_svp(&chunk->data);
> +
> +	size_t row_count = chunk->row_count + (end - begin);
> +	/* Allocate space for new row information members if required. */
> +	if (row_count > chunk->row_info_capacity) {
> +		/* Round allocation up to XROW_BUF_CHUNK_CAPACITY_INCREMENT. */
> +		uint32_t capacity = XROW_BUF_CHUNK_CAPACITY_INCREMENT *
> +				    ((row_count +
> +				      XROW_BUF_CHUNK_CAPACITY_INCREMENT - 1) /
> +				     XROW_BUF_CHUNK_CAPACITY_INCREMENT);
> +
> +		struct slab *row_info_slab =
> +			slab_get(&cord()->slabc,
> +				 sizeof(struct xrow_buf_row_info) * capacity);
> +		if (row_info_slab == NULL) {
> +			diag_set(OutOfMemory, capacity *
> +					      sizeof(struct xrow_buf_row_info),
> +				 "region", "row info array");
> +			goto error;
> +		}
> +		if (chunk->row_info_capacity > 0) {
> +			memcpy(slab_data(row_info_slab), chunk->row_info,
> +			       sizeof(struct xrow_buf_row_info) *
> +			       chunk->row_count);
> +			slab_put(&cord()->slabc,
> +				 slab_from_data(chunk->row_info));

11. Oh God, what is that? It is even more intricate and
complex than the previous version with ibuf. Again - why
not use a normal struct xrow_buf_row_info * array on the
plain heap? Have you done benchmarks showing that this
slab juggling gives any perf increase?
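
A rough sketch of what I mean by a plain heap array, assuming
realloc() is acceptable here. The toy_* types and names are
invented for the example and only mirror the fields the growth
logic needs. The rounding to a capacity increment is taken from
your code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Same value as XROW_BUF_CHUNK_CAPACITY_INCREMENT in the patch. */
enum { TOY_CAPACITY_INCREMENT = 16384 };

/* Toy stand-in for struct xrow_buf_row_info (hypothetical). */
struct toy_row_info {
	void *data;
	size_t size;
};

/* Toy stand-in for the row info part of struct xrow_buf_chunk. */
struct toy_chunk {
	struct toy_row_info *row_info;
	size_t row_count;
	size_t row_info_capacity;
};

/**
 * Make sure the chunk can hold row_count entries. The capacity
 * is rounded up to a multiple of TOY_CAPACITY_INCREMENT, and
 * realloc() preserves the already stored entries.
 *
 * @retval  0 success.
 * @retval -1 out of memory, the chunk is left unchanged.
 */
static int
toy_chunk_reserve(struct toy_chunk *chunk, size_t row_count)
{
	if (row_count <= chunk->row_info_capacity)
		return 0;
	size_t capacity = TOY_CAPACITY_INCREMENT *
		((row_count + TOY_CAPACITY_INCREMENT - 1) /
		 TOY_CAPACITY_INCREMENT);
	struct toy_row_info *row_info =
		realloc(chunk->row_info, capacity * sizeof(*row_info));
	if (row_info == NULL)
		return -1;
	chunk->row_info = row_info;
	chunk->row_info_capacity = capacity;
	return 0;
}

/** Free the array and reset the chunk to the empty state. */
static void
toy_chunk_destroy(struct toy_chunk *chunk)
{
	free(chunk->row_info);
	chunk->row_info = NULL;
	chunk->row_count = 0;
	chunk->row_info_capacity = 0;
}
```

No explicit memcpy, no slab_from_data(), and the destroy path
does not need a capacity check, because free(NULL) is a no-op.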

> +		}
> +		chunk->row_info = slab_data(row_info_slab);
> +		chunk->row_info_capacity = capacity;
> +	}
> diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h
> new file mode 100644
> index 000000000..266cc0f76
> --- /dev/null
> +++ b/src/box/xrow_buf.h
> @@ -0,0 +1,160 @@
> +#ifndef TARANTOOL_XROW_BUF_H_INCLUDED
> +#define TARANTOOL_XROW_BUF_H_INCLUDED
> +/*
> + * Copyright 2010-2019, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include <stdint.h>
> +
> +#include "small/obuf.h"
> +#include "small/rlist.h"

12. Why do you need rlist.h?

> +#include "xrow.h"
> +#include "vclock.h"
> +
> +enum {
> +	/*8

13. 8 -> *.

> +	 * Xrow buffer contains some count of rotating data chunks.
> +	 * Every rotation has an estimated decrease in amount of
> +	 * stored rows at 1/(COUNT OF CHUNKS). However the bigger
> +	 * value makes rotation more frequent, the decrease would be
> +	 * smoother and size of a xrow buffer more stable.
> +	 */
> +	XROW_BUF_CHUNK_COUNT = 16,
> +};
> +
> +/**
> + * Xrow_info structure used to describe a row stored in a xrow
> + * buffer. Xrow info contains an xrow_header structure, pointer
> + * and size of the row_header encoded representation. Xrow header
> + * allows to filter rows by replica_id, lsn or replication group
> + * while encoded representation could be used to write xrow 

14. Trailing whitespace at the end of the line above.

> + * without any further encoding.
> + */
> +struct xrow_buf_row_info {
> +	/** Stored row header. */
> +	struct xrow_header xrow;
> +	/** Pointer to row encoded raw data. */
> +	void *data;
> +	/** Row encoded raw data size. */
> +	size_t size;
> +};
> +
> +/**
> + * Xrow buffer data chunk structure is used to store a continuous
> + * sequence of xrow headers written to a xrow buffer. Xrow buffer data
> + * chunk contains a vclock of the last row just before the first row
> + * stored in the chunk, count of rows, its encoded raw data, and array of

15. vclock of the last row just before the first row? Sorry,
I didn't understand. Could you please rephrase?

> + * stored row info. Vclock is used to track stored vclock lower boundary.
> + */
> +struct xrow_buf_chunk {
> +	/** Vclock just before the first row in this chunk. */
> +	struct vclock vclock;
> +	/** Count of stored rows. */
> +	size_t row_count;
> +	/** Stored rows information array. */
> +	struct xrow_buf_row_info *row_info;
> +	/** Capacity of stored rows information array. */
> +	size_t row_info_capacity;
> +	/** Data storage for encoded rows data. */
> +	struct obuf data;
> +};
> +
> +/**
> + * Xrow buffer enables to encode and store some continuous sequence
> + * of xrows (both headers and binary encoded representation).
> + * Storage organized as a range of globally indexed chunks. New rows
> + * are appended to the last one chunk (the youngest one). If the last
> + * chunk is already full then a new chunk will be used. Xrow_buffer
> + * maintains not more than XROW_BUF_CHUNK_COUNT chunks so when the buffer
> + * is already full then a first one chunk should be discarded before a
> + * new one could be used. All chunks are organized in a ring which is
> + * XROW_BUF_CHUNK_COUNT the size so a chunk in-ring index could be
> + * evaluated from the chunk global index with the modulo operation.
> + */
> +struct xrow_buf {
> +	/** Global index of the first used chunk (the oldest one). */
> +	size_t first_chunk_index;
> +	/** Global index of the last used chunk (the youngest one). */
> +	size_t last_chunk_index;
> +	/** Ring -array containing chunks . */

16. -array? Something is missing.

> +	struct xrow_buf_chunk chunk[XROW_BUF_CHUNK_COUNT];
> +	/**
> +	 * A xrow_buf transaction is recorded in one chunk only.
> +	 * When transaction is started current row count and data
> +	 * buffer svp from the current chunk (which is the last one)
> +	 * are remembered in order to be able to restore the chunk
> +	 * state in case of rollback.
> +	 */
> +	struct {

17. Why do you need these two members inside an anon struct?

> +		/** The current transaction first row index. */
> +		uint32_t tx_first_row_index;

18. I would rather call it saved_row_count and saved_obuf. You
never use `tx_first_row_index` as an index of anything. You
always just assign it from row_count, and restore row_count
from it back. In the next commits too. These two attributes
are just a savepoint, and nothing more.

I mean, take obuf_svp for example. It has 3 simple members,
which are named the same as the original members of obuf:
pos, iov_len, used. They are not named 'obuf_first_alloc_pos',
'obuf_first_iov_len', 'obuf_first_alloc_pos_used'.

The simpler the name, the easier it is to understand what it
stands for. For the savepoint just take the original names
as is. Maybe with a simple prefix like 'saved_'.
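
Something along these lines, as a toy illustration only. The
struct and function names are made up, and the obuf savepoint is
modelled by a single size_t instead of a real struct obuf_svp.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model of the current chunk state (hypothetical). */
struct toy_chunk_state {
	/* Count of stored rows. */
	size_t row_count;
	/* Bytes used in the data buffer. */
	size_t data_used;
};

/* Toy model of xrow_buf with a flat, simply named savepoint. */
struct toy_xrow_buf {
	struct toy_chunk_state chunk;
	/* Chunk row count saved at transaction begin. */
	size_t saved_row_count;
	/* Data buffer usage saved at transaction begin. */
	size_t saved_data_used;
};

/** Remember the current chunk state as the savepoint. */
static void
toy_tx_begin(struct toy_xrow_buf *buf)
{
	buf->saved_row_count = buf->chunk.row_count;
	buf->saved_data_used = buf->chunk.data_used;
}

/** Account for rows appended inside the transaction. */
static void
toy_write(struct toy_xrow_buf *buf, size_t rows, size_t bytes)
{
	buf->chunk.row_count += rows;
	buf->chunk.data_used += bytes;
}

/** Restore the chunk state from the savepoint. */
static void
toy_tx_rollback(struct toy_xrow_buf *buf)
{
	buf->chunk.row_count = buf->saved_row_count;
	buf->chunk.data_used = buf->saved_data_used;
}
```

The saved_* names say directly which member each of them
restores, and there is no 'index' that is never used as one.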

> +		/** The current transaction encoded data start svp. */
> +		struct obuf_svp tx_first_row_svp;
> +	};
> +};
> +
> +/** Create a wal memory. */
> +void
> +xrow_buf_create(struct xrow_buf *xrow_buf);
> +
> +/** Destroy wal memory structure. */
> +void
> +xrow_buf_destroy(struct xrow_buf *xrow_buf);
> +
> +/**
> + * Begin a xrow buffer transaction. This function may rotate the
> + * last one data chunk and use the vclock parameter as a new chunk
> + * starting vclock.
> + */
> +void
> +xrow_buf_tx_begin(struct xrow_buf *xrow_buf, const struct vclock *vclock);
> +
> +/** Discard all the data was written after the last transaction. */

19. 'the data written', without 'was'.

> +void
> +xrow_buf_tx_rollback(struct xrow_buf *xrow_buf);
> +
> +/** Commit a xrow buffer transaction. */
> +void
> +xrow_buf_tx_commit(struct xrow_buf *xrow_buf);
> +
> +/**
> + * Append an xrow array to a wal memory. The array is placed into
> + * one xrow buffer data chunk and each row takes a continuous
> + * space in a data buffer. Raw encoded data is placed onto
> + * gc-allocated iovec array.
> + *
> + * @retval count of written iovec members for success
> + * @retval -1 in case of error
> + */
> +int
> +xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
> +	       struct xrow_header **end,
> +	       struct iovec **iovec);

20. `iovec` and `end` arguments fit in one line.

> +
> +#endif /* TARANTOOL_XROW_BUF_H_INCLUDED */
> 


* Re: [Tarantool-patches] [tarantool-patches] [PATCH v3 3/4] wal: xrow buffer cursor
       [not found] ` <8f3fc4b1a2b9c2d071225e11a863d2facd3d035e.1570639218.git.georgy@tarantool.org>
@ 2019-10-21 23:06   ` Vladislav Shpilevoy
  0 siblings, 0 replies; 5+ messages in thread
From: Vladislav Shpilevoy @ 2019-10-21 23:06 UTC (permalink / raw)
  To: tarantool-patches, Georgy Kirichenko, tarantool-patches

Thanks for the patch!

See 5 comments below.

On 09/10/2019 18:45, Georgy Kirichenko wrote:
> This structure enables to find a xrow buffer row less than given vclock
> and then fetch row by row from the xrow forwards to the last appended one.
> A xrow buffer cursor is essential to allow the from memory replication
> because of a relay which required to be able to fetch all logged rows,
> stored in a wal memory (implemented as xrow buffer), from given position
> and then follow all new changes.
> 
> Part of #3794
> ---
>  src/box/xrow_buf.c | 97 ++++++++++++++++++++++++++++++++++++++++++++++
>  src/box/xrow_buf.h | 38 ++++++++++++++++++
>  2 files changed, 135 insertions(+)
> 
> diff --git a/src/box/xrow_buf.c b/src/box/xrow_buf.c
> index e4455e01a..a5bf1efed 100644
> --- a/src/box/xrow_buf.c
> +++ b/src/box/xrow_buf.c
> @@ -283,3 +283,100 @@ error:
>  	return -1;
>  }
>  
> +/*
> + * Returns an index of the first row after given vclock
> + * in a chunk.
> + */
> +static int
> +xrow_buf_chunk_locate_vclock(struct xrow_buf_chunk *chunk,
> +			     struct vclock *vclock)
> +{
> +	for (uint32_t row_index = 0; row_index < chunk->row_count;
> +	     ++row_index) {
> +		struct xrow_header *row = &chunk->row_info[row_index].xrow;
> +		if (vclock_get(vclock, row->replica_id) < row->lsn)
> +			return row_index;

1. Is it possible to use binary search by lsn here?
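
For reference, a sketch of what I mean. It relies on a strong
assumption that I am not sure holds for a chunk: all rows come
from one replica_id and their lsns strictly grow. If rows of
several replicas are interleaved in one chunk, the lsns are not
globally ordered and this does not apply as is. struct toy_row
and toy_locate_lsn() are invented names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Toy stand-in for the fields of xrow_header used here. */
struct toy_row {
	uint32_t replica_id;
	int64_t lsn;
};

/**
 * Return the index of the first row whose lsn is greater than
 * the given one, or row_count if there is no such row.
 * Assumes the rows are sorted by lsn (single replica_id).
 */
static size_t
toy_locate_lsn(const struct toy_row *rows, size_t row_count, int64_t lsn)
{
	size_t lo = 0;
	size_t hi = row_count;
	while (lo < hi) {
		size_t mid = lo + (hi - lo) / 2;
		if (rows[mid].lsn <= lsn) {
			/* The answer is to the right of mid. */
			lo = mid + 1;
		} else {
			/* mid is a candidate, look to the left. */
			hi = mid;
		}
	}
	return lo;
}
```

It keeps the contract of your loop (first row with lsn bigger
than the requested one, or row_count), but in O(log N).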

> +	}
> +	/*
> +	 * We did not find any row with vclock not less than
> +	 * given one so return an index just after the last one.
> +	 */
> +	return chunk->row_count;
> +}
> +
> +int
> +xrow_buf_cursor_create(struct xrow_buf *xrow_buf,
> +		       struct xrow_buf_cursor *xrow_buf_cursor,
> +		       struct vclock *vclock)
> +{
> +	/* Check if a buffer has requested data. */
> +	struct xrow_buf_chunk *chunk =
> +			xrow_buf->chunk + xrow_buf->first_chunk_index %
> +					  XROW_BUF_CHUNK_COUNT;
> +	int rc = vclock_compare(&chunk->vclock, vclock);
> +	if (rc >= 0 || rc == VCLOCK_ORDER_UNDEFINED) {
> +		/* The requested data was discarded. */
> +		return -1;
> +	}
> +	uint32_t index = xrow_buf->first_chunk_index;
> +	while (index < xrow_buf->last_chunk_index) {
> +		chunk = xrow_buf->chunk + (index + 1) % XROW_BUF_CHUNK_COUNT;
> +		int rc = vclock_compare(&chunk->vclock, vclock);
> +		if (rc > 0 || rc == VCLOCK_ORDER_UNDEFINED) {
> +			/* Next chunk has younger rows than requested vclock. */
> +			break;
> +		}
> +		++index;
> +	}
> +	chunk = xrow_buf->chunk + (index) % XROW_BUF_CHUNK_COUNT;
> +	xrow_buf_cursor->chunk_index = index;
> +	xrow_buf_cursor->row_index = xrow_buf_chunk_locate_vclock(chunk, vclock);
> +	return 0;
> +}
> +
> +int
> +xrow_buf_cursor_next(struct xrow_buf *xrow_buf,
> +		     struct xrow_buf_cursor *xrow_buf_cursor,
> +		     struct xrow_header **row, void **data, size_t *size)
> +{
> +	if (xrow_buf->first_chunk_index > xrow_buf_cursor->chunk_index) {
> +		/* A cursor current chunk was discarded by a buffer. */
> +		return -1;
> +	}
> +
> +	struct xrow_buf_chunk *chunk;
> +next_chunk:
> +	chunk = xrow_buf->chunk + xrow_buf_cursor->chunk_index %
> +				  XROW_BUF_CHUNK_COUNT;
> +	size_t chunk_row_count = chunk->row_count;

2. Why do you need this variable? It is used in one place
in the next line.

> +	if (chunk_row_count == xrow_buf_cursor->row_index) {
> +		/*
> +		 * No more rows in a buffer but there are two
> +		 * possibilities:
> +		 *  1. A cursor current chunk is the last one and there is
> +		 * no more rows in the cursor.
> +		 *  2. There is a chunk after the current one
> +		 * so we can switch to it.
> +		 * */
> +		if (xrow_buf->last_chunk_index ==
> +		    xrow_buf_cursor->chunk_index) {
> +			/*
> +			 * The current chunk is the last one -
> +			 * no more rows in a buffer.
> +			 */
> +			return 1;
> +		}
> +		/* Switch to the next chunk. */
> +		xrow_buf_cursor->row_index = 0;
> +		++xrow_buf_cursor->chunk_index;
> +		goto next_chunk;
> +	}
> +	/* Return row header and data pointers and data size. */
> +	struct xrow_buf_row_info *row_info = chunk->row_info +
> +					     xrow_buf_cursor->row_index;
> +	*row = &row_info->xrow;
> +	*data = row_info->data;
> +	*size = row_info->size;
> +	++xrow_buf_cursor->row_index;
> +	return 0;
> +}
> diff --git a/src/box/xrow_buf.h b/src/box/xrow_buf.h
> index 266cc0f76..c2de2b5a5 100644
> --- a/src/box/xrow_buf.h
> +++ b/src/box/xrow_buf.h
> @@ -157,4 +157,42 @@ xrow_buf_write(struct xrow_buf *xrow_buf, struct xrow_header **begin,
>  	       struct xrow_header **end,
>  	       struct iovec **iovec);
>  
> +/**
> + * Xrow buffer cursor used to search a position in a buffer
> + * and then fetch rows one by one from the postion toward the
> + * buffer last append row.
> + */
> +struct xrow_buf_cursor {
> +	/** Current chunk global index. */
> +	uint32_t chunk_index;
> +	/** Row index in the current chunk. */
> +	uint32_t row_index;
> +};

3. Have you considered storing the xrow_buf pointer inside the
cursor? Usually iterators contain a reference to the source of
their data. In your case the API allows creating a cursor
for one xrow_buf, but iterating over another.
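
A toy sketch of such an API, with invented names and a flat int
array instead of chunks. The return codes of next() follow your
convention: 0 for a fetched row, 1 when there are no more rows.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-in for struct xrow_buf: a flat array of rows. */
struct toy_buf {
	const int *rows;
	size_t row_count;
};

/* Cursor bound to its source buffer at creation time. */
struct toy_cursor {
	/* The buffer this cursor iterates over. */
	const struct toy_buf *buf;
	/* Index of the next row to return. */
	size_t row_index;
};

/** Create a cursor over buf starting from the given row index. */
static void
toy_cursor_create(struct toy_cursor *cursor, const struct toy_buf *buf,
		  size_t row_index)
{
	cursor->buf = buf;
	cursor->row_index = row_index;
}

/**
 * Fetch the next row. The buffer is taken from the cursor, so
 * the caller can not pass a wrong one.
 *
 * @retval 0 a row was fetched.
 * @retval 1 there are no more rows in the buffer.
 */
static int
toy_cursor_next(struct toy_cursor *cursor, int *row)
{
	if (cursor->row_index == cursor->buf->row_count)
		return 1;
	*row = cursor->buf->rows[cursor->row_index++];
	return 0;
}
```

The next() signature becomes shorter, and mixing up two buffers
is no longer possible at the call site.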

> +
> +/**
> + * Create a xrow buffer cursor and set it's position to
> + * the first row after passed vclock value.
> + *
> + * @retval 0 cursor was created
> + * @retval -1 if a vclock was discarded
> + */
> +int
> +xrow_buf_cursor_create(struct xrow_buf *xrow_buf,
> +		       struct xrow_buf_cursor *xrow_buf_cursor,
> +		       struct vclock *vclock);

4. It looks like the vclock pointer can be declared const.
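
For example, a read-only comparison needs nothing more than const
pointers. The sketch below uses a hypothetical sketch_vclock type,
not the real struct vclock:

```c
#include <stdint.h>

enum { SKETCH_VCLOCK_MAX = 4 };

/* Hypothetical simplified vclock: one LSN per replica id. */
struct sketch_vclock {
	int64_t lsn[SKETCH_VCLOCK_MAX];
};

/*
 * Return 1 if every component of 'a' is <= the matching
 * component of 'b', 0 otherwise. Neither argument is modified,
 * so both are const and callers may pass read-only objects.
 */
static int
sketch_vclock_le(const struct sketch_vclock *a,
		 const struct sketch_vclock *b)
{
	for (int i = 0; i < SKETCH_VCLOCK_MAX; i++) {
		if (a->lsn[i] > b->lsn[i])
			return 0;
	}
	return 1;
}
```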

> +
> +/**
> + * Fetch next row from a xrow buffer cursor and return the row
> + * header and encoded data pointers as well as encoded data size
> + * in the corresponding parameters.
> + *
> + * @retval 0 in case of success
> + * @retval 1 if there is no more rows in a buffer
> + * @retval -1 if this cursor postion was discarded by xrow buffer

5. 'postion' -> 'position'.

> + */
> +int
> +xrow_buf_cursor_next(struct xrow_buf *xrow_buf,
> +		     struct xrow_buf_cursor *xrow_buf_cursor,
> +		     struct xrow_header **row, void **data, size_t *size);
> +
>  #endif /* TARANTOOL_XROW_BUF_H_INCLUDED */
> 


* Re: [Tarantool-patches] [tarantool-patches] [PATCH v3 4/4] replication: use wal memory buffer to fetch rows
       [not found] ` <0b5ca0bc9f891c3c5363a0e3275c662f93a813de.1570639218.git.georgy@tarantool.org>
@ 2019-10-21 23:07   ` Vladislav Shpilevoy
  0 siblings, 0 replies; 5+ messages in thread
From: Vladislav Shpilevoy @ 2019-10-21 23:07 UTC (permalink / raw)
  To: tarantool-patches, Georgy Kirichenko, tarantool-patches

Thanks for the patch!

On 09/10/2019 18:45, Georgy Kirichenko wrote:
> Fetch data from wal in-memory buffer. Wal allows to start a fiber
> which creates a xrow buffer cursor with given vclock and then fetches
> row from the xrow buffer one by one and calls given callback for each
> row. Also the wal relaying fiber send a heartbeat message if all
> rows were processed there were no rows written for replication timeout
> period.

Sorry, I didn't understand the last sentence. Did you mean
'if all rows were processed *and* no rows were written for the
replication timeout period'?

See 2 comments below.

> Relay connects to wal with a replica known vclock and tries to
> relay data. In case of outdated vclock (wal could not create a cursor
> or fetch new row from the cursor) the relay makes a fallback in
> order to read logged data from file and then makes another try
> to connect to wal with updated vclock and so waiter.
> In file mode a relay already has a data to send to a replica so from
> not the relay  has not any duty to send heartbeat messages - it

1. 'so from not the relay has not' - I didn't understand it. Could
you please rephrase?

> is done by wal relay fiber while it waits for new transactions
> written by wal.
> 
> Closes #3794
> ---
>  src/box/relay.cc                              | 180 +++++++++---------
>  src/box/wal.c                                 | 158 +++++++++++++++
>  src/box/wal.h                                 |  60 ++++++
>  src/lib/core/cbus.c                           |   4 +
>  src/lib/core/errinj.h                         |   1 +
>  test/box/errinj.result                        |   2 +
>  test/replication/force_recovery.result        |   8 +
>  test/replication/force_recovery.test.lua      |   2 +
>  test/replication/replica_rejoin.result        |   8 +
>  test/replication/replica_rejoin.test.lua      |   2 +
>  .../show_error_on_disconnect.result           |   8 +
>  .../show_error_on_disconnect.test.lua         |   2 +
>  test/replication/suite.ini                    |   2 +-
>  test/xlog/panic_on_wal_error.result           |  12 ++
>  test/xlog/panic_on_wal_error.test.lua         |   3 +
>  test/xlog/suite.ini                           |   2 +-
>  16 files changed, 365 insertions(+), 89 deletions(-)
> 
> diff --git a/test/replication/force_recovery.result b/test/replication/force_recovery.result
> index f50452858..e48c12657 100644
> --- a/test/replication/force_recovery.result
> +++ b/test/replication/force_recovery.result
> @@ -16,6 +16,10 @@ _ = box.space.test:create_index('primary')
>  box.schema.user.grant('guest', 'replication')
>  ---
>  ...
> +box.error.injection.set("ERRINJ_WAL_MEM_IGNORE", true)

2. Unfortunately, error injections can't be set in Release
mode, so all these injection.set calls are no-ops in a
Release build. Please remove them. If you want to use an
error injection, write a separate test file and disable it
for the Release build.
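
To show why, here is a minimal sketch of how an injection point can
be compiled out. All names (SKETCH_RELEASE, SKETCH_ERROR_INJECT,
sketch_use_file_mode) are hypothetical, and what the flag does here
is invented for illustration. These are not the actual macros from
errinj.h:

```c
#include <stdbool.h>

/* Hypothetical injection flag, toggled from a test. */
static bool sketch_errinj_wal_mem_ignore = false;

#ifdef SKETCH_RELEASE
/* Release: the injection point expands to nothing. */
#define SKETCH_ERROR_INJECT(flag, code) do {} while (0)
#else
/* Debug: run 'code' when the flag is set. */
#define SKETCH_ERROR_INJECT(flag, code) \
	do { if (flag) { code; } } while (0)
#endif

/*
 * Return 1 if the injection fired, 0 otherwise. With
 * SKETCH_RELEASE defined the flag has no effect and the
 * function always returns 0.
 */
static int
sketch_use_file_mode(void)
{
	SKETCH_ERROR_INJECT(sketch_errinj_wal_mem_ignore, return 1);
	return 0;
}
```

With an injection point like this, a test that sets the flag
exercises a different code path in Debug and in Release, which is
why such tests belong in a file that is skipped for Release.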

> +---
> +- ok
> +...
>  -- Deploy a replica.
>  test_run:cmd("create server test with rpl_master=default, script='replication/replica.lua'")
>  ---


