Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries
       [not found] <cover.1550001848.git.georgy@tarantool.org>
@ 2019-02-12 20:04 ` Georgy Kirichenko
  2019-02-15 13:15   ` Vladimir Davydov
  2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 2/2] Transaction support for applier Georgy Kirichenko
  1 sibling, 1 reply; 5+ messages in thread
From: Georgy Kirichenko @ 2019-02-12 20:04 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Append txn_id and txn_commit to the xrow_header structure. txn_id identifies
the transaction on the replica where it was started; the lsn of the first row
in the transaction is used as the transaction id. txn_commit is set to true
for the last row in a transaction, so a transaction can be committed either by
its last row or by an additional NOP request with txn_commit set; likewise, a
transaction can be started with a NOP carrying the corresponding txn_id. In
case of replication, all local changes are moved to the tail of the journal
entry to form a separate transaction (like an autonomous transaction), so that
they can be replicated back.

The following encoding/decoding rules are assumed:
 * txn_id and txn_commit are encoded only for multi-row transactions,
   so a row decoded without txn_id belongs to a single-row transaction.
 * The TXN_ID field is delta-encoded as lsn - txn_id, so on decoding
   txn_id is recovered as lsn minus the stored value.
 * txn_commit is packed into the TXN_FLAGS field as TXN_FLAG_COMMIT.

These rules keep compatibility with the previous xlog format and give a
compact encoding.

Needed for: #2798
---
 src/box/iproto_constants.c |  4 ++--
 src/box/iproto_constants.h |  7 +++++++
 src/box/txn.c              | 21 +++++++++++++++++----
 src/box/txn.h              |  2 ++
 src/box/wal.c              | 28 ++++++++++++++++++++++++----
 src/box/xrow.c             | 36 ++++++++++++++++++++++++++++++++++++
 src/box/xrow.h             |  4 +++-
 test/unit/xrow.cc          |  2 ++
 8 files changed, 93 insertions(+), 11 deletions(-)

diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index 7fd295775..4d2e21752 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
 		/* 0x05 */	MP_UINT,   /* IPROTO_SCHEMA_VERSION */
 		/* 0x06 */	MP_UINT,   /* IPROTO_SERVER_VERSION */
 		/* 0x07 */	MP_UINT,   /* IPROTO_GROUP_ID */
+		/* 0x08 */	MP_UINT,   /* IPROTO_TXN_ID */
+		/* 0x09 */	MP_UINT,   /* IPROTO_TXN_FLAGS */
 	/* }}} */
 
 	/* {{{ unused */
-		/* 0x08 */	MP_UINT,
-		/* 0x09 */	MP_UINT,
 		/* 0x0a */	MP_UINT,
 		/* 0x0b */	MP_UINT,
 		/* 0x0c */	MP_UINT,
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 728514297..fd80e3111 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -49,6 +49,11 @@ enum {
 	XLOG_FIXHEADER_SIZE = 19
 };
 
+enum {
+	/** Set for the last xrow in a transaction. */
+	TXN_FLAG_COMMIT = 0x01,
+};
+
 enum iproto_key {
 	IPROTO_REQUEST_TYPE = 0x00,
 	IPROTO_SYNC = 0x01,
@@ -60,6 +65,8 @@ enum iproto_key {
 	IPROTO_SCHEMA_VERSION = 0x05,
 	IPROTO_SERVER_VERSION = 0x06,
 	IPROTO_GROUP_ID = 0x07,
+	IPROTO_TXN_ID = 0x08,
+	IPROTO_TXN_FLAGS = 0x09,
 	/* Leave a gap for other keys in the header. */
 	IPROTO_SPACE_ID = 0x10,
 	IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/txn.c b/src/box/txn.c
index 7f4e85b47..de0152706 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -34,6 +34,7 @@
 #include "journal.h"
 #include <fiber.h>
 #include "xrow.h"
+#include "replication.h"
 
 double too_long_threshold;
 
@@ -150,6 +151,7 @@ txn_begin(bool is_autocommit)
 	txn->engine = NULL;
 	txn->engine_tx = NULL;
 	txn->psql_txn = NULL;
+	txn->remote_row_count = 0;
 	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
 	fiber_set_txn(fiber(), txn);
 	return txn;
@@ -231,6 +233,10 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	 * stmt->space can be NULL for IRPOTO_NOP.
 	 */
 	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
+		if (request->header &&
+		    request->header->replica_id != instance_id &&
+		    request->header->replica_id != 0)
+			++txn->remote_row_count;
 		if (txn_add_redo(stmt, request) != 0)
 			goto fail;
 		++txn->n_rows;
@@ -271,14 +277,21 @@ txn_write_to_wal(struct txn *txn)
 		return -1;
 
 	struct txn_stmt *stmt;
-	struct xrow_header **row = req->rows;
+	struct xrow_header **remote_row = req->rows;
+	struct xrow_header **local_row = req->rows + txn->remote_row_count;
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->row == NULL)
-			continue; /* A read (e.g. select) request */
-		*row++ = stmt->row;
+			/* A read (e.g. select) request */
+			continue;
+		if (stmt->row->replica_id == instance_id ||
+		    stmt->row->replica_id == 0)
+			*local_row++ = stmt->row;
+		else
+			*remote_row++ = stmt->row;
 		req->approx_len += xrow_approx_len(stmt->row);
 	}
-	assert(row == req->rows + req->n_rows);
+	assert(remote_row == req->rows + txn->remote_row_count);
+	assert(local_row == req->rows + req->n_rows);
 
 	ev_tstamp start = ev_monotonic_now(loop());
 	int64_t res = journal_write(req);
diff --git a/src/box/txn.h b/src/box/txn.h
index de5cb0de4..143f21715 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -180,6 +180,8 @@ struct txn {
 	 /** Commit and rollback triggers */
 	struct rlist on_commit, on_rollback;
 	struct sql_txn *psql_txn;
+	/** Count of remote rows. */
+	uint32_t remote_row_count;
 };
 
 /* Pointer to the current transaction (if any) */
diff --git a/src/box/wal.c b/src/box/wal.c
index cdcaabc00..0ea3be68d 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -896,10 +896,11 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 	cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback);
 }
 
-static void
-wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
+static int
+wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
 	       struct xrow_header **end)
 {
+	struct xrow_header **row = begin;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
 		if ((*row)->replica_id == 0) {
@@ -909,6 +910,21 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
 			vclock_follow_xrow(vclock, *row);
 		}
 	}
+	while (begin < end && begin[0]->replica_id != instance_id)
+		++begin;
+	/* Set txn_id and txn_commit for locally generated rows. */
+	row = begin;
+	while (row < end) {
+		if (row[0]->replica_id != instance_id) {
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "Interleaved transactions");
+			return -1;
+		}
+		row[0]->txn_id = begin[0]->lsn;
+		row[0]->txn_commit = row == end - 1 ? 1 : 0;
+		++row;
+	}
+	return 0;
 }
 
 static void
@@ -979,7 +995,9 @@ wal_write_to_disk(struct cmsg *msg)
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
-		wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
+		if (wal_assign_lsn(&vclock, entry->rows,
+				   entry->rows + entry->n_rows) < 0)
+			goto done;
 		entry->res = vclock_sum(&vclock);
 		rc = xlog_write_entry(l, entry);
 		if (rc < 0)
@@ -1173,7 +1191,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
 			   struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
-	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
+	if (wal_assign_lsn(&writer->vclock, entry->rows,
+			   entry->rows + entry->n_rows) != 0)
+		return -1;
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
diff --git a/src/box/xrow.c b/src/box/xrow.c
index fec8873d0..29fa75de4 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -102,6 +102,8 @@ error:
 
 	if (mp_typeof(**pos) != MP_MAP)
 		goto error;
+	bool txn_is_set = false;
+	uint32_t txn_flags = 0;
 
 	uint32_t size = mp_decode_map(pos);
 	for (uint32_t i = 0; i < size; i++) {
@@ -133,12 +135,32 @@ error:
 		case IPROTO_SCHEMA_VERSION:
 			header->schema_version = mp_decode_uint(pos);
 			break;
+		case IPROTO_TXN_ID:
+			txn_is_set = true;
+			header->txn_id = mp_decode_uint(pos);
+			break;
+		case IPROTO_TXN_FLAGS:
+			txn_flags = mp_decode_uint(pos);
+			header->txn_commit = txn_flags & TXN_FLAG_COMMIT;
+			if ((txn_flags & ~TXN_FLAG_COMMIT) != 0)
+				/* Unknown flags. */
+				goto error;
+			break;
 		default:
 			/* unknown header */
 			mp_next(pos);
 		}
 	}
 	assert(*pos <= end);
+	if (!txn_is_set) {
+		/*
+		 * Transaction id is not set so it is a single statement
+		 * transaction.
+		 */
+		header->txn_commit = true;
+	}
+	header->txn_id = header->lsn + header->txn_id;
+
 	/* Nop requests aren't supposed to have a body. */
 	if (*pos < end && header->type != IPROTO_NOP) {
 		const char *body = *pos;
@@ -223,6 +245,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
 		d = mp_encode_double(d, header->tm);
 		map_size++;
 	}
+	if (header->txn_id != 0) {
+		if (header->txn_id != header->lsn || header->txn_commit == 0) {
+			/* Encode txn id for multi row transaction members. */
+			d = mp_encode_uint(d, IPROTO_TXN_ID);
+			d = mp_encode_uint(d, header->lsn - header->txn_id);
+			map_size++;
+		}
+		if (header->txn_commit && header->txn_id != header->lsn) {
+			/* Setup last row for multi row transaction. */
+			d = mp_encode_uint(d, IPROTO_TXN_FLAGS);
+			d = mp_encode_uint(d, TXN_FLAG_COMMIT);
+			map_size++;
+		}
+	}
 	assert(d <= data + XROW_HEADER_LEN_MAX);
 	mp_encode_map(data, map_size);
 	out->iov_len = d - (char *) out->iov_base;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 719add4f0..bc4c4a2d7 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -47,7 +47,7 @@ enum {
 	XROW_HEADER_IOVMAX = 1,
 	XROW_BODY_IOVMAX = 2,
 	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
-	XROW_HEADER_LEN_MAX = 40,
+	XROW_HEADER_LEN_MAX = 52,
 	XROW_BODY_LEN_MAX = 128,
 	IPROTO_HEADER_LEN = 28,
 	/** 7 = sizeof(iproto_body_bin). */
@@ -63,6 +63,8 @@ struct xrow_header {
 	uint64_t sync;
 	int64_t lsn; /* LSN must be signed for correct comparison */
 	double tm;
+	int64_t txn_id;
+	bool txn_commit;
 
 	int bodycnt;
 	uint32_t schema_version;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index 022d1f998..bc99285de 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -215,6 +215,8 @@ test_xrow_header_encode_decode()
 	header.lsn = 400;
 	header.tm = 123.456;
 	header.bodycnt = 0;
+	header.txn_id = header.lsn;
+	header.txn_commit = true;
 	uint64_t sync = 100500;
 	struct iovec vec[1];
 	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
-- 
2.20.1

^ permalink raw reply	[flat|nested] 5+ messages in thread

* [tarantool-patches] [PATCH v3 2/2] Transaction support for applier
       [not found] <cover.1550001848.git.georgy@tarantool.org>
  2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries Georgy Kirichenko
@ 2019-02-12 20:04 ` Georgy Kirichenko
  2019-02-18  9:36   ` Vladimir Davydov
  1 sibling, 1 reply; 5+ messages in thread
From: Georgy Kirichenko @ 2019-02-12 20:04 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

The applier fetches incoming rows to form a transaction and then applies it.
The implementation assumes that transactions are not interleaved in the
replication stream. Distributed transactions are not supported yet.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc                    | 185 +++++++++++++++-----
 test/replication/transaction.result   | 240 ++++++++++++++++++++++++++
 test/replication/transaction.test.lua |  86 +++++++++
 3 files changed, 471 insertions(+), 40 deletions(-)
 create mode 100644 test/replication/transaction.result
 create mode 100644 test/replication/transaction.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 7f37fe2ee..59c33bb84 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,7 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "txn.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -378,6 +379,105 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Read one transaction from the network.
+ * Transaction rows are placed into row_buf as an array; row bodies are
+ * copied into data_buf because row bodies must not be relocated.
+ * The applier input buffer cannot hold them, because its rpos is advanced
+ * after xrow decoding and that space is then reused.
+ *
+ * Note: transactions are assumed not to be interleaved, so each one is read
+ * from its first xrow up to the xrow with txn_commit == true.
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+		struct obuf *data_buf)
+{
+	struct xrow_header *row;
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t txn_id = 0;
+
+	do {
+		row = (struct xrow_header *)ibuf_alloc(row_buf,
+						       sizeof(struct xrow_header));
+		if (row == NULL) {
+			diag_set(OutOfMemory, sizeof(struct xrow_header),
+				 "slab", "struct xrow_header");
+			goto error;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		try {
+			/* TODO: we should have a C version of this function. */
+			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+			/*
+			 * First row in a transaction. In order to enforce
+			 * consistency check that first row lsn and replica id
+			 * match with transaction.
+			 */
+			txn_id = row->txn_id;
+			if (row->lsn != txn_id) {
+				/* There is not a first row in the transactions. */
+				diag_set(ClientError, ER_PROTOCOL,
+					 "Not a first row in a transaction");
+				goto error;
+			}
+		}
+		if (txn_id != row->txn_id) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replications",
+				 "interleaving transactions");
+			goto error;
+		}
+
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL) {
+			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "slab", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			row->body->iov_base = new_base;
+		}
+
+	} while (row->txn_commit == 0);
+
+	return 0;
+error:
+	ibuf_reset(row_buf);
+	obuf_reset(data_buf);
+	return -1;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -392,6 +492,10 @@ applier_subscribe(struct applier *applier)
 	struct xrow_header row;
 	struct vclock remote_vclock_at_subscribe;
 	struct tt_uuid cluster_id = uuid_nil;
+	struct ibuf row_buf;
+	struct obuf data_buf;
+	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
+	obuf_create(&data_buf, &cord()->slabc, 0x10000);
 
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
 				 &replicaset.vclock);
@@ -501,36 +605,16 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
+		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
+			diag_raise();
 
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row);  /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct txn *txn = NULL;
+		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
+		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
 
-		applier->lag = ev_now(loop()) - row.tm;
+		applier->lag = ev_now(loop()) - last_row->tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
 		/*
@@ -540,24 +624,45 @@ applier_subscribe(struct applier *applier)
 		 * that belong to the same server id.
 		 */
 		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = xstream_write(applier->subscribe_stream, &row);
-			if (res != 0) {
-				struct error *e = diag_last_error(diag_get());
-				/**
-				 * Silently skip ER_TUPLE_FOUND error if such
-				 * option is set in config.
-				 */
-				if (e->type == &type_ClientError &&
+		if (vclock_get(&replicaset.vclock,
+			       first_row->replica_id) < first_row->lsn) {
+			struct xrow_header *row = first_row;
+			if (first_row != last_row)
+				txn = txn_begin(false);
+			int res = 0;
+			while (row <= last_row && res == 0) {
+				res = xstream_write(applier->subscribe_stream, row);
+				struct error *e;
+				if (res != 0 &&
+				    (e = diag_last_error(diag_get()))->type ==
+				    &type_ClientError &&
 				    box_error_code(e) == ER_TUPLE_FOUND &&
-				    replication_skip_conflict)
+				    replication_skip_conflict) {
+					/**
+					 * Silently skip ER_TUPLE_FOUND error
+					 * if such option is set in config.
+					 */
 					diag_clear(diag_get());
-				else {
-					latch_unlock(latch);
-					diag_raise();
+					row->type = IPROTO_NOP;
+					row->bodycnt = 0;
+					res = xstream_write(applier->subscribe_stream,
+							    row);
 				}
+				++row;
+			}
+			if (res == 0 && txn != NULL)
+				res = txn_commit(txn);
+
+			if (res != 0) {
+				txn_rollback();
+				obuf_reset(&data_buf);
+				ibuf_reset(&row_buf);
+				latch_unlock(latch);
+				diag_raise();
 			}
 		}
+		obuf_reset(&data_buf);
+		ibuf_reset(&row_buf);
 		latch_unlock(latch);
 
 		if (applier->state == APPLIER_SYNC ||
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..009f84430
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,240 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- set conflict to third transaction
+box.space.test:delete({3})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+---
+- false
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+  - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+  - [6, 'r']
+...
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check last transaction applied without conflicting row
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- vclock should be increased but rows skipped
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+-- check restart does not change something
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..47003c644
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- set conflict to third transaction
+box.space.test:delete({3})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check last transaction applied without conflicting row
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- vclock should be increased but rows skipped
+box.space.test:select()
+
+-- check restart does not change something
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
-- 
2.20.1

^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries
  2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries Georgy Kirichenko
@ 2019-02-15 13:15   ` Vladimir Davydov
  2019-02-19 14:59     ` [tarantool-patches] " Konstantin Osipov
  0 siblings, 1 reply; 5+ messages in thread
From: Vladimir Davydov @ 2019-02-15 13:15 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Feb 12, 2019 at 11:04:31PM +0300, Georgy Kirichenko wrote:
> Append txn_id and txn_commit to xrow_header structure, txn_id identifies
> transaction id on replica where transaction was started. As transaction id
> a lsn of the first row in the transaction is used. txn_commit is set to true
> if it is the last row in a transaction, so we could commit transaction by the
> last row or by additional NOP requests with txn_commit set as well as
> start transaction with NOP and corresponding txn_id. In case of  replication
> all local changes moved to an journal entry tail to form a separate transaction
> (like autonomous transaction) to be able to replicate changes back.
> 
> As encoding/deconding rule assumed:
>  * txn_id and txn_commit are encoded only for multi-row transactions.
>    So if we do not have txn_id while row decoding then this means that it
>    is a single row transaction.
>  * TXN_ID field is differential encoded as lsn - txn_id value
>  * txn_commit packed into TXN_FLAGS field
> 
> These rules provide compatibility with previous xlog format as well
> as good compaction level.
> 
> Needed for: 2798
> ---
>  src/box/iproto_constants.c |  4 ++--
>  src/box/iproto_constants.h |  7 +++++++
>  src/box/txn.c              | 21 +++++++++++++++++----
>  src/box/txn.h              |  2 ++
>  src/box/wal.c              | 28 ++++++++++++++++++++++++----
>  src/box/xrow.c             | 36 ++++++++++++++++++++++++++++++++++++
>  src/box/xrow.h             |  4 +++-
>  test/unit/xrow.cc          |  2 ++
>  8 files changed, 93 insertions(+), 11 deletions(-)
> 
> diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
> index 7fd295775..4d2e21752 100644
> --- a/src/box/iproto_constants.c
> +++ b/src/box/iproto_constants.c
> @@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
>  		/* 0x05 */	MP_UINT,   /* IPROTO_SCHEMA_VERSION */
>  		/* 0x06 */	MP_UINT,   /* IPROTO_SERVER_VERSION */
>  		/* 0x07 */	MP_UINT,   /* IPROTO_GROUP_ID */
> +		/* 0x08 */	MP_UINT,   /* IPROTO_TXN_ID */
> +		/* 0x09 */	MP_UINT,   /* IPROTO_TXN_FLAGS */

I don't quite like the name, because we encode not a txn id, but the
statement number in a txn. Let's rename it to IPROTO_TXN_STMT_NO or
IPROTO_TXN_STMT_ID or IPROTO_TXN_STMT_SEQ?

xrow_header::txn_id is OK though.

>  	/* }}} */
>  
>  	/* {{{ unused */
> -		/* 0x08 */	MP_UINT,
> -		/* 0x09 */	MP_UINT,
>  		/* 0x0a */	MP_UINT,
>  		/* 0x0b */	MP_UINT,
>  		/* 0x0c */	MP_UINT,

You forgot to patch iproto_key_strs.

Let's please add a test that ensures that those new headers are written
to xlogs. You could use xlog reader for that.

> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 728514297..fd80e3111 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -49,6 +49,11 @@ enum {
>  	XLOG_FIXHEADER_SIZE = 19
>  };
>  

/* IPROTO_TXN_FLAGS bits. */

> +enum {
> +	/** Set for the last xrow in a transaction. */
> +	TXN_FLAG_COMMIT = 0x01,

IPROTO_TXN_COMMIT?

> +};
> +
>  enum iproto_key {
>  	IPROTO_REQUEST_TYPE = 0x00,
>  	IPROTO_SYNC = 0x01,
> @@ -60,6 +65,8 @@ enum iproto_key {
>  	IPROTO_SCHEMA_VERSION = 0x05,
>  	IPROTO_SERVER_VERSION = 0x06,
>  	IPROTO_GROUP_ID = 0x07,
> +	IPROTO_TXN_ID = 0x08,
> +	IPROTO_TXN_FLAGS = 0x09,
>  	/* Leave a gap for other keys in the header. */
>  	IPROTO_SPACE_ID = 0x10,
>  	IPROTO_INDEX_ID = 0x11,
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 7f4e85b47..de0152706 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -34,6 +34,7 @@
>  #include "journal.h"
>  #include <fiber.h>
>  #include "xrow.h"
> +#include "replication.h"
>  
>  double too_long_threshold;
>  
> @@ -150,6 +151,7 @@ txn_begin(bool is_autocommit)
>  	txn->engine = NULL;
>  	txn->engine_tx = NULL;
>  	txn->psql_txn = NULL;
> +	txn->remote_row_count = 0;

Nit: let's rename it to n_remote_rows to match n_rows and keep the two
member initializers together.

>  	/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
>  	fiber_set_txn(fiber(), txn);
>  	return txn;
> @@ -231,6 +233,10 @@ txn_commit_stmt(struct txn *txn, struct request *request)
>  	 * stmt->space can be NULL for IRPOTO_NOP.
>  	 */
>  	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
> +		if (request->header &&
> +		    request->header->replica_id != instance_id &&
> +		    request->header->replica_id != 0)
> +			++txn->remote_row_count;

Nit: if we moved this after txn_add_redo(), then we wouldn't have to
check if request->header is set.

>  		if (txn_add_redo(stmt, request) != 0)
>  			goto fail;
>  		++txn->n_rows;
> @@ -271,14 +277,21 @@ txn_write_to_wal(struct txn *txn)
>  		return -1;
>  
>  	struct txn_stmt *stmt;
> -	struct xrow_header **row = req->rows;
> +	struct xrow_header **remote_row = req->rows;
> +	struct xrow_header **local_row = req->rows + txn->remote_row_count;
>  	stailq_foreach_entry(stmt, &txn->stmts, next) {
>  		if (stmt->row == NULL)
> -			continue; /* A read (e.g. select) request */
> -		*row++ = stmt->row;
> +			/* A read (e.g. select) request */
> +			continue;

Nit: pointless change, please remove.

> +		if (stmt->row->replica_id == instance_id ||
> +		    stmt->row->replica_id == 0)
> +			*local_row++ = stmt->row;
> +		else
> +			*remote_row++ = stmt->row;

This piece of code looks nice, but it definitely needs a comment: what
we do, why we do that...

Anyway, we should add a test for this change. Maybe it's even worth
submitting this change in a separate patch.

>  		req->approx_len += xrow_approx_len(stmt->row);
>  	}
> -	assert(row == req->rows + req->n_rows);
> +	assert(remote_row == req->rows + txn->remote_row_count);
> +	assert(local_row == req->rows + req->n_rows);
>  
>  	ev_tstamp start = ev_monotonic_now(loop());
>  	int64_t res = journal_write(req);
> diff --git a/src/box/txn.h b/src/box/txn.h
> index de5cb0de4..143f21715 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -180,6 +180,8 @@ struct txn {
>  	 /** Commit and rollback triggers */
>  	struct rlist on_commit, on_rollback;
>  	struct sql_txn *psql_txn;
> +	/** Count of remote rows. */
> +	uint32_t remote_row_count;

Nit: please use int rather than uint32_t and move the definition after
n_rows, because those are closely related.

>  };
>  
>  /* Pointer to the current transaction (if any) */
> diff --git a/src/box/wal.c b/src/box/wal.c
> index cdcaabc00..0ea3be68d 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -896,10 +896,11 @@ wal_writer_begin_rollback(struct wal_writer *writer)
>  	cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback);
>  }
>  
> -static void
> -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> +static int
> +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
>  	       struct xrow_header **end)
>  {
> +	struct xrow_header **row = begin;
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
>  		if ((*row)->replica_id == 0) {
> @@ -909,6 +910,21 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
>  			vclock_follow_xrow(vclock, *row);
>  		}
>  	}
> +	while (begin < end && begin[0]->replica_id != instance_id)
> +		++begin;
> +	/* Setup txn_id and tnx_replica_id for locally generated rows. */
> +	row = begin;
> +	while (row < end) {
> +		if (row[0]->replica_id != instance_id) {
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "Interleaved transactions");
> +			return -1;

Do we really need to bother with it here, in the WAL? IMO a check in
the applier would be enough.

> +		}
> +		row[0]->txn_id = begin[0]->lsn;
> +		row[0]->txn_commit = row == end - 1 ? 1 : 0;
> +		++row;

Why can't we do this while we are iterating over rows just a few lines
above, assigning LSNs?

> +	}
> +	return 0;
>  }
>  
>  static void
> @@ -979,7 +995,9 @@ wal_write_to_disk(struct cmsg *msg)
>  	struct journal_entry *entry;
>  	struct stailq_entry *last_committed = NULL;
>  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
> -		wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
> +		if (wal_assign_lsn(&vclock, entry->rows,
> +				   entry->rows + entry->n_rows) < 0)
> +			goto done;
>  		entry->res = vclock_sum(&vclock);
>  		rc = xlog_write_entry(l, entry);
>  		if (rc < 0)
> @@ -1173,7 +1191,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
>  			   struct journal_entry *entry)
>  {
>  	struct wal_writer *writer = (struct wal_writer *) journal;
> -	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
> +	if (wal_assign_lsn(&writer->vclock, entry->rows,
> +			   entry->rows + entry->n_rows) != 0)
> +		return -1;
>  	vclock_copy(&replicaset.vclock, &writer->vclock);
>  	return vclock_sum(&writer->vclock);
>  }
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index fec8873d0..29fa75de4 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -102,6 +102,8 @@ error:
>  
>  	if (mp_typeof(**pos) != MP_MAP)
>  		goto error;
> +	bool txn_is_set = false;
> +	uint32_t txn_flags = 0;
>  
>  	uint32_t size = mp_decode_map(pos);
>  	for (uint32_t i = 0; i < size; i++) {
> @@ -133,12 +135,32 @@ error:
>  		case IPROTO_SCHEMA_VERSION:
>  			header->schema_version = mp_decode_uint(pos);
>  			break;
> +		case IPROTO_TXN_ID:
> +			txn_is_set = true;
> +			header->txn_id = mp_decode_uint(pos);
> +			break;
> +		case IPROTO_TXN_FLAGS:
> +			txn_flags = mp_decode_uint(pos);
> +			header->txn_commit = txn_flags & TXN_FLAG_COMMIT;
> +			if ((txn_flags & ~TXN_FLAG_COMMIT) != 0)
> +				/* Unknow flags. */
> +				goto error;

We silently ignore unknown headers. I think we can silently ignore
unknown flags as well.

> +			break;
>  		default:
>  			/* unknown header */
>  			mp_next(pos);
>  		}
>  	}
>  	assert(*pos <= end);
> +	if (!txn_is_set) {
> +		/*
> +		 * Transaction id is not set so it is a single statement
> +		 * transaction.
> +		 */
> +		header->txn_commit = true;
> +	}
> +	header->txn_id = header->lsn + header->txn_id;
> +
>  	/* Nop requests aren't supposed to have a body. */
>  	if (*pos < end && header->type != IPROTO_NOP) {
>  		const char *body = *pos;
> @@ -223,6 +245,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
>  		d = mp_encode_double(d, header->tm);
>  		map_size++;
>  	}
> +	if (header->txn_id != 0) {
> +		if (header->txn_id != header->lsn || header->txn_commit == 0) {

Nit: txn_commit is a bool so

	s/header->txn_commit == 0/!header->txn_commit

> +			/* Encode txn id for multi row transaction members. */
> +			d = mp_encode_uint(d, IPROTO_TXN_ID);
> +			d = mp_encode_uint(d, header->lsn - header->txn_id);
> +			map_size++;
> +		}
> +		if (header->txn_commit && header->txn_id != header->lsn) {
> +			/* Setup last row for multi row transaction. */
> +			d = mp_encode_uint(d, IPROTO_TXN_FLAGS);
> +			d = mp_encode_uint(d, TXN_FLAG_COMMIT);
> +			map_size++;
> +		}
> +	}
>  	assert(d <= data + XROW_HEADER_LEN_MAX);
>  	mp_encode_map(data, map_size);
>  	out->iov_len = d - (char *) out->iov_base;
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 719add4f0..bc4c4a2d7 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -47,7 +47,7 @@ enum {
>  	XROW_HEADER_IOVMAX = 1,
>  	XROW_BODY_IOVMAX = 2,
>  	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
> -	XROW_HEADER_LEN_MAX = 40,
> +	XROW_HEADER_LEN_MAX = 52,
>  	XROW_BODY_LEN_MAX = 128,
>  	IPROTO_HEADER_LEN = 28,
>  	/** 7 = sizeof(iproto_body_bin). */
> @@ -63,6 +63,8 @@ struct xrow_header {
>  	uint64_t sync;
>  	int64_t lsn; /* LSN must be signed for correct comparison */
>  	double tm;
> +	int64_t txn_id;
> +	bool txn_commit;

Please add a comment explaining how these are mapped to IPROTO headers.

>  
>  	int bodycnt;
>  	uint32_t schema_version;
> diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> index 022d1f998..bc99285de 100644
> --- a/test/unit/xrow.cc
> +++ b/test/unit/xrow.cc
> @@ -215,6 +215,8 @@ test_xrow_header_encode_decode()
>  	header.lsn = 400;
>  	header.tm = 123.456;
>  	header.bodycnt = 0;
> +	header.txn_id = header.lsn;
> +	header.txn_commit = true;
>  	uint64_t sync = 100500;
>  	struct iovec vec[1];
>  	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");

^ permalink raw reply	[flat|nested] 5+ messages in thread

* Re: [tarantool-patches] [PATCH v3 2/2] Transaction support for applier
  2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 2/2] Transaction support for applier Georgy Kirichenko
@ 2019-02-18  9:36   ` Vladimir Davydov
  0 siblings, 0 replies; 5+ messages in thread
From: Vladimir Davydov @ 2019-02-18  9:36 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Feb 12, 2019 at 11:04:32PM +0300, Georgy Kirichenko wrote:
> Applier fetch incoming rows to form a transaction and then apply it.
> Implementation assumes that transaction could not mix in a
> replication stream. Also distributed transaction are not supported yet.
> 
> Closes: #2798
> Needed for: #980
> ---
>  src/box/applier.cc                    | 185 +++++++++++++++-----
>  test/replication/transaction.result   | 240 ++++++++++++++++++++++++++
>  test/replication/transaction.test.lua |  86 +++++++++
>  3 files changed, 471 insertions(+), 40 deletions(-)
>  create mode 100644 test/replication/transaction.result
>  create mode 100644 test/replication/transaction.test.lua
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 7f37fe2ee..59c33bb84 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,7 @@
>  #include "error.h"
>  #include "session.h"
>  #include "cfg.h"
> +#include "txn.h"
>  
>  STRS(applier_state, applier_STATE);
>  
> @@ -378,6 +379,105 @@ applier_join(struct applier *applier)
>  	applier_set_state(applier, APPLIER_READY);
>  }
>  
> +/**
> + * Read one transaction from network.
> + * Transaction rows are placed into row_buf as an array, row's bodies are
> + * placed into obuf because it is not allowed to relocate row's bodies.
> + * Also we could not use applier input buffer because rpos adjusted after xrow
> + * decoding and corresponding space going to reuse.
> + *
> + * Note: current implementation grants that transaction could not be mixed, so
> + * we read each transaction from first xrow until xrow with txn_last = true.
> + */
> +static int64_t
> +applier_read_tx(struct applier *applier, struct ibuf *row_buf,
> +		struct obuf *data_buf)
> +{
> +	struct xrow_header *row;
> +	struct ev_io *coio = &applier->io;
> +	struct ibuf *ibuf = &applier->ibuf;
> +	int64_t txn_id = 0;
> +
> +	do {
> +		row = (struct xrow_header *)ibuf_alloc(row_buf,
> +						       sizeof(struct xrow_header));

Nit: this line is too long for no reason, and there are more lines like that.
Please fix them where you can without making the code look ugly.

> +		if (row == NULL) {
> +			diag_set(OutOfMemory, sizeof(struct xrow_header),
> +				 "slab", "struct xrow_header");
> +			goto error;
> +		}
> +
> +		double timeout = replication_disconnect_timeout();
> +		try {
> +			/* TODO: we should have a C version of this function. */
> +			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);

Nit: IMO it's better to use guards to free resources, if any.

> +		} catch (...) {
> +			goto error;
> +		}
> +
> +		if (iproto_type_is_error(row->type)) {
> +			xrow_decode_error(row);
> +			goto error;
> +		}
> +
> +		/* Replication request. */
> +		if (row->replica_id == REPLICA_ID_NIL ||
> +		    row->replica_id >= VCLOCK_MAX) {
> +			/*
> +			 * A safety net, this can only occur
> +			 * if we're fed a strangely broken xlog.
> +			 */
> +			diag_set(ClientError, ER_UNKNOWN_REPLICA,
> +				 int2str(row->replica_id),
> +				 tt_uuid_str(&REPLICASET_UUID));
> +			goto error;
> +		}
> +		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
> +			/*
> +			 * First row in a transaction. In order to enforce
> +			 * consistency check that first row lsn and replica id
> +			 * match with transaction.
> +			 */
> +			txn_id = row->txn_id;
> +			if (row->lsn != txn_id) {
> +				/* There is not a first row in the transactions. */
> +				diag_set(ClientError, ER_PROTOCOL,
> +					 "Not a first row in a transaction");
> +				goto error;
> +			}
> +		}
> +		if (txn_id != row->txn_id) {
> +			/* We are not able to handle interleaving transactions. */
> +			diag_set(ClientError, ER_UNSUPPORTED,
> +				 "replications",
> +				 "interleaving transactions");
> +			goto error;
> +		}
> +
> +
> +		applier->lag = ev_now(loop()) - row->tm;
> +		applier->last_row_time = ev_monotonic_now(loop());
> +
> +		if (row->body->iov_base != NULL) {
> +			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
> +			if (new_base == NULL) {
> +				diag_set(OutOfMemory, row->body->iov_len,
> +					 "slab", "xrow_data");
> +				goto error;
> +			}
> +			memcpy(new_base, row->body->iov_base, row->body->iov_len);
> +			row->body->iov_base = new_base;

So we first read a row into an ibuf, then copy it to an obuf. I understand
that you do this because xrow_header::body has pointers in it. Still, it
looks rather awkward. Maybe we'd better temporarily fix up those
pointers to store relative offsets instead?

> +		}
> +
> +	} while (row->txn_commit == 0);
> +
> +	return 0;
> +error:
> +	ibuf_reset(row_buf);
> +	obuf_reset(data_buf);

obuf_reset, ibuf_reset don't free up memory. You must use obuf_destroy,
ibuf_destroy.

> +	return -1;
> +}
> +
>  /**
>   * Execute and process SUBSCRIBE request (follow updates from a master).
>   */
> @@ -392,6 +492,10 @@ applier_subscribe(struct applier *applier)
>  	struct xrow_header row;
>  	struct vclock remote_vclock_at_subscribe;
>  	struct tt_uuid cluster_id = uuid_nil;
> +	struct ibuf row_buf;
> +	struct obuf data_buf;

IMO the buffers had better be part of the applier struct. Then you
wouldn't need to use a label to free them up in applier_read_tx and
hence could simply let the exception thrown by coio_read_xrow_timeout_xc
travel up the stack without wrapping it in that awkward try-catch block.

> +	ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
> +	obuf_create(&data_buf, &cord()->slabc, 0x10000);

This constant looks like magic to me.

>  
>  	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
>  				 &replicaset.vclock);
> @@ -501,36 +605,16 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>  
> -		/*
> -		 * Tarantool < 1.7.7 does not send periodic heartbeat
> -		 * messages so we can't assume that if we haven't heard
> -		 * from the master for quite a while the connection is
> -		 * broken - the master might just be idle.
> -		 */
> -		if (applier->version_id < version_id(1, 7, 7)) {
> -			coio_read_xrow(coio, ibuf, &row);

You silently dropped this branch. Please leave it be.

> -		} else {
> -			double timeout = replication_disconnect_timeout();
> -			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
> -		}
> +		if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
> +			diag_raise();
>  
> -		if (iproto_type_is_error(row.type))
> -			xrow_decode_error_xc(&row);  /* error */
> -		/* Replication request. */
> -		if (row.replica_id == REPLICA_ID_NIL ||
> -		    row.replica_id >= VCLOCK_MAX) {
> -			/*
> -			 * A safety net, this can only occur
> -			 * if we're fed a strangely broken xlog.
> -			 */
> -			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> -				  int2str(row.replica_id),
> -				  tt_uuid_str(&REPLICASET_UUID));
> -		}
> +		struct txn *txn = NULL;
> +		struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
> +		struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
>  
> -		applier->lag = ev_now(loop()) - row.tm;
> +		applier->lag = ev_now(loop()) - last_row->tm;
>  		applier->last_row_time = ev_monotonic_now(loop());
> -		struct replica *replica = replica_by_id(row.replica_id);
> +		struct replica *replica = replica_by_id(first_row->replica_id);
>  		struct latch *latch = (replica ? &replica->order_latch :
>  				       &replicaset.applier.order_latch);
>  		/*
> @@ -540,24 +624,45 @@ applier_subscribe(struct applier *applier)
>  		 * that belong to the same server id.
>  		 */
>  		latch_lock(latch);
> -		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> -			int res = xstream_write(applier->subscribe_stream, &row);
> -			if (res != 0) {
> -				struct error *e = diag_last_error(diag_get());
> -				/**
> -				 * Silently skip ER_TUPLE_FOUND error if such
> -				 * option is set in config.
> -				 */
> -				if (e->type == &type_ClientError &&
> +		if (vclock_get(&replicaset.vclock,
> +			       first_row->replica_id) < first_row->lsn) {
> +			struct xrow_header *row = first_row;
> +			if (first_row != last_row)
> +				txn = txn_begin(false);

We have a nice level of abstraction implemented by xstream, but now you
bluntly break it by calling txn_begin/commit directly. Please come up
with a better solution, e.g. you could extend the xstream interface with
write_tx method.

> +			int res = 0;
> +			while (row <= last_row && res == 0) {
> +				res = xstream_write(applier->subscribe_stream, row);
> +				struct error *e;
> +				if (res != 0 &&
> +				    (e = diag_last_error(diag_get()))->type ==
> +				    &type_ClientError &&
>  				    box_error_code(e) == ER_TUPLE_FOUND &&
> -				    replication_skip_conflict)
> +				    replication_skip_conflict) {
> +					/**
> +					 * Silently skip ER_TUPLE_FOUND error
> +					 * if such option is set in config.
> +					 */
>  					diag_clear(diag_get());
> -				else {
> -					latch_unlock(latch);
> -					diag_raise();
> +					row->type = IPROTO_NOP;
> +					row->bodycnt = 0;
> +					res = xstream_write(applier->subscribe_stream,
> +							    row);
>  				}
> +				++row;
> +			}
> +			if (res == 0 && txn != NULL)
> +				res = txn_commit(txn);
> +
> +			if (res != 0) {
> +				txn_rollback();
> +				obuf_reset(&data_buf);
> +				ibuf_reset(&row_buf);
> +				latch_unlock(latch);
> +				diag_raise();
>  			}
>  		}
> +		obuf_reset(&data_buf);
> +		ibuf_reset(&row_buf);
>  		latch_unlock(latch);
>  
>  		if (applier->state == APPLIER_SYNC ||
> diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
> new file mode 100644
> index 000000000..47003c644
> --- /dev/null
> +++ b/test/replication/transaction.test.lua
> @@ -0,0 +1,86 @@
> +env = require('test_run')
> +test_run = env.new()
> +box.schema.user.grant('guest', 'replication')
> +
> +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
> +_ = s:create_index('pk')
> +
> +-- transaction w/o conflict
> +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()

Whenever you add a test, please write a few words about what it does and
the resolution of which ticket it is supposed to test.

^ permalink raw reply	[flat|nested] 5+ messages in thread

* [tarantool-patches] Re: [PATCH v3 1/2] Journal transaction boundaries
  2019-02-15 13:15   ` Vladimir Davydov
@ 2019-02-19 14:59     ` Konstantin Osipov
  0 siblings, 0 replies; 5+ messages in thread
From: Konstantin Osipov @ 2019-02-19 14:59 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Vladimir Davydov <vdavydov.dev@gmail.com> [19/02/15 16:17]:
> > +		/* 0x08 */	MP_UINT,   /* IPROTO_TXN_ID */
> > +		/* 0x09 */	MP_UINT,   /* IPROTO_TXN_FLAGS */
> 
> I don't quite like the name, because we encode not a txn id, but the
> statement number in a txn. Let's rename it to IPROTO_TXN_STMT_NO or
> IPROTO_TXN_STMT_ID or IPROTO_TXN_STMT_SEQ?

How about IPROTO_TSN and IPROTO_FLAGS?

> > +enum {
> > +	/** Set for the last xrow in a transaction. */
> > +	TXN_FLAG_COMMIT = 0x01,
> 
> IPROTO_TXN_COMMIT?

IPROTO_FLAG_COMMIT?

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 5+ messages in thread

end of thread, other threads:[~2019-02-19 14:59 UTC | newest]

Thread overview: 5+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
     [not found] <cover.1550001848.git.georgy@tarantool.org>
2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries Georgy Kirichenko
2019-02-15 13:15   ` Vladimir Davydov
2019-02-19 14:59     ` [tarantool-patches] " Konstantin Osipov
2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 2/2] Transaction support for applier Georgy Kirichenko
2019-02-18  9:36   ` Vladimir Davydov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox