[patches] Re: [commits] [tarantool] 11/11: replication: introduce transactional replication

Georgy Kirichenko georgy at tarantool.org
Tue Mar 20 16:39:04 MSK 2018


Этот патч решает проблему, но это делается ценой изменения формата xlog, 
репликационного потока и алгоритма накатывания. Я думаю, что таким же объёмом 
изменений можно решить более глобальную задачу - сделать транзакционную 
репликацию, лог с явным управлением транзакциями и сделать наш лог 
по-настоящему write ahead.
Под этим я подразумеваю:
- явное указание для каждой записи в xlog транзакции, к которой она относится, 
и явное указание факта фиксации или отката транзакции. Идентификатор 
транзакции должен быть уникальным только в каждый конкретный момент времени
- для записей можно предусмотреть возможность указать флаг первой и/или 
последней записи в транзакции. Это позволит записывать транзакции из одной 
операции одной строкой в логе
- можно отправлять записи в xlog по факту их записи без ожидания 
подтверждения, потенциально это уменьшит время обработки длинных транзакций
- достигается транзакционность репликации
- для длинных транзакций ускоряется синхронная репликация

On Thursday, October 19, 2017 7:37:36 PM MSK Vladislav Shpilevoy wrote:
> This is an automated email from the git hooks/post-receive script.
> 
> Gerold103 pushed a commit to branch gh-2798-transactional-replication
> in repository tarantool.
> 
> commit eb8de0a305a89688191456aa1ccb30960b9bccb4
> Author: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
> AuthorDate: Thu Oct 19 17:41:47 2017 +0300
> 
>     replication: introduce transactional replication
> 
>     Closes #2798
> ---
>  src/box/applier.cc                    |  89 +++++++++----
>  src/box/applier.h                     |   5 +
>  src/box/box.cc                        |  50 ++++++-
>  src/box/recovery.cc                   |  24 +++-
>  src/box/relay.cc                      |  78 ++++++++++-
>  src/box/xrow.h                        |   1 +
>  src/box/xrow_io.cc                    | 134 ++++++++++++++++++-
>  src/box/xrow_io.h                     |  65 +++++++++
>  test/replication/gc.result            |   4 +-
>  test/replication/gc.test.lua          |   4 +-
>  test/replication/suite.cfg            |   1 +
>  test/replication/transaction.result   | 241
> ++++++++++++++++++++++++++++++++++ test/replication/transaction.test.lua |
> 118 +++++++++++++++++
>  13 files changed, 768 insertions(+), 46 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 7adca70..0c2b7b2 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -361,9 +361,10 @@ applier_subscribe(struct applier *applier)
>  	struct ev_io *coio = &applier->io;
>  	struct iobuf *iobuf = applier->iobuf;
>  	struct xrow_header row;
> +	struct vclock *vclock = &replicaset_vclock;
> 
>  	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
> -				 &replicaset_vclock);
> +				 vclock);
>  	coio_write_xrow(coio, &row);
>  	applier_set_state(applier, APPLIER_FOLLOW);
> 
> @@ -396,40 +397,69 @@ applier_subscribe(struct applier *applier)
> 
>  	/* Re-enable warnings after successful execution of SUBSCRIBE */
>  	applier->last_logged_errcode = 0;
> +	struct xstream *stream = &applier->subscribe_stream;
> +	struct xrow_batch *batch = &applier->batch;
> 
>  	/*
>  	 * Process a stream of rows from the binary log.
>  	 */
>  	while (true) {
> -		coio_read_xrow(coio, &iobuf->in, &row);
> -		applier->lag = ev_now(loop()) - row.tm;
> -		applier->last_row_time = ev_monotonic_now(loop());
> -
> -		if (iproto_type_is_error(row.type))
> -			xrow_decode_error_xc(&row);  /* error */
> -		/* Replication request. */
> -		if (row.replica_id == REPLICA_ID_NIL ||
> -		    row.replica_id >= VCLOCK_MAX) {
> -			/*
> -			 * A safety net, this can only occur
> -			 * if we're fed a strangely broken xlog.
> -			 */
> -			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> -				  int2str(row.replica_id),
> -				  tt_uuid_str(&REPLICASET_UUID));
> -		}
> -		if (vclock_get(&replicaset_vclock, row.replica_id) < row.lsn) {
> -			/**
> -			 * Promote the replica set vclock before
> -			 * applying the row. If there is an
> -			 * exception (conflict) applying the row,
> -			 * the row is skipped when the replication
> -			 * is resumed.
> -			 */
> -			vclock_follow(&replicaset_vclock, row.replica_id,
> -				      row.lsn);
> -			xstream_write_xc(&applier->subscribe_stream, &row);
> +		xrow_batch_reset(batch);
> +		/*
> +		 * Read entire batch at once. Can not read rows
> +		 * between begin() and commit(), because it causes
> +		 * yield and rollback for a memtx transaction.
> +		 */
> +		coio_read_xrow_batch(coio, &iobuf->in, batch);
> +		applier->row_count_to_commit = batch->rows[0].row_count;
> +		xstream_begin_xc(stream);
> +		/*
> +		 * Can not use fiber rollback on stop, becase in
> +		 * a case of some errors the fiber is not stopped.
> +		 */
> +		auto tx_guard = make_scoped_guard([=] {
> +			/* Abort a transaction in any error. */
> +			xstream_rollback_xc(stream);
> +		});
> +		for (int i = 0; i < batch->count; ++i) {
> +			struct xrow_header *row = &batch->rows[i];
> +			uint32_t rep_id = row->replica_id;
> +			applier->lag = ev_now(loop()) - row->tm;
> +			applier->last_row_time = ev_monotonic_now(loop());
> +
> +			if (iproto_type_is_error(row->type))
> +				xrow_decode_error_xc(row);
> +			/* Replication request. */
> +			if (rep_id == REPLICA_ID_NIL || rep_id >= VCLOCK_MAX) {
> +				/*
> +				 * A safety net, this can only
> +				 * occur if we're fed a strangely
> +				 * broken xlog.
> +				 */
> +				tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> +					  int2str(rep_id),
> +					  tt_uuid_str(&REPLICASET_UUID));
> +			}
> +			if (vclock_get(vclock, rep_id) < row->lsn) {
> +				/*
> +				 * Promote the replica set vclock
> +				 * before applying the row. If
> +				 * there is an exception
> +				 * (conflict) applying the row,
> +				 * the row is skipped when the
> +				 * replication is resumed.
> +				 */
> +				vclock_follow(vclock, rep_id, row->lsn);
> +				xstream_write_xc(stream, row);
> +			}
>  		}
> +		xstream_commit_xc(stream);
> +		tx_guard.is_active = false;
> +		/*
> +		 * Do not send the signal before commit - it
> +		 * causes yield and rollback for a memtx
> +		 * transaction.
> +		 */
>  		fiber_cond_signal(&applier->writer_cond);
>  		iobuf_reset(iobuf);
>  		fiber_gc();
> @@ -606,6 +636,7 @@ applier_delete(struct applier *applier)
>  	fiber_channel_destroy(&applier->pause);
>  	trigger_destroy(&applier->on_state);
>  	fiber_cond_destroy(&applier->writer_cond);
> +	xrow_batch_destroy(&applier->batch);
>  	free(applier);
>  }
> 
> diff --git a/src/box/applier.h b/src/box/applier.h
> index 0d40936..f131d50 100644
> --- a/src/box/applier.h
> +++ b/src/box/applier.h
> @@ -43,6 +43,7 @@
>  #include "uri.h"
>  #include "xstream.h"
>  #include "vclock.h"
> +#include "xrow_io.h"
> 
>  /** Network timeout */
>  extern double applier_timeout;
> @@ -113,6 +114,10 @@ struct applier {
>  	struct xstream *join_stream;
>  	/** xstream to process rows during final JOIN and SUBSCRIBE */
>  	struct xstream subscribe_stream;
> +	/** Currently appling batch. */
> +	struct xrow_batch batch;
> +	/** Count of rows to decode to commit. */
> +	int row_count_to_commit;
>  };
> 
>  /**
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 6dace80..9678e4a 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -283,21 +283,62 @@ join_journal_create(struct join_journal *journal)
>  }
> 
>  static inline void
> -apply_row(struct xstream *stream, struct xrow_header *row)
> +applier_begin_tx(struct xstream *stream)
> +{
> +	struct applier *applier =
> +		container_of(stream, struct applier, subscribe_stream);
> +	if (applier->row_count_to_commit > 1 && txn_begin(false) == NULL)
> +		diag_raise();
> +}
> +
> +static inline void
> +applier_commit_tx(struct xstream *stream)
> +{
> +	(void) stream;
> +	struct txn *txn = in_txn();
> +	if (txn != NULL && txn_commit(txn) != 0)
> +		diag_raise();
> +}
> +
> +static inline void
> +applier_rollback_tx(struct xstream *stream)
>  {
> -	assert(row->bodycnt == 1); /* always 1 for read */
>  	(void) stream;
> +	txn_rollback();
> +}
> +
> +static inline void
> +applier_apply_row(struct xstream *stream, struct xrow_header *row)
> +{
> +	struct applier *applier =
> +		container_of(stream, struct applier, subscribe_stream);
> +	assert(row->bodycnt == 1);
> +	struct txn *txn = in_txn();
> +	if (applier->row_count_to_commit == 0) {
> +		if (txn != NULL && txn_commit(txn) != 0)
> +			diag_raise();
> +		applier->row_count_to_commit = row->row_count;
> +		assert(row->row_count >= 1);
> +		if (row->row_count > 1 && txn_begin(false) == NULL)
> +			diag_raise();
> +	}
>  	struct request request;
>  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
>  	struct space *space = space_cache_find_xc(request.space_id);
>  	if (process_rw(&request, space, NULL) != 0)
>  		diag_raise();
> +	--applier->row_count_to_commit;
>  }
> 
>  static void
>  apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  {
> -	apply_row(stream, row);
> +	assert(row->bodycnt == 1);
> +	struct request request;
> +	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> +	struct space *space = space_cache_find_xc(request.space_id);
> +	if (process_rw(&request, space, NULL) != 0)
> +		diag_raise();
> 
>  	struct wal_stream *xstream =
>  		container_of(stream, struct wal_stream, base);
> @@ -486,7 +527,8 @@ cfg_get_replication(int *p_count)
>  				"too many replicas");
>  	}
>  	struct xstream subscribe_stream;
> -	xstream_create(&subscribe_stream, apply_row, NULL, NULL, NULL);
> +	xstream_create(&subscribe_stream, applier_apply_row, applier_begin_tx,
> +		       applier_commit_tx, applier_rollback_tx);
> 
>  	for (int i = 0; i < count; i++) {
>  		const char *source = cfg_getarr_elem("replication", i);
> diff --git a/src/box/recovery.cc b/src/box/recovery.cc
> index 4815797..aa711d5 100644
> --- a/src/box/recovery.cc
> +++ b/src/box/recovery.cc
> @@ -189,7 +189,26 @@ recover_xlog(struct recovery *r, struct xstream
> *stream, {
>  	struct xrow_header row;
>  	uint64_t row_count = 0;
> -	while (xlog_cursor_next_xc(&r->cursor, &row) == 0) {
> +	struct xlog_tx_cursor *tx_cursor = &r->cursor.tx_cursor;
> +	xstream_begin_xc(stream);
> +	int rc;
> +	while ((rc = xlog_tx_cursor_next_row(tx_cursor, &row)) >= 0) {
> +		if (rc == 1) {
> +			if (xstream_commit(stream) != 0) {
> +				say_error("can't apply tx: ");
> +				diag_log();
> +				if (!r->wal_dir.force_recovery)
> +					diag_raise();
> +			}
> +			rc = xlog_cursor_next_tx(&r->cursor);
> +			if (rc == -1)
> +				diag_raise();
> +			if (rc == 1)
> +				return;
> +			tx_cursor = &r->cursor.tx_cursor;
> +			xstream_begin_xc(stream);
> +			continue;
> +		}
>  		/*
>  		 * Read the next row from xlog file.
>  		 *
> @@ -216,7 +235,7 @@ recover_xlog(struct recovery *r, struct xstream *stream,
> * in case of forced recovery, when we skip the
>  		 * failed row anyway.
>  		 */
> -		vclock_follow(&r->vclock,  row.replica_id, row.lsn);
> +		vclock_follow(&r->vclock, row.replica_id, row.lsn);
>  		if (xstream_write(stream, &row) == 0) {
>  			++row_count;
>  			if (row_count % 100000 == 0)
> @@ -229,6 +248,7 @@ recover_xlog(struct recovery *r, struct xstream *stream,
> diag_raise();
>  		}
>  	}
> +	diag_raise();
>  }
> 
>  /**
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 49e93f5..b7b35d4 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -109,6 +109,15 @@ struct relay {
>  	struct vclock recv_vclock;
>  	/** Replicatoin slave version. */
>  	uint32_t version_id;
> +	/** Xrow batch to send to applier. */
> +	struct xrow_batch batch;
> +	/**
> +	 * Offset in ibuf to the end of a previous xrow. This
> +	 * position is also begin of a next xrow.
> +	 * After decoding the next xrow, we can determine its
> +	 * encoded size by current_ibuf_rpos - prev_row_end.
> +	 */
> +	off_t prev_row_end;
> 
>  	/** Relay endpoint */
>  	struct cbus_endpoint endpoint;
> @@ -137,17 +146,26 @@ static void
>  relay_send_initial_join_row(struct xstream *stream, struct xrow_header
> *row); static void
>  relay_send_row(struct xstream *stream, struct xrow_header *row);
> +static void
> +relay_add_row(struct xstream *stream, struct xrow_header *row);
> +static void
> +relay_begin_tx(struct xstream *stream);
> +static void
> +relay_send_tx(struct xstream *stream);
> 
>  static inline void
>  relay_create(struct relay *relay, int fd, uint64_t sync,
> -	     xstream_write_f stream_write)
> +	     xstream_write_f stream_write, xstream_tx_f stream_begin,
> +	     xstream_tx_f stream_commit)
>  {
>  	memset(relay, 0, sizeof(*relay));
> -	xstream_create(&relay->stream, stream_write, NULL, NULL, NULL);
> +	xstream_create(&relay->stream, stream_write, stream_begin,
> +		       stream_commit, NULL);
>  	coio_create(&relay->io, fd);
>  	relay->sync = sync;
>  	fiber_cond_create(&relay->reader_cond);
>  	diag_create(&relay->diag);
> +	xrow_batch_create(&relay->batch);
>  }
> 
>  static inline void
> @@ -157,6 +175,7 @@ relay_destroy(struct relay *relay)
>  		recovery_delete(relay->r);
>  	fiber_cond_destroy(&relay->reader_cond);
>  	diag_destroy(&relay->diag);
> +	xrow_batch_destroy(&relay->batch);
>  }
> 
>  static inline void
> @@ -178,7 +197,7 @@ void
>  relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
>  {
>  	struct relay relay;
> -	relay_create(&relay, fd, sync, relay_send_initial_join_row);
> +	relay_create(&relay, fd, sync, relay_send_initial_join_row, NULL, NULL);
>  	auto relay_guard = make_scoped_guard([&] { relay_destroy(&relay); });
>  	assert(relay.stream.write != NULL);
>  	engine_join_xc(vclock, &relay.stream);
> @@ -204,7 +223,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock
> *start_vclock, struct vclock *stop_vclock)
>  {
>  	struct relay relay;
> -	relay_create(&relay, fd, sync, relay_send_row);
> +	relay_create(&relay, fd, sync, relay_send_row, NULL, NULL);
>  	auto relay_guard = make_scoped_guard([&] { relay_destroy(&relay); });
>  	relay.r = recovery_new(cfg_gets("wal_dir"),
>  			       cfg_geti("force_recovery"),
> @@ -215,7 +234,6 @@ relay_final_join(int fd, uint64_t sync, struct vclock
> *start_vclock, relay_final_join_f, &relay);
>  	if (rc == 0)
>  		rc = cord_cojoin(&relay.cord);
> -
>  	if (rc != 0)
>  		diag_raise();
> 
> @@ -460,7 +478,8 @@ relay_subscribe(int fd, uint64_t sync, struct replica
> *replica, }
> 
>  	struct relay relay;
> -	relay_create(&relay, fd, sync, relay_send_row);
> +	relay_create(&relay, fd, sync, relay_add_row, relay_begin_tx,
> +		     relay_send_tx);
>  	auto relay_guard = make_scoped_guard([&] { relay_destroy(&relay); });
>  	relay.r = recovery_new(cfg_gets("wal_dir"),
>  			       cfg_geti("force_recovery"),
> @@ -515,3 +534,50 @@ relay_send_row(struct xstream *stream, struct
> xrow_header *packet) relay_send(relay, packet);
>  	}
>  }
> +
> +/** Add a single row to the current xlog transaction. */
> +static void
> +relay_add_row(struct xstream *stream, struct xrow_header *packet)
> +{
> +	struct relay *relay = container_of(stream, struct relay, stream);
> +	assert(iproto_type_is_dml(packet->type));
> +	struct xlog_tx_cursor *cursor = &relay->r->cursor.tx_cursor;
> +	off_t row_begin = relay->prev_row_end;
> +	off_t row_end = xlog_tx_cursor_pos(cursor);
> +	relay->prev_row_end = row_end;
> +	/*
> +	 * We're feeding a WAL, thus responding to SUBSCRIBE request.
> +	 * In that case, only send a row if it is not from the same replica
> +	 * (i.e. don't send replica's own rows back).
> +	 */
> +	if (relay->replica != NULL && relay->replica->id == packet->replica_id)
> +		return;
> +	/*
> +	 * Copy the original row to avoid its currupting. The
> +	 * original row is stored on a stack in recover_xlog() and
> +	 * is not valid after relay_add_row() finished.
> +	 */
> +	struct xrow_header *copy = xrow_batch_new_row(&relay->batch);
> +	*copy = *packet;
> +	relay->batch.bsize += row_end - row_begin;
> +}
> +
> +/** Clear a batch to read a next transaction. */
> +static void
> +relay_begin_tx(struct xstream *stream)
> +{
> +	struct relay *relay = container_of(stream, struct relay, stream);
> +	xrow_batch_reset(&relay->batch);
> +	relay->prev_row_end = 0;
> +}
> +
> +/** Send accumulated rows as a monolite iproto packet. */
> +static void
> +relay_send_tx(struct xstream *stream)
> +{
> +	struct relay *relay = container_of(stream, struct relay, stream);
> +	if(relay->batch.count == 0)
> +		return;
> +	xrow_batch_set_sync(&relay->batch, relay->sync);
> +	coio_write_xrow_batch(&relay->io, &relay->batch);
> +}
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index b5411df..7d2c716 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -48,6 +48,7 @@ enum {
>  	XROW_HEADER_LEN_MAX = 64,
>  	XROW_BODY_LEN_MAX = 128,
>  	IPROTO_HEADER_LEN = 28,
> +	XROW_BATCH_SIZE = IOV_MAX,
>  };
> 
>  struct xrow_header {
> diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
> index 8e0431e..397d4cf 100644
> --- a/src/box/xrow_io.cc
> +++ b/src/box/xrow_io.cc
> @@ -35,6 +35,102 @@
>  #include "error.h"
>  #include "msgpuck/msgpuck.h"
>  #include "errinj.h"
> +#include "iproto_constants.h"
> +#include "exception.h"
> +
> +void
> +xrow_batch_create(struct xrow_batch *batch)
> +{
> +	memset(batch, 0, sizeof(*batch));
> +}
> +
> +void
> +xrow_batch_reset(struct xrow_batch *batch)
> +{
> +	batch->count = 0;
> +	batch->bsize = 0;
> +}
> +
> +void
> +xrow_batch_set_sync(struct xrow_batch *batch, uint64_t sync)
> +{
> +	uint32_t new_sync_size = mp_sizeof_uint(sync);
> +	uint32_t iproto_sync_size = mp_sizeof_uint(IPROTO_SYNC);
> +	for (int i = 0; i <  batch->count; ++i) {
> +		uint64_t row_sync = batch->rows[i].sync;
> +		if (row_sync == 0) {
> +			if (sync != 0) {
> +				batch->bsize += iproto_sync_size;
> +				batch->bsize += new_sync_size;
> +			}
> +		} else {
> +			if (sync == 0) {
> +				batch->bsize -= iproto_sync_size;
> +				batch->bsize -= mp_sizeof_uint(row_sync);
> +			} else {
> +				batch->bsize += new_sync_size;
> +				batch->bsize -= mp_sizeof_uint(row_sync);
> +			}
> +		}
> +		batch->rows[i].sync = sync;
> +	}
> +}
> +
> +struct xrow_header *
> +xrow_batch_new_row(struct xrow_batch *batch)
> +{
> +	if (batch->count + 1 > batch->capacity) {
> +		size_t new_capacity = (batch->capacity + 1) * 2;
> +		size_t size = sizeof(batch->rows[0]) * new_capacity;
> +		struct xrow_header *rows =
> +			(struct xrow_header *)realloc(batch->rows, size);
> +		if (rows == NULL)
> +			tnt_raise(OutOfMemory, size, "realloc", "rows");
> +		batch->rows = rows;
> +		batch->capacity = new_capacity;
> +	}
> +	return &batch->rows[batch->count++];
> +}
> +
> +void
> +xrow_batch_destroy(struct xrow_batch *batch)
> +{
> +	free(batch->rows);
> +}
> +
> +void
> +coio_read_xrow_batch(struct ev_io *coio, struct ibuf *in,
> +		     struct xrow_batch *batch)
> +{
> +	/* Read fixed header */
> +	if (ibuf_used(in) < 1)
> +		coio_breadn(coio, in, 1);
> +
> +	/* Read length */
> +	if (mp_typeof(*in->rpos) != MP_UINT) {
> +		tnt_raise(ClientError, ER_INVALID_MSGPACK,
> +			  "packet length");
> +	}
> +	ssize_t to_read = mp_check_uint(in->rpos, in->wpos);
> +	if (to_read > 0)
> +		coio_breadn(coio, in, to_read);
> +
> +	uint32_t bsize = mp_decode_uint((const char **) &in->rpos);
> +	/* Read header and body */
> +	to_read = bsize - ibuf_used(in);
> +	if (to_read > 0)
> +		coio_breadn(coio, in, to_read);
> +	assert(bsize > 0);
> +	char *rpos = in->rpos;
> +	const char *end = rpos + bsize;
> +	do {
> +		struct xrow_header *row = xrow_batch_new_row(batch);
> +		const char *row_begin = rpos;
> +		xrow_header_decode_xc(row, (const char **) &rpos, end);
> +		batch->bsize += rpos - row_begin;
> +	} while (batch->bsize != bsize);
> +	in->rpos = rpos;
> +}
> 
>  void
>  coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header
> *row) @@ -93,7 +189,6 @@ coio_read_xrow_timeout_xc(struct ev_io *coio,
> struct ibuf *in, xrow_header_decode_xc(row, (const char **) &in->rpos,
> in->rpos + len); }
> 
> -
>  void
>  coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
>  {
> @@ -108,3 +203,40 @@ coio_write_xrow(struct ev_io *coio, const struct
> xrow_header *row) coio_writev(coio, iov, iovcnt, 0);
>  }
> 
> +void
> +coio_write_xrow_batch(struct ev_io *coio, const struct xrow_batch *batch)
> +{
> +	struct iovec iov[XROW_BATCH_SIZE];
> +	char fixheader[XROW_HEADER_LEN_MAX];
> +	/* In a first iov send an iproto packet header. */
> +	iov[0].iov_base = fixheader;
> +	fixheader[0] = 0xce; /* MP_UINT32 */
> +	store_u32(fixheader + 1, mp_bswap_u32(batch->bsize));
> +	iov[0].iov_len = 5;
> +	int iov_count = 1, i = 0;
> +	size_t written = 0;
> +
> +	/* Then send rows sequentially. */
> +	for (; i < batch->count; ++i) {
> +		/*
> +		 * If the batch is too big, send the batch in
> +		 * parts.
> +		 */
> +		if (iov_count + XROW_IOVMAX >= XROW_BATCH_SIZE) {
> +			written += coio_writev(coio, iov, iov_count, 0);
> +			iov_count = 0;
> +		}
> +		iov_count += xrow_header_encode(&batch->rows[i],
> +						batch->rows[i].sync,
> +						&iov[iov_count], 0);
> +	}
> +	if (iov_count > 0) {
> +		ERROR_INJECT(ERRINJ_COIO_PARTIAL_WRITE_ROW, {
> +			iov[0].iov_len /= 2;
> +			coio_writev(coio, iov, 1, 0);
> +			tnt_raise(SocketError, coio->fd, "errinj partial write");
> +		});
> +		written += coio_writev(coio, iov, iov_count, 0);
> +	}
> +	assert(written == batch->bsize + 5);
> +}
> diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
> index 0eb7a8a..72e41d5 100644
> --- a/src/box/xrow_io.h
> +++ b/src/box/xrow_io.h
> @@ -34,10 +34,67 @@
>  extern "C" {
>  #endif
> 
> +#include "trivia/util.h"
> +
>  struct ev_io;
>  struct ibuf;
>  struct xrow_header;
> 
> +/** A batch of xrows. */
> +struct xrow_batch {
> +	/** Rows array. */
> +	struct xrow_header *rows;
> +	/** Count of rows, stored in @a rows array. */
> +	int count;
> +	/**
> +	 * Maximal count of rows, which can be stored in @a rows
> +	 * array.
> +	 */
> +	int capacity;
> +	/**
> +	 * Binary size of encoded!!! rows, stored in @a rows
> +	 * array.
> +	 */
> +	size_t bsize;
> +};
> +
> +/** Create a batch. Start capacity is 0. */
> +void
> +xrow_batch_create(struct xrow_batch *batch);
> +
> +/**
> + * Reset count of rows to 0 to reuse allocated xrow_headers.
> + * Do not free resources.
> + */
> +void
> +xrow_batch_reset(struct xrow_batch *batch);
> +
> +/** Set @a sync for all rows in batch. */
> +void
> +xrow_batch_set_sync(struct xrow_batch *batch, uint64_t sync);
> +
> +/**
> + * Create a new row or reuse an existing one from @a batch.
> + * @param batch Batch to store row.
> + * @retval Allocated row.
> + */
> +struct xrow_header *
> +xrow_batch_new_row(struct xrow_batch *batch);
> +
> +/** Free batch resources. */
> +void
> +xrow_batch_destroy(struct xrow_batch *batch);
> +
> +/**
> + * Read an iproto packet of rows as @a batch.
> + * @param coio Input stream.
> + * @param in Input buffer.
> + * @param[out] batch Batch to store rows.
> + */
> +void
> +coio_read_xrow_batch(struct ev_io *coio, struct ibuf *in,
> +		     struct xrow_batch *batch);
> +
>  void
>  coio_read_xrow(struct ev_io *coio, struct ibuf *in, struct xrow_header
> *row);
> 
> @@ -48,6 +105,14 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct
> ibuf *in, void
>  coio_write_xrow(struct ev_io *coio, const struct xrow_header *row);
> 
> +/**
> + * Write @a batch of rows. If a batch is too big, it can be
> + * sent in several parts, but still in a single iproto packet.
> + * @param coio Output stream.
> + * @param batch Batch to send.
> + */
> +void
> +coio_write_xrow_batch(struct ev_io *coio, const struct xrow_batch *batch);
> 
>  #if defined(__cplusplus)
>  } /* extern "C" */
> diff --git a/test/replication/gc.result b/test/replication/gc.result
> index d589a65..1f0a2e3 100644
> --- a/test/replication/gc.result
> +++ b/test/replication/gc.result
> @@ -159,7 +159,7 @@ test_run:cmd("switch default")
>  fiber.sleep(0.1) -- wait for relay to notify tx
>  ---
>  ...
> -#box.internal.gc.info().checkpoints == 1 or box.internal.gc.info()
> +#box.internal.gc.info().checkpoints == 2 or box.internal.gc.info()
>  ---
>  - true
>  ...
> @@ -181,7 +181,7 @@ box.snapshot()
>  ---
>  - ok
>  ...
> -#box.internal.gc.info().checkpoints == 2 or box.internal.gc.info()
> +#box.internal.gc.info().checkpoints == 3 or box.internal.gc.info()
>  ---
>  - true
>  ...
> diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua
> index 2a1fbb7..fb32cec 100644
> --- a/test/replication/gc.test.lua
> +++ b/test/replication/gc.test.lua
> @@ -84,7 +84,7 @@ test_run:cmd("switch default")
>  -- Now garbage collection should resume and delete files left
>  -- from the old checkpoint.
>  fiber.sleep(0.1) -- wait for relay to notify tx
> -#box.internal.gc.info().checkpoints == 1 or box.internal.gc.info()
> +#box.internal.gc.info().checkpoints == 2 or box.internal.gc.info()
> 
>  -- Stop the replica.
>  test_run:cmd("stop server replica")
> @@ -94,7 +94,7 @@ test_run:cmd("cleanup server replica")
>  -- the checkpoint last used by the replica.
>  _ = s:auto_increment{}
>  box.snapshot()
> -#box.internal.gc.info().checkpoints == 2 or box.internal.gc.info()
> +#box.internal.gc.info().checkpoints == 3 or box.internal.gc.info()
> 
>  -- The checkpoint should only be deleted after the replica
>  -- is unregistered.
> diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
> index 6e6bb37..6df964d 100644
> --- a/test/replication/suite.cfg
> +++ b/test/replication/suite.cfg
> @@ -4,6 +4,7 @@
>      "status.test.lua": {},
>      "wal_off.test.lua": {},
>      "hot_standby.test.lua": {},
> +    "transaction.test.lua": {},
>      "*": {
>          "memtx": {"engine": "memtx"},
>          "vinyl": {"engine": "vinyl"}
> diff --git a/test/replication/transaction.result
> b/test/replication/transaction.result new file mode 100644
> index 0000000..a680f9e
> --- /dev/null
> +++ b/test/replication/transaction.result
> @@ -0,0 +1,241 @@
> +env = require('test_run')
> +---
> +...
> +test_run = env.new()
> +---
> +...
> +box.schema.user.grant('guest', 'replication')
> +---
> +...
> +--
> +-- gh-2798: transactional replication. Ensure box transactions are
> +-- atomic on replica.
> +--
> +s = box.schema.space.create('test', {engine = 'memtx'})
> +---
> +...
> +pk = s:create_index('pk')
> +---
> +...
> +s2 = box.schema.space.create('test2', {engine = 'vinyl'})
> +---
> +...
> +pk2 = s2:create_index('pk')
> +---
> +...
> +fiber = require('fiber')
> +---
> +...
> +-- Test multiengine transaction. It is batched into a single xlog
> +-- transaction but on a replica must not be applied as a single
> +-- box transaction.
> +test_run:cmd("setopt delimiter ';'")
> +---
> +- true
> +...
> +function multiengine()
> +	fiber.create(function() s:replace{1, 2, 3} end)
> +	s2:replace{4, 5, 6}
> +end;
> +---
> +...
> +function multiddl()
> +	fiber.create(function()
> +		box.schema.space.create('test3')
> +		box.space.test3:create_index('pk')
> +	end)
> +	box.schema.space.create('test4')
> +end;
> +---
> +...
> +function mixed()
> +	fiber.create(function() box.space.test3:replace{1, 2, 3} end)
> +	box.schema.space.create('test5')
> +end;
> +---
> +...
> +function duplicate_key()
> +	box.begin()
> +	box.space.test3:insert{3,4,5}
> +	box.space.test3:insert{4,5,6}
> +	box.space.test3:insert{2,3,4}
> +	box.commit()
> +end;
> +---
> +...
> +test_run:cmd("setopt delimiter ''");
> +---
> +- true
> +...
> +test_run:cmd("create server replica with rpl_master=default,
> script='replication/replica.lua'") +---
> +- true
> +...
> +test_run:cmd("start server replica")
> +---
> +- true
> +...
> +test_run:cmd("switch replica")
> +---
> +- true
> +...
> +fiber = require('fiber')
> +---
> +...
> +while box.space.test == nil or box.space.test2 == nil do fiber.sleep(0.1)
> end +---
> +...
> +test_run:cmd("switch default")
> +---
> +- true
> +...
> +multiengine()
> +---
> +...
> +multiddl()
> +---
> +...
> +mixed()
> +---
> +...
> +test_run:cmd("switch replica")
> +---
> +- true
> +...
> +log = require('log')
> +---
> +...
> +-- Wait for multiengine batch.
> +max_sleep = 5
> +---
> +...
> +test_run:cmd("setopt delimiter ';'")
> +---
> +- true
> +...
> +while box.space.test:count() ~= 1 and box.space.test2:count() ~= 1 and
> max_sleep ~= 0 do +	fiber.sleep(0.1)
> +	max_sleep = max_sleep - 0.1
> +end;
> +---
> +...
> +test_run:cmd("setopt delimiter ''");
> +---
> +- true
> +...
> +if max_sleep == 0 then log.error('Error with multiengine') assert(false)
> end +---
> +...
> +-- Wait for multiddl batch.
> +max_sleep = 5
> +---
> +...
> +test_run:cmd("setopt delimiter ';'")
> +---
> +- true
> +...
> +while box.space.test3 == nil or box.space.test4 == nil and max_sleep ~= 0
> do +	fiber.sleep(0.1)
> +	max_sleep = max_sleep - 0.1
> +end;
> +---
> +...
> +test_run:cmd("setopt delimiter ''");
> +---
> +- true
> +...
> +if max_sleep == 0 then log.error('Error with multiddl') assert(false) end
> +---
> +...
> +-- Wait for mixed: ddl + not ddl.
> +max_sleep = 5
> +---
> +...
> +test_run:cmd("setopt delimiter ';'")
> +---
> +- true
> +...
> +while box.space.test5 == nil or box.space.test3:count() ~= 1 and max_sleep
> ~= 0 do +	fiber.sleep(0.1)
> +	max_sleep = max_sleep - 0.1
> +end;
> +---
> +...
> +test_run:cmd("setopt delimiter ''");
> +---
> +- true
> +...
> +if max_sleep == 0 then log.error('Error with ddl + not ddl') assert(false)
> end +---
> +...
> +--
> +-- Test rollback on duplicate key. Key must not be duplicate on
> +-- master, but be duplicate on replica. In such a case a whole
> +-- transaction can not be applied.
> +--
> +box.space.test3:insert{2,3,4} -- This key will be sent by master.
> +---
> +- [2, 3, 4]
> +...
> +test_run:cmd("switch default")
> +---
> +- true
> +...
> +duplicate_key()
> +---
> +...
> +test_run:cmd("switch replica")
> +---
> +- true
> +...
> +max_sleep = 5
> +---
> +...
> +test_run:cmd("setopt delimiter ';'")
> +---
> +- true
> +...
> +while box.info.replication[1].upstream.status == 'follow' and max_sleep ~=
> 0 do +	fiber.sleep(0.1)
> +	max_sleep = max_sleep - 0.1
> +end;
> +---
> +...
> +test_run:cmd("setopt delimiter ''");
> +---
> +- true
> +...
> +if max_sleep == 0 then log.error('Error with consistency') assert(false)
> end +---
> +...
> +box.info.replication[1].upstream.status
> +---
> +- stopped
> +...
> +-- Rows from the master are not applied.
> +box.space.test3:select{}
> +---
> +- - [1, 2, 3]
> +  - [2, 3, 4]
> +...
> +test_run:cmd("switch default")
> +---
> +- true
> +...
> +test_run:cmd("stop server replica")
> +---
> +- true
> +...
> +test_run:cmd("cleanup server replica")
> +---
> +- true
> +...
> +box.schema.user.revoke('guest', 'replication')
> +---
> +...
> +s:drop()
> +---
> +...
> +s2:drop()
> +---
> +...
> diff --git a/test/replication/transaction.test.lua
> b/test/replication/transaction.test.lua new file mode 100644
> index 0000000..bfad254
> --- /dev/null
> +++ b/test/replication/transaction.test.lua
> @@ -0,0 +1,118 @@
> +env = require('test_run')
> +test_run = env.new()
> +box.schema.user.grant('guest', 'replication')
> +
> +--
> +-- gh-2798: transactional replication. Ensure box transactions are
> +-- atomic on replica.
> +--
> +
> +s = box.schema.space.create('test', {engine = 'memtx'})
> +pk = s:create_index('pk')
> +s2 = box.schema.space.create('test2', {engine = 'vinyl'})
> +pk2 = s2:create_index('pk')
> +fiber = require('fiber')
> +
> +-- Test multiengine transaction. It is batched into a single xlog
> +-- transaction but on a replica must not be applied as a single
> +-- box transaction.
> +test_run:cmd("setopt delimiter ';'")
> +function multiengine()
> +	fiber.create(function() s:replace{1, 2, 3} end)
> +	s2:replace{4, 5, 6}
> +end;
> +function multiddl()
> +	fiber.create(function()
> +		box.schema.space.create('test3')
> +		box.space.test3:create_index('pk')
> +	end)
> +	box.schema.space.create('test4')
> +end;
> +function mixed()
> +	fiber.create(function() box.space.test3:replace{1, 2, 3} end)
> +	box.schema.space.create('test5')
> +end;
> +function duplicate_key()
> +	box.begin()
> +	box.space.test3:insert{3,4,5}
> +	box.space.test3:insert{4,5,6}
> +	box.space.test3:insert{2,3,4}
> +	box.commit()
> +end;
> +test_run:cmd("setopt delimiter ''");
> +
> +test_run:cmd("create server replica with rpl_master=default,
> script='replication/replica.lua'") +test_run:cmd("start server replica")
> +test_run:cmd("switch replica")
> +
> +fiber = require('fiber')
> +while box.space.test == nil or box.space.test2 == nil do fiber.sleep(0.1)
> end +test_run:cmd("switch default")
> +
> +multiengine()
> +multiddl()
> +mixed()
> +test_run:cmd("switch replica")
> +
> +log = require('log')
> +-- Wait for multiengine batch.
> +max_sleep = 5
> +test_run:cmd("setopt delimiter ';'")
> +while box.space.test:count() ~= 1 and box.space.test2:count() ~= 1 and
> max_sleep ~= 0 do +	fiber.sleep(0.1)
> +	max_sleep = max_sleep - 0.1
> +end;
> +test_run:cmd("setopt delimiter ''");
> +if max_sleep == 0 then log.error('Error with multiengine') assert(false)
> end +
> +-- Wait for multiddl batch.
> +max_sleep = 5
> +test_run:cmd("setopt delimiter ';'")
> +while box.space.test3 == nil or box.space.test4 == nil and max_sleep ~= 0
> do +	fiber.sleep(0.1)
> +	max_sleep = max_sleep - 0.1
> +end;
> +test_run:cmd("setopt delimiter ''");
> +if max_sleep == 0 then log.error('Error with multiddl') assert(false) end
> +
> +-- Wait for mixed: ddl + not ddl.
> +max_sleep = 5
> +test_run:cmd("setopt delimiter ';'")
> +while box.space.test5 == nil or box.space.test3:count() ~= 1 and max_sleep
> ~= 0 do +	fiber.sleep(0.1)
> +	max_sleep = max_sleep - 0.1
> +end;
> +test_run:cmd("setopt delimiter ''");
> +if max_sleep == 0 then log.error('Error with ddl + not ddl') assert(false)
> end +
> +--
> +-- Test rollback on duplicate key. Key must not be duplicate on
> +-- master, but be duplicate on replica. In such a case a whole
> +-- transaction can not be applied.
> +--
> +box.space.test3:insert{2,3,4} -- This key will be sent by master.
> +
> +test_run:cmd("switch default")
> +duplicate_key()
> +
> +test_run:cmd("switch replica")
> +max_sleep = 5
> +test_run:cmd("setopt delimiter ';'")
> +while box.info.replication[1].upstream.status == 'follow' and max_sleep ~=
> 0 do +	fiber.sleep(0.1)
> +	max_sleep = max_sleep - 0.1
> +end;
> +test_run:cmd("setopt delimiter ''");
> +if max_sleep == 0 then log.error('Error with consistency') assert(false)
> end +
> +box.info.replication[1].upstream.status
> +-- Rows from the master are not applied.
> +box.space.test3:select{}
> +
> +test_run:cmd("switch default")
> +test_run:cmd("stop server replica")
> +test_run:cmd("cleanup server replica")
> +
> +box.schema.user.revoke('guest', 'replication')
> +s:drop()
> +s2:drop()

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: This is a digitally signed message part.
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20180320/335a272b/attachment.sig>


More information about the Tarantool-patches mailing list