[tarantool-patches] [PATCH 1/7] Refactoring: wal writer fiber and queue

Vladimir Davydov vdavydov.dev at gmail.com
Wed Aug 21 13:18:28 MSK 2019


On Tue, Aug 13, 2019 at 09:27:39AM +0300, Georgy Kirichenko wrote:
> As wal processes all writes in a cbus loop fiber, it isn't possible to
> yield while writing. The patch introduces a wal write queue and a wal write
> fiber which fetches a batch from the queue and writes it out. Also, checkpoint
> requests now go through the queue to synchronize the tx checkpoint
> status with wal.
> 
> This patch makes it possible to put all garbage state into one gc object
> living in the tx cord and to ask gc to free space from wal in case of a
> no-space error.
> ---
>  src/box/wal.c | 187 +++++++++++++++++++++++++++++++++++++++-----------
>  1 file changed, 146 insertions(+), 41 deletions(-)
> 
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 58a58e5b5..5d8dcc4f7 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -92,6 +92,10 @@ struct wal_writer
>  	/** A memory pool for messages. */
>  	struct mempool msg_pool;
>  	/* ----------------- wal ------------------- */
> +	/** A write queue. */
> +	struct stailq write_queue;
> +	/** A write queue condition. */
> +	struct fiber_cond write_cond;
>  	/** A setting from instance configuration - rows_per_wal */
>  	int64_t wal_max_rows;
>  	/** A setting from instance configuration - wal_max_size */
> @@ -158,19 +162,40 @@ struct wal_writer
>  	struct rlist watchers;
>  };
>  
> +enum wal_msg_type {
> +	WAL_MSG_WRITE = 0,
> +	WAL_MSG_CHECKPOINT = 1
> +};
> +
>  struct wal_msg {
>  	struct cmsg base;
> -	/** Approximate size of this request when encoded. */
> -	size_t approx_len;
> -	/** Input queue, on output contains all committed requests. */
> -	struct stailq commit;
> -	/**
> -	 * In case of rollback, contains the requests which must
> -	 * be rolled back.
> -	 */
> -	struct stailq rollback;
> -	/** vclock after the batch processed. */
> -	struct vclock vclock;
> +	/** A link to a wal writer queue. */
> +	struct stailq_entry in_queue;
> +	/** Wal messgae type. */
> +	enum wal_msg_type type;
> +	union {
> +		struct {
> +			/** Approximate size of this request when encoded. */
> +			size_t approx_len;
> +			/** Input queue, on output contains all committed requests. */
> +			struct stailq commit;
> +			/**
> +			 * In case of rollback, contains the requests which must
> +			 * be rolled back.
> +			 */
> +			struct stailq rollback;
> +			/** vclock after the batch processed. */
> +			struct vclock vclock;
> +		};
> +		struct {
> +			/** A checkpoint structure. */
> +			struct wal_checkpoint *checkpoint;
> +			/** Fiber issued the batch. */
> +			struct fiber *fiber;
> +			/** return code. */
> +			int *rc;
> +		};
> +	};
>  };

I'd introduce a base message class rather than using a union + enum.
Would look more readable that way IMHO:

	struct wal_msg {
		struct cmsg base;
		void (*process)(struct wal_msg *);
		...
	};

	struct wal_write_msg {
		struct wal_msg base;
		...
	};

	struct wal_checkpoint_msg {
		struct wal_msg base;
		...
	};

However, I must admit that this nesting of messages does look kinda
ugly. We have cbus message callbacks and now we effectively introduce
wal message callbacks (using a function pointer or an enum, doesn't
matter).

What we want to do here is notify TX of ENOSPC so that it can delete
old WAL files. Until old WAL files are deleted, the WAL must stop
processing any requests. Is there a way to achieve that without
introducing a new fiber? I guess we could instead introduce a separate
queue and append all write messages to it until we receive a message
from the TX thread that it's done removing old WAL files.

> @@ -271,18 +301,22 @@ tx_schedule_commit(struct cmsg *msg)
>  {
>  	struct wal_writer *writer = &wal_writer_singleton;
>  	struct wal_msg *batch = (struct wal_msg *) msg;
> -	/*
> -	 * Move the rollback list to the writer first, since
> -	 * wal_msg memory disappears after the first
> -	 * iteration of tx_schedule_queue loop.
> -	 */
> -	if (! stailq_empty(&batch->rollback)) {
> -		/* Closes the input valve. */
> -		stailq_concat(&writer->rollback, &batch->rollback);
> +	if (batch->type == WAL_MSG_WRITE) {
> +		/*
> +		 * Move the rollback list to the writer first, since
> +		 * wal_msg memory disappears after the first
> +		 * iteration of tx_schedule_queue loop.
> +		 */
> +		if (! stailq_empty(&batch->rollback)) {
> +			/* Closes the input valve. */
> +			stailq_concat(&writer->rollback, &batch->rollback);
> +		}
> +		/* Update the tx vclock to the latest written by wal. */
> +		vclock_copy(&replicaset.vclock, &batch->vclock);
> +		tx_schedule_queue(&batch->commit);
> +	} else {
> +		fiber_wakeup(batch->fiber);

Yeah, this branching that depends on the message type looks convoluted.
While we have just two kinds of messages, it's fine, but should we add
another one, the code would turn into a mess. Callbacks would look
better IMHO.

>  	}
> -	/* Update the tx vclock to the latest written by wal. */
> -	vclock_copy(&replicaset.vclock, &batch->vclock);
> -	tx_schedule_queue(&batch->commit);
>  	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
>  }
>  
> @@ -922,10 +975,18 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
>  }
>  
>  static void
> -wal_write_to_disk(struct cmsg *msg)
> +wal_write_to_disk(struct cmsg *base)
>  {
>  	struct wal_writer *writer = &wal_writer_singleton;
> -	struct wal_msg *wal_msg = (struct wal_msg *) msg;
> +	struct wal_msg *wal_msg = container_of(base, struct wal_msg, base);
> +	if (stailq_empty(&writer->write_queue))
> +		fiber_cond_signal(&writer->write_cond);
> +	stailq_add_tail(&writer->write_queue, &wal_msg->in_queue);

The name of this function doesn't reflect what it does anymore.

> +}
> +
> +static void
> +wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
> +{
>  	struct error *error;
>  
>  	/*



More information about the Tarantool-patches mailing list