Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol
@ 2019-01-06 13:05 Georgy Kirichenko
  2019-01-06 13:05 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
  2019-01-11 13:30 ` [tarantool-patches] Re: [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
  0 siblings, 2 replies; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-06 13:05 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

This patchset introduces transactional replication and consists of two
commits:
 * the first one forms transaction boundaries in an xstream
 * the second one forms transactions in applier buffers and then
   applies them with correct begin/commit boundaries.

Note: distributed transactions are not supported, so the journal forms a
separate transaction for all local trigger effects.

Changes in v2:
 - Fixed local transaction extraction

Georgy Kirichenko (2):
  Journal transaction boundaries
  Transaction support for applier

 src/box/applier.cc         | 202 ++++++++++++++++++++++++++-----------
 src/box/iproto_constants.h |   3 +
 src/box/wal.c              |  36 ++++++-
 src/box/xrow.c             |  38 +++++++
 src/box/xrow.h             |   5 +-
 test/unit/xrow.cc          |   3 +
 test/vinyl/errinj.result   |   8 +-
 test/vinyl/info.result     |  38 +++----
 test/vinyl/layout.result   |  24 ++---
 9 files changed, 263 insertions(+), 94 deletions(-)

-- 
2.20.1

^ permalink raw reply	[flat|nested] 5+ messages in thread

* [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
  2019-01-06 13:05 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
@ 2019-01-06 13:05 ` Georgy Kirichenko
  2019-01-11 13:30 ` [tarantool-patches] Re: [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
  1 sibling, 0 replies; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-06 13:05 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

The applier fetches incoming rows to form a transaction and then applies it.
The implementation assumes that transactions are not interleaved in a
replication stream. Distributed transactions are not supported yet.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc | 202 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 145 insertions(+), 57 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6c0eb45d5..7e208aaa2 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,7 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "txn.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -380,6 +381,102 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Read one transaction from network.
+ * Transaction rows are placed into row_buf as an array, row's bodies are
+ * placed into obuf because it is not allowed to relocate row's bodies.
+ * Also we could not use applier input buffer because rpos adjusted after xrow
+ * decoding and corresponding space going to reuse.
+ *
+ * Note: current implementation grants that transaction could not be mixed, so
+ * we read each transaction from first xrow until xrow with txn_last = true.
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+		struct obuf *data_buf)
+{
+	struct xrow_header *row;
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t txn_id = 0;
+	uint32_t txn_replica_id = 0;
+
+	do {
+		row = (struct xrow_header *)ibuf_alloc(row_buf,
+						       sizeof(struct xrow_header));
+		if (row == NULL) {
+			diag_set(OutOfMemory, sizeof(struct xrow_header),
+				 "slab", "struct xrow_header");
+			goto error;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		try {
+			/* TODO: we should have a C version of this function. */
+			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+			/*
+			 * First row in a transaction. In order to enforce
+			 * consistency check that first row lsn and replica id
+			 * match with transaction.
+			 */
+			txn_id = row->lsn;
+			txn_replica_id = row->replica_id;
+		}
+		if (txn_id != row->txn_id ||
+			   txn_replica_id != row->txn_replica_id) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replications",
+				 "interleaving transactions");
+			goto error;
+		}
+
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL) {
+			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "slab", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			row->body->iov_base = new_base;
+		}
+
+	} while (row->txn_last == 0);
+
+	return 0;
+error:
+	ibuf_reset(row_buf);
+	obuf_reset(data_buf);
+	return -1;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
 	struct ibuf *ibuf = &applier->ibuf;
 	struct xrow_header row;
 	struct vclock remote_vclock_at_subscribe;
+	struct ibuf row_buf;
+	struct obuf data_buf;
+	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
+	obuf_create(&data_buf, &cord()->slabc, 0x10000);
 
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &replicaset.vclock);
@@ -475,80 +576,67 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
+		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
+			diag_raise();
 
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row);  /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct txn *txn = NULL;
+		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
+		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
 
-		applier->lag = ev_now(loop()) - row.tm;
+		applier->lag = ev_now(loop()) - last_row->tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->txn_replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
-		/*
-		 * In a full mesh topology, the same set
-		 * of changes may arrive via two
-		 * concurrently running appliers. Thanks
-		 * to vclock_follow() above, the first row
-		 * in the set will be skipped - but the
-		 * remaining may execute out of order,
-		 * when the following xstream_write()
-		 * yields on WAL. Hence we need a latch to
-		 * strictly order all changes which belong
-		 * to the same server id.
-		 */
 		latch_lock(latch);
+		/* First row identifies a transaction. */
+		assert(first_row->lsn == first_row->txn_id);
+		assert(first_row->replica_id == first_row->txn_replica_id);
 		if (vclock_get(&replicaset.applier.vclock,
-			       row.replica_id) < row.lsn) {
+			       first_row->replica_id) < first_row->lsn) {
 			/* Preserve old lsn value. */
 			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
-						     row.replica_id);
-			vclock_follow_xrow(&replicaset.applier.vclock, &row);
-			int res = xstream_write(applier->subscribe_stream, &row);
-			struct error *e = diag_last_error(diag_get());
-			if (res != 0 && e->type == &type_ClientError &&
-			    box_error_code(e) == ER_TUPLE_FOUND &&
-			    replication_skip_conflict) {
-				/**
-				 * Silently skip ER_TUPLE_FOUND error if such
-				 * option is set in config.
-				 */
-				diag_clear(diag_get());
-				row.type = IPROTO_NOP;
-				row.bodycnt = 0;
-				res = xstream_write(applier->subscribe_stream,
-						    &row);
+						     first_row->replica_id);
+
+			struct xrow_header *row = first_row;
+			if (first_row != last_row)
+				txn = txn_begin(false);
+			int res = 0;
+			while (row <= last_row && res == 0) {
+				vclock_follow_xrow(&replicaset.applier.vclock, row);
+				res = xstream_write(applier->subscribe_stream, row);
+				struct error *e;
+				if (res != 0 &&
+				    (e = diag_last_error(diag_get()))->type ==
+				    &type_ClientError &&
+				    box_error_code(e) == ER_TUPLE_FOUND &&
+				    replication_skip_conflict) {
+					/**
+					 * Silently skip ER_TUPLE_FOUND error
+					 * if such option is set in config.
+					 */
+					diag_clear(diag_get());
+					row->type = IPROTO_NOP;
+					row->bodycnt = 0;
+					res = xstream_write(applier->subscribe_stream,
+							    row);
+				}
+				++row;
 			}
+			if (res == 0 && txn != NULL)
+				res = txn_commit(txn);
 			if (res != 0) {
 				/* Rollback lsn to have a chance for a retry. */
 				vclock_set(&replicaset.applier.vclock,
-					   row.replica_id, old_lsn);
+					   first_row->replica_id, old_lsn);
+				obuf_reset(&data_buf);
+				ibuf_reset(&row_buf);
 				latch_unlock(latch);
 				diag_raise();
 			}
 		}
+		obuf_reset(&data_buf);
+		ibuf_reset(&row_buf);
 		latch_unlock(latch);
 		/*
 		 * Stay 'orphan' until appliers catch up with
-- 
2.20.1

^ permalink raw reply	[flat|nested] 5+ messages in thread

* [tarantool-patches] Re: [PATCH v2 0/2] Transaction boundaries in replication protocol
  2019-01-06 13:05 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
  2019-01-06 13:05 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
@ 2019-01-11 13:30 ` Georgy Kirichenko
  1 sibling, 0 replies; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-11 13:30 UTC (permalink / raw)
  To: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 1236 bytes --]

Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-2798-transaction-boundaries
Issue: https://github.com/tarantool/tarantool/issues/2798

On Sunday, January 6, 2019 4:05:51 PM MSK Georgy Kirichenko wrote:
> This patchset introduces transactional replication and consist of two
> commits:
>  * the first one forms transaction boundaries in a xstream
>  * the second one forms transactions in applier buffers and then
>    applies them with correct begin/commit boundaries.
> 
> Note: distributed transaction are not supported so journal forms a
> separate transaction for all local triggers effects.
> 
> Changes in v2:
>  - Fixed local transaction extraction
> 
> Georgy Kirichenko (2):
>   Journal transaction boundaries
>   Transaction support for applier
> 
>  src/box/applier.cc         | 202 ++++++++++++++++++++++++++-----------
>  src/box/iproto_constants.h |   3 +
>  src/box/wal.c              |  36 ++++++-
>  src/box/xrow.c             |  38 +++++++
>  src/box/xrow.h             |   5 +-
>  test/unit/xrow.cc          |   3 +
>  test/vinyl/errinj.result   |   8 +-
>  test/vinyl/info.result     |  38 +++----
>  test/vinyl/layout.result   |  24 ++---
>  9 files changed, 263 insertions(+), 94 deletions(-)


[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
  2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
@ 2019-01-28 13:35   ` Vladimir Davydov
  0 siblings, 0 replies; 5+ messages in thread
From: Vladimir Davydov @ 2019-01-28 13:35 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:57:37PM +0300, Georgy Kirichenko wrote:
> Applier fetch incoming rows to form a transaction and then apply it.
> Implementation assumes that transaction could not mix in a
> replication stream. Also distributed transaction are not supported yet.
> 
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc | 207 ++++++++++++++++++++++++++++++++-------------
>  1 file changed, 148 insertions(+), 59 deletions(-)

Without a test, this patch is inadmissible. Vlad mentioned that he has
some tests left from his old implementation. Please salvage those.

> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index adbe88679..0e3832ad8 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,7 @@
>  #include "error.h"
>  #include "session.h"
>  #include "cfg.h"
> +#include "txn.h"
>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -380,6 +381,102 @@ applier_join(struct applier *applier)
>  	applier_set_state(applier, APPLIER_READY);
>  }
>  
> +/**
> + * Read one transaction from network.
> + * Transaction rows are placed into row_buf as an array, row's bodies are
> + * placed into obuf because it is not allowed to relocate row's bodies.
> + * Also we could not use applier input buffer because rpos adjusted after xrow
> + * decoding and corresponding space going to reuse.
> + *
> + * Note: current implementation grants that transaction could not be mixed, so
> + * we read each transaction from first xrow until xrow with txn_last = true.
> + */
> +static int64_t
> +applier_read_tx(struct applier *applier, struct ibuf *row_buf,
> +		struct obuf *data_buf)
> +{
> +	struct xrow_header *row;
> +	struct ev_io *coio = &applier->io;
> +	struct ibuf *ibuf = &applier->ibuf;
> +	int64_t txn_id = 0;
> +	uint32_t txn_replica_id = 0;
> +
> +	do {
> +		row = (struct xrow_header *)ibuf_alloc(row_buf,
> +						       sizeof(struct xrow_header));
> +		if (row == NULL) {
> +			diag_set(OutOfMemory, sizeof(struct xrow_header),
> +				 "slab", "struct xrow_header");
> +			goto error;
> +		}
> +
> +		double timeout = replication_disconnect_timeout();
> +		try {
> +			/* TODO: we should have a C version of this function. */
> +			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
> +		} catch (...) {
> +			goto error;
> +		}
> +
> +		if (iproto_type_is_error(row->type)) {
> +			xrow_decode_error(row);
> +			goto error;
> +		}
> +
> +		/* Replication request. */
> +		if (row->replica_id == REPLICA_ID_NIL ||
> +		    row->replica_id >= VCLOCK_MAX) {
> +			/*
> +			 * A safety net, this can only occur
> +			 * if we're fed a strangely broken xlog.
> +			 */
> +			diag_set(ClientError, ER_UNKNOWN_REPLICA,
> +				 int2str(row->replica_id),
> +				 tt_uuid_str(&REPLICASET_UUID));
> +			goto error;
> +		}
> +		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
> +			/*
> +			 * First row in a transaction. In order to enforce
> +			 * consistency check that first row lsn and replica id
> +			 * match with transaction.
> +			 */
> +			txn_id = row->lsn;
> +			txn_replica_id = row->replica_id;
> +		}
> +		if (txn_id != row->txn_id ||
> +			   txn_replica_id != row->txn_replica_id) {
> +			/* We are not able to handle interleaving transactions. */
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "replications",
> +				 "interleaving transactions");
> +			goto error;
> +		}

Accumulating rows feels like the iproto realm. I don't think that it's a
good idea to implement a dirty ad-hoc solution for this. Instead we
should move applier to iproto IMO. This would probably allow us to reuse
the code for interactive iproto transactions - the two issues look very
similar to me and I think we should use the same protocol and code to
get them both working.

> +
> +
> +		applier->lag = ev_now(loop()) - row->tm;
> +		applier->last_row_time = ev_monotonic_now(loop());
> +
> +		if (row->body->iov_base != NULL) {
> +			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
> +			if (new_base == NULL) {
> +				diag_set(OutOfMemory, row->body->iov_len,
> +					 "slab", "xrow_data");
> +				goto error;
> +			}
> +			memcpy(new_base, row->body->iov_base, row->body->iov_len);
> +			row->body->iov_base = new_base;
> +		}
> +
> +	} while (row->txn_last == 0);
> +
> +	return 0;
> +error:
> +	ibuf_reset(row_buf);
> +	obuf_reset(data_buf);
> +	return -1;
> +}
> +
>  /**
>   * Execute and process SUBSCRIBE request (follow updates from a master).
>   */
> @@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
>  	struct ibuf *ibuf = &applier->ibuf;
>  	struct xrow_header row;
>  	struct vclock remote_vclock_at_subscribe;
> +	struct ibuf row_buf;
> +	struct obuf data_buf;
> +	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
> +	obuf_create(&data_buf, &cord()->slabc, 0x10000);
>  
>  	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>  				 &replicaset.vclock);
> @@ -475,87 +576,75 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>  
> -		/*
> -		 * Tarantool < 1.7.7 does not send periodic heartbeat
> -		 * messages so we can't assume that if we haven't heard
> -		 * from the master for quite a while the connection is
> -		 * broken - the master might just be idle.
> -		 */
> -		if (applier->version_id < version_id(1, 7, 7)) {
> -			coio_read_xrow(coio, ibuf, &row);
> -		} else {
> -			double timeout = replication_disconnect_timeout();
> -			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
> -		}
> +		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
> +			diag_raise();
>  
> -		if (iproto_type_is_error(row.type))
> -			xrow_decode_error_xc(&row);  /* error */
> -		/* Replication request. */
> -		if (row.replica_id == REPLICA_ID_NIL ||
> -		    row.replica_id >= VCLOCK_MAX) {
> -			/*
> -			 * A safety net, this can only occur
> -			 * if we're fed a strangely broken xlog.
> -			 */
> -			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> -				  int2str(row.replica_id),
> -				  tt_uuid_str(&REPLICASET_UUID));
> -		}
> +		struct txn *txn = NULL;
> +		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
> +		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
>  
> -		applier->lag = ev_now(loop()) - row.tm;
> +		applier->lag = ev_now(loop()) - last_row->tm;
>  		applier->last_row_time = ev_monotonic_now(loop());
> -		struct replica *replica = replica_by_id(row.replica_id);
> +		struct replica *replica = replica_by_id(first_row->txn_replica_id);
>  		struct latch *latch = (replica ? &replica->order_latch :
>  				       &replicaset.applier.order_latch);
> -		/*
> -		 * In a full mesh topology, the same set
> -		 * of changes may arrive via two
> -		 * concurrently running appliers. Thanks
> -		 * to vclock_follow() above, the first row
> -		 * in the set will be skipped - but the
> -		 * remaining may execute out of order,
> -		 * when the following xstream_write()
> -		 * yields on WAL. Hence we need a latch to
> -		 * strictly order all changes which belong
> -		 * to the same server id.
> -		 */
>  		latch_lock(latch);
> +		/* First row identifies a transaction. */
> +		assert(first_row->lsn == first_row->txn_id);
> +		assert(first_row->replica_id == first_row->txn_replica_id);
>  		if (vclock_get(&replicaset.applier.vclock,
> -			       row.replica_id) < row.lsn) {
> -			if (row.replica_id == instance_id &&
> +			       first_row->replica_id) < first_row->lsn) {
> +			if (first_row->replica_id == instance_id &&
>  			    vclock_get(&replicaset.vclock, instance_id) <
> -			    row.lsn) {
> +			    first_row->lsn) {
>  				/* Local row returned back. */
>  				goto done;
>  			}
>  			/* Preserve old lsn value. */
>  			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
> -						     row.replica_id);
> -			vclock_follow_xrow(&replicaset.applier.vclock, &row);
> -			int res = xstream_write(applier->subscribe_stream, &row);
> -			struct error *e = diag_last_error(diag_get());
> -			if (res != 0 && e->type == &type_ClientError &&
> -			    box_error_code(e) == ER_TUPLE_FOUND &&
> -			    replication_skip_conflict) {
> -				/**
> -				 * Silently skip ER_TUPLE_FOUND error if such
> -				 * option is set in config.
> -				 */
> -				diag_clear(diag_get());
> -				row.type = IPROTO_NOP;
> -				row.bodycnt = 0;
> -				res = xstream_write(applier->subscribe_stream,
> -						    &row);
> +						     first_row->replica_id);
> +
> +			struct xrow_header *row = first_row;
> +			if (first_row != last_row)
> +				txn = txn_begin(false);

So we have xstream_write to hide box internals, but we still use
txn_begin/commit. This looks ugly. We should encapsulate those somehow
as well, I guess.

> +			int res = 0;
> +			while (row <= last_row && res == 0) {
> +				vclock_follow_xrow(&replicaset.applier.vclock, row);
> +				res = xstream_write(applier->subscribe_stream, row);
> +				struct error *e;
> +				if (res != 0 &&
> +				    (e = diag_last_error(diag_get()))->type ==
> +				    &type_ClientError &&
> +				    box_error_code(e) == ER_TUPLE_FOUND &&
> +				    replication_skip_conflict) {
> +					/**
> +					 * Silently skip ER_TUPLE_FOUND error
> +					 * if such option is set in config.
> +					 */
> +					diag_clear(diag_get());
> +					row->type = IPROTO_NOP;
> +					row->bodycnt = 0;
> +					res = xstream_write(applier->subscribe_stream,
> +							    row);
> +				}
> +				++row;
>  			}
> +			if (res == 0 && txn != NULL)
> +				res = txn_commit(txn);
> +
>  			if (res != 0) {
>  				/* Rollback lsn to have a chance for a retry. */
>  				vclock_set(&replicaset.applier.vclock,
> -					   row.replica_id, old_lsn);
> +					   first_row->replica_id, old_lsn);
> +				obuf_reset(&data_buf);
> +				ibuf_reset(&row_buf);
>  				latch_unlock(latch);
>  				diag_raise();
>  			}
>  		}
>  done:
> +		obuf_reset(&data_buf);
> +		ibuf_reset(&row_buf);
>  		latch_unlock(latch);
>  		/*
>  		 * Stay 'orphan' until appliers catch up with

^ permalink raw reply	[flat|nested] 5+ messages in thread

* [tarantool-patches] [PATCH v2 2/2] Transaction support for applier
  2019-01-22 10:57 [tarantool-patches] " Georgy Kirichenko
@ 2019-01-22 10:57 ` Georgy Kirichenko
  2019-01-28 13:35   ` Vladimir Davydov
  0 siblings, 1 reply; 5+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:57 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

The applier fetches incoming rows to form a transaction and then applies it.
The implementation assumes that transactions are not interleaved in a
replication stream. Distributed transactions are not supported yet.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc | 207 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 148 insertions(+), 59 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index adbe88679..0e3832ad8 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,7 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "txn.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -380,6 +381,102 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Read one transaction from network.
+ * Transaction rows are placed into row_buf as an array, row's bodies are
+ * placed into obuf because it is not allowed to relocate row's bodies.
+ * Also we could not use applier input buffer because rpos adjusted after xrow
+ * decoding and corresponding space going to reuse.
+ *
+ * Note: current implementation grants that transaction could not be mixed, so
+ * we read each transaction from first xrow until xrow with txn_last = true.
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+		struct obuf *data_buf)
+{
+	struct xrow_header *row;
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t txn_id = 0;
+	uint32_t txn_replica_id = 0;
+
+	do {
+		row = (struct xrow_header *)ibuf_alloc(row_buf,
+						       sizeof(struct xrow_header));
+		if (row == NULL) {
+			diag_set(OutOfMemory, sizeof(struct xrow_header),
+				 "slab", "struct xrow_header");
+			goto error;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		try {
+			/* TODO: we should have a C version of this function. */
+			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+			/*
+			 * First row in a transaction. In order to enforce
+			 * consistency check that first row lsn and replica id
+			 * match with transaction.
+			 */
+			txn_id = row->lsn;
+			txn_replica_id = row->replica_id;
+		}
+		if (txn_id != row->txn_id ||
+			   txn_replica_id != row->txn_replica_id) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replications",
+				 "interleaving transactions");
+			goto error;
+		}
+
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL) {
+			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "slab", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			row->body->iov_base = new_base;
+		}
+
+	} while (row->txn_last == 0);
+
+	return 0;
+error:
+	ibuf_reset(row_buf);
+	obuf_reset(data_buf);
+	return -1;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -396,6 +493,10 @@ applier_subscribe(struct applier *applier)
 	struct ibuf *ibuf = &applier->ibuf;
 	struct xrow_header row;
 	struct vclock remote_vclock_at_subscribe;
+	struct ibuf row_buf;
+	struct obuf data_buf;
+	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
+	obuf_create(&data_buf, &cord()->slabc, 0x10000);
 
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &replicaset.vclock);
@@ -475,87 +576,75 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
+		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
+			diag_raise();
 
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row);  /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct txn *txn = NULL;
+		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
+		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
 
-		applier->lag = ev_now(loop()) - row.tm;
+		applier->lag = ev_now(loop()) - last_row->tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->txn_replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
-		/*
-		 * In a full mesh topology, the same set
-		 * of changes may arrive via two
-		 * concurrently running appliers. Thanks
-		 * to vclock_follow() above, the first row
-		 * in the set will be skipped - but the
-		 * remaining may execute out of order,
-		 * when the following xstream_write()
-		 * yields on WAL. Hence we need a latch to
-		 * strictly order all changes which belong
-		 * to the same server id.
-		 */
 		latch_lock(latch);
+		/* First row identifies a transaction. */
+		assert(first_row->lsn == first_row->txn_id);
+		assert(first_row->replica_id == first_row->txn_replica_id);
 		if (vclock_get(&replicaset.applier.vclock,
-			       row.replica_id) < row.lsn) {
-			if (row.replica_id == instance_id &&
+			       first_row->replica_id) < first_row->lsn) {
+			if (first_row->replica_id == instance_id &&
 			    vclock_get(&replicaset.vclock, instance_id) <
-			    row.lsn) {
+			    first_row->lsn) {
 				/* Local row returned back. */
 				goto done;
 			}
 			/* Preserve old lsn value. */
 			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
-						     row.replica_id);
-			vclock_follow_xrow(&replicaset.applier.vclock, &row);
-			int res = xstream_write(applier->subscribe_stream, &row);
-			struct error *e = diag_last_error(diag_get());
-			if (res != 0 && e->type == &type_ClientError &&
-			    box_error_code(e) == ER_TUPLE_FOUND &&
-			    replication_skip_conflict) {
-				/**
-				 * Silently skip ER_TUPLE_FOUND error if such
-				 * option is set in config.
-				 */
-				diag_clear(diag_get());
-				row.type = IPROTO_NOP;
-				row.bodycnt = 0;
-				res = xstream_write(applier->subscribe_stream,
-						    &row);
+						     first_row->replica_id);
+
+			struct xrow_header *row = first_row;
+			if (first_row != last_row)
+				txn = txn_begin(false);
+			int res = 0;
+			while (row <= last_row && res == 0) {
+				vclock_follow_xrow(&replicaset.applier.vclock, row);
+				res = xstream_write(applier->subscribe_stream, row);
+				struct error *e;
+				if (res != 0 &&
+				    (e = diag_last_error(diag_get()))->type ==
+				    &type_ClientError &&
+				    box_error_code(e) == ER_TUPLE_FOUND &&
+				    replication_skip_conflict) {
+					/**
+					 * Silently skip ER_TUPLE_FOUND error
+					 * if such option is set in config.
+					 */
+					diag_clear(diag_get());
+					row->type = IPROTO_NOP;
+					row->bodycnt = 0;
+					res = xstream_write(applier->subscribe_stream,
+							    row);
+				}
+				++row;
 			}
+			if (res == 0 && txn != NULL)
+				res = txn_commit(txn);
+
 			if (res != 0) {
 				/* Rollback lsn to have a chance for a retry. */
 				vclock_set(&replicaset.applier.vclock,
-					   row.replica_id, old_lsn);
+					   first_row->replica_id, old_lsn);
+				obuf_reset(&data_buf);
+				ibuf_reset(&row_buf);
 				latch_unlock(latch);
 				diag_raise();
 			}
 		}
 done:
+		obuf_reset(&data_buf);
+		ibuf_reset(&row_buf);
 		latch_unlock(latch);
 		/*
 		 * Stay 'orphan' until appliers catch up with
-- 
2.20.1

^ permalink raw reply	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2019-01-28 13:35 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-01-06 13:05 [tarantool-patches] [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
2019-01-06 13:05 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
2019-01-11 13:30 ` [tarantool-patches] Re: [PATCH v2 0/2] Transaction boundaries in replication protocol Georgy Kirichenko
2019-01-22 10:57 [tarantool-patches] " Georgy Kirichenko
2019-01-22 10:57 ` [tarantool-patches] [PATCH v2 2/2] Transaction support for applier Georgy Kirichenko
2019-01-28 13:35   ` Vladimir Davydov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox