Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH v5 1/2] Journal transaction boundaries
       [not found] <cover.1550762885.git.georgy@tarantool.org>
@ 2019-02-21 15:29 ` Georgy Kirichenko
  2019-02-21 16:25   ` Vladimir Davydov
  2019-02-21 15:29 ` [tarantool-patches] [PATCH v5 2/2] Transaction support for applier Georgy Kirichenko
  1 sibling, 1 reply; 4+ messages in thread
From: Georgy Kirichenko @ 2019-02-21 15:29 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Append txn_id and is_commit to the xrow_header structure. txn_id identifies
the transaction on the replica where the transaction was started; the lsn
of the first row in the transaction is used as the transaction id.
is_commit is set to true for the last row in a transaction.

The following encoding/decoding rules are assumed:
 * txn_id is encoded using the transaction sequence number iproto field
   as IPROTO_TSN = lsn - txn_id,
 * is_commit is packed into the IPROTO_FLAGS field as a bit mask,
 * txn_id and is_commit are encoded only for multi-row transactions.
   So if a row has no txn_id after decoding, it is a single-row
   transaction.

These rules provide compatibility with the previous xlog format as well
as a compact encoding.

Needed for: 2798
---
 src/box/iproto_constants.c     |   8 +--
 src/box/iproto_constants.h     |   7 +++
 src/box/lua/xlog.c             |  10 +++
 src/box/wal.c                  |   5 ++
 src/box/xrow.c                 |  41 +++++++++++++
 src/box/xrow.h                 |   4 +-
 test/unit/xrow.cc              |   2 +
 test/xlog/transaction.result   | 108 +++++++++++++++++++++++++++++++++
 test/xlog/transaction.test.lua |  46 ++++++++++++++
 9 files changed, 226 insertions(+), 5 deletions(-)
 create mode 100644 test/xlog/transaction.result
 create mode 100644 test/xlog/transaction.test.lua

diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index 7fd295775..09ded1ecb 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
 		/* 0x05 */	MP_UINT,   /* IPROTO_SCHEMA_VERSION */
 		/* 0x06 */	MP_UINT,   /* IPROTO_SERVER_VERSION */
 		/* 0x07 */	MP_UINT,   /* IPROTO_GROUP_ID */
+		/* 0x08 */	MP_UINT,   /* IPROTO_TSN */
+		/* 0x09 */	MP_UINT,   /* IPROTO_FLAGS */
 	/* }}} */
 
 	/* {{{ unused */
-		/* 0x08 */	MP_UINT,
-		/* 0x09 */	MP_UINT,
 		/* 0x0a */	MP_UINT,
 		/* 0x0b */	MP_UINT,
 		/* 0x0c */	MP_UINT,
@@ -136,8 +136,8 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
 	"schema version",   /* 0x05 */
 	"server version",   /* 0x06 */
 	"group id",         /* 0x07 */
-	NULL,               /* 0x08 */
-	NULL,               /* 0x09 */
+	"tsn",              /* 0x08 */
+	"flags",            /* 0x09 */
 	NULL,               /* 0x0a */
 	NULL,               /* 0x0b */
 	NULL,               /* 0x0c */
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 728514297..126d73352 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -49,6 +49,11 @@ enum {
 	XLOG_FIXHEADER_SIZE = 19
 };
 
+enum {
+	/** Set for the last xrow in a transaction. */
+	IPROTO_FLAG_COMMIT = 0x01,
+};
+
 enum iproto_key {
 	IPROTO_REQUEST_TYPE = 0x00,
 	IPROTO_SYNC = 0x01,
@@ -60,6 +65,8 @@ enum iproto_key {
 	IPROTO_SCHEMA_VERSION = 0x05,
 	IPROTO_SERVER_VERSION = 0x06,
 	IPROTO_GROUP_ID = 0x07,
+	IPROTO_TSN = 0x08,
+	IPROTO_FLAGS = 0x09,
 	/* Leave a gap for other keys in the header. */
 	IPROTO_SPACE_ID = 0x10,
 	IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/lua/xlog.c b/src/box/lua/xlog.c
index 3c7cab38c..bd30187ae 100644
--- a/src/box/lua/xlog.c
+++ b/src/box/lua/xlog.c
@@ -221,6 +221,16 @@ lbox_xlog_parser_iterate(struct lua_State *L)
 		lua_pushnumber(L, row.tm);
 		lua_settable(L, -3); /* timestamp */
 	}
+	if (row.txn_id != row.lsn || !row.is_commit) {
+		lua_pushstring(L, "txn_id");
+		lua_pushnumber(L, row.txn_id);
+		lua_settable(L, -3); /* txn_id */
+	}
+	if (row.is_commit && row.txn_id != row.lsn) {
+		lua_pushstring(L, "commit");
+		lua_pushboolean(L, true);
+		lua_settable(L, -3); /* txn_commit flag */
+	}
 
 	lua_settable(L, -3); /* HEADER */
 
diff --git a/src/box/wal.c b/src/box/wal.c
index b2652bb17..78112da77 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -895,12 +895,17 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 	       struct xrow_header **row,
 	       struct xrow_header **end)
 {
+	int64_t txn_id = 0;
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
 		if ((*row)->replica_id == 0) {
 			(*row)->lsn = vclock_inc(vclock_diff, instance_id) +
 				      vclock_get(base, instance_id);
 			(*row)->replica_id = instance_id;
+			/* Use the lsn of the first local row as txn_id. */
+			txn_id = txn_id == 0? (*row)->lsn: txn_id;
+			(*row)->txn_id = txn_id;
+			(*row)->is_commit = row == end - 1;
 		} else {
 			vclock_follow(vclock_diff, (*row)->replica_id,
 				      (*row)->lsn - vclock_get(base,
diff --git a/src/box/xrow.c b/src/box/xrow.c
index fec8873d0..8496a694f 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -102,6 +102,8 @@ error:
 
 	if (mp_typeof(**pos) != MP_MAP)
 		goto error;
+	bool has_tsn = false;
+	uint32_t flags = 0;
 
 	uint32_t size = mp_decode_map(pos);
 	for (uint32_t i = 0; i < size; i++) {
@@ -133,12 +135,30 @@ error:
 		case IPROTO_SCHEMA_VERSION:
 			header->schema_version = mp_decode_uint(pos);
 			break;
+		case IPROTO_TSN:
+			has_tsn = true;
+			header->txn_id = mp_decode_uint(pos);
+			break;
+		case IPROTO_FLAGS:
+			flags = mp_decode_uint(pos);
+			header->is_commit = flags & IPROTO_FLAG_COMMIT;
+			break;
 		default:
 			/* unknown header */
 			mp_next(pos);
 		}
 	}
 	assert(*pos <= end);
+	if (!has_tsn) {
+		/*
+		 * Transaction id is not set so it is a single statement
+		 * transaction.
+		 */
+		header->is_commit = true;
+	}
+	/* Restore transaction id from lsn and transaction serial number. */
+	header->txn_id = header->lsn - header->txn_id;
+
 	/* Nop requests aren't supposed to have a body. */
 	if (*pos < end && header->type != IPROTO_NOP) {
 		const char *body = *pos;
@@ -223,6 +243,27 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
 		d = mp_encode_double(d, header->tm);
 		map_size++;
 	}
+	if (header->txn_id != 0) {
+		if (header->txn_id != header->lsn || header->is_commit == 0) {
+			/*
+			 * Encode a transaction identifier for multi row
+			 * transaction members.
+			 */
+			d = mp_encode_uint(d, IPROTO_TSN);
+			/*
+			 * Differential encoding: write a transaction serial
+			 * number (it is equal to lsn - transaction id) instead.
+			 */
+			d = mp_encode_uint(d, header->lsn - header->txn_id);
+			map_size++;
+		}
+		if (header->is_commit && header->txn_id != header->lsn) {
+			/* Setup last row for multi row transaction. */
+			d = mp_encode_uint(d, IPROTO_FLAGS);
+			d = mp_encode_uint(d, IPROTO_FLAG_COMMIT);
+			map_size++;
+		}
+	}
 	assert(d <= data + XROW_HEADER_LEN_MAX);
 	mp_encode_map(data, map_size);
 	out->iov_len = d - (char *) out->iov_base;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 2fce83bbc..ccd57f8b2 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -47,7 +47,7 @@ enum {
 	XROW_HEADER_IOVMAX = 1,
 	XROW_BODY_IOVMAX = 2,
 	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
-	XROW_HEADER_LEN_MAX = 40,
+	XROW_HEADER_LEN_MAX = 52,
 	XROW_BODY_LEN_MAX = 128,
 	IPROTO_HEADER_LEN = 28,
 	/** 7 = sizeof(iproto_body_bin). */
@@ -63,6 +63,8 @@ struct xrow_header {
 	uint64_t sync;
 	int64_t lsn; /* LSN must be signed for correct comparison */
 	double tm;
+	int64_t txn_id;
+	bool is_commit;
 
 	int bodycnt;
 	uint32_t schema_version;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index 022d1f998..e0e7f1279 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -215,6 +215,8 @@ test_xrow_header_encode_decode()
 	header.lsn = 400;
 	header.tm = 123.456;
 	header.bodycnt = 0;
+	header.txn_id = header.lsn;
+	header.is_commit = true;
 	uint64_t sync = 100500;
 	struct iovec vec[1];
 	is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
diff --git a/test/xlog/transaction.result b/test/xlog/transaction.result
new file mode 100644
index 000000000..d2d9053ae
--- /dev/null
+++ b/test/xlog/transaction.result
@@ -0,0 +1,108 @@
+fio = require('fio')
+---
+...
+xlog = require('xlog').pairs
+---
+...
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function read_xlog(file)
+    local val = {}
+    for k, v in xlog(file) do
+        table.insert(val, setmetatable(v, { __serialize = "map"}))
+    end
+    return val
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- gh-2798 check for journal transaction encoding
+_ = box.schema.space.create('test'):create_index('pk')
+---
+...
+-- generate a new xlog
+box.snapshot()
+---
+- ok
+...
+lsn = box.info.lsn
+---
+...
+-- autocommit transaction
+box.space.test:replace({1})
+---
+- [1]
+...
+-- one row transaction
+box.begin() box.space.test:replace({2}) box.commit()
+---
+...
+-- two row transaction
+box.begin() for i = 3, 4 do box.space.test:replace({i}) end box.commit()
+---
+...
+-- four row transaction
+box.begin() for i = 5, 8 do box.space.test:replace({i}) end box.commit()
+---
+...
+-- open a new xlog
+box.snapshot()
+---
+- ok
+...
+-- read a previous one
+lsn_str = tostring(lsn)
+---
+...
+data = read_xlog(fio.pathjoin(box.cfg.wal_dir, string.rep('0', 20 - #lsn_str) .. tostring(lsn_str) .. '.xlog'))
+---
+...
+-- check nothing changed for single row transactions
+data[1].HEADER.txn_id == nil and data[1].HEADER.commit == nil
+---
+- true
+...
+data[2].HEADER.txn_id == nil and data[2].HEADER.commit == nil
+---
+- true
+...
+-- check two row transaction
+data[3].HEADER.txn_id == data[3].HEADER.lsn and data[3].HEADER.commit == nil
+---
+- true
+...
+data[4].HEADER.txn_id == data[3].HEADER.txn_id and data[4].HEADER.commit == true
+---
+- true
+...
+-- check four row transaction
+data[5].HEADER.txn_id == data[5].HEADER.lsn and data[5].HEADER.commit == nil
+---
+- true
+...
+data[6].HEADER.txn_id == data[5].HEADER.txn_id and data[6].HEADER.commit == nil
+---
+- true
+...
+data[7].HEADER.txn_id == data[5].HEADER.txn_id and data[7].HEADER.commit == nil
+---
+- true
+...
+data[8].HEADER.txn_id == data[5].HEADER.txn_id and data[8].HEADER.commit == true
+---
+- true
+...
+box.space.test:drop()
+---
+...
diff --git a/test/xlog/transaction.test.lua b/test/xlog/transaction.test.lua
new file mode 100644
index 000000000..062635098
--- /dev/null
+++ b/test/xlog/transaction.test.lua
@@ -0,0 +1,46 @@
+fio = require('fio')
+xlog = require('xlog').pairs
+env = require('test_run')
+test_run = env.new()
+
+test_run:cmd("setopt delimiter ';'")
+function read_xlog(file)
+    local val = {}
+    for k, v in xlog(file) do
+        table.insert(val, setmetatable(v, { __serialize = "map"}))
+    end
+    return val
+end;
+test_run:cmd("setopt delimiter ''");
+
+
+-- gh-2798 check for journal transaction encoding
+_ = box.schema.space.create('test'):create_index('pk')
+-- generate a new xlog
+box.snapshot()
+lsn = box.info.lsn
+-- autocommit transaction
+box.space.test:replace({1})
+-- one row transaction
+box.begin() box.space.test:replace({2}) box.commit()
+-- two row transaction
+box.begin() for i = 3, 4 do box.space.test:replace({i}) end box.commit()
+-- four row transaction
+box.begin() for i = 5, 8 do box.space.test:replace({i}) end box.commit()
+-- open a new xlog
+box.snapshot()
+-- read a previous one
+lsn_str = tostring(lsn)
+data = read_xlog(fio.pathjoin(box.cfg.wal_dir, string.rep('0', 20 - #lsn_str) .. tostring(lsn_str) .. '.xlog'))
+-- check nothing changed for single row transactions
+data[1].HEADER.txn_id == nil and data[1].HEADER.commit == nil
+data[2].HEADER.txn_id == nil and data[2].HEADER.commit == nil
+-- check two row transaction
+data[3].HEADER.txn_id == data[3].HEADER.lsn and data[3].HEADER.commit == nil
+data[4].HEADER.txn_id == data[3].HEADER.txn_id and data[4].HEADER.commit == true
+-- check four row transaction
+data[5].HEADER.txn_id == data[5].HEADER.lsn and data[5].HEADER.commit == nil
+data[6].HEADER.txn_id == data[5].HEADER.txn_id and data[6].HEADER.commit == nil
+data[7].HEADER.txn_id == data[5].HEADER.txn_id and data[7].HEADER.commit == nil
+data[8].HEADER.txn_id == data[5].HEADER.txn_id and data[8].HEADER.commit == true
+box.space.test:drop()
-- 
2.20.1

^ permalink raw reply	[flat|nested] 4+ messages in thread

* [tarantool-patches] [PATCH v5 2/2] Transaction support for applier
       [not found] <cover.1550762885.git.georgy@tarantool.org>
  2019-02-21 15:29 ` [tarantool-patches] [PATCH v5 1/2] Journal transaction boundaries Georgy Kirichenko
@ 2019-02-21 15:29 ` Georgy Kirichenko
  2019-03-01  9:50   ` Vladimir Davydov
  1 sibling, 1 reply; 4+ messages in thread
From: Georgy Kirichenko @ 2019-02-21 15:29 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

The applier fetches incoming rows to form a transaction and then applies it.
In case of replication, all local changes are moved to the tail of the
journal entry to form a separate transaction (like an autonomous transaction)
so that the changes can be replicated back. The applier assumes that
transactions cannot be interleaved in a replication stream.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc                    | 216 ++++++++++++++++++-----
 src/box/applier.h                     |   5 +
 src/box/txn.c                         |  17 +-
 src/box/txn.h                         |   4 +
 test/replication/transaction.result   | 240 ++++++++++++++++++++++++++
 test/replication/transaction.test.lua |  86 +++++++++
 6 files changed, 520 insertions(+), 48 deletions(-)
 create mode 100644 test/replication/transaction.result
 create mode 100644 test/replication/transaction.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index e9addcb3e..b43860388 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,14 @@
 #include "error.h"
 #include "session.h"
 #include "cfg.h"
+#include "txn.h"
+
+enum {
+	/* Initial capacity for tx in rows. */
+	APPLIER_ROW_COUNT = 32,
+	/* Initial size of tx data buffer. */
+	APPLIER_ROW_DATA_SIZE = 0x1000,
+};
 
 STRS(applier_state, applier_STATE);
 
@@ -379,6 +387,117 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Read one transaction from network using applier's input buffer.
+ * Transaction rows are placed into row_buf as an array, row's bodies are
+ * placed into obuf because it is not allowed to relocate row's bodies.
+ * We could not use applier input buffer for that because rpos is adjusted
+ * after each xrow decoding and corresponding network input space is going
+ * to be reused.
+ *
+ * Also, keeping row data in detached storage (the row and data buffers)
+ * lets us continue networking and then issue subsequent transactions even
+ * if the current one is still in progress.
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+		struct obuf *data_buf)
+{
+	struct xrow_header *row;
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t txn_id = 0;
+
+	do {
+		row = (struct xrow_header *)ibuf_alloc(row_buf,
+						       sizeof(struct xrow_header));
+		if (row == NULL) {
+			diag_set(OutOfMemory, sizeof(struct xrow_header),
+				 "slab", "struct xrow_header");
+			goto error;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		/*
+		 * Unfortunately we do not have C-version of coio read xrow
+		 * functions yet so use try-catch guard as workaround.
+		 */
+		try {
+			/*
+			 * Tarantool < 1.7.7 does not send periodic heartbeat
+			 * messages so we can't assume that if we haven't heard
+			 * from the master for quite a while the connection is
+			 * broken - the master might just be idle.
+			 */
+			if (applier->version_id < version_id(1, 7, 7))
+				coio_read_xrow(coio, ibuf, row);
+			else
+				coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+			/*
+			 * First row in a transaction. In order to enforce
+			 * consistency check that first row lsn and replica id
+			 * match with transaction.
+			 */
+			txn_id = row->txn_id;
+			if (row->lsn != txn_id) {
+				/* There is not a first row in the transactions. */
+				diag_set(ClientError, ER_PROTOCOL,
+					 "Not a first row in a transaction");
+				goto error;
+			}
+		}
+		if (txn_id != row->txn_id) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replications",
+				 "interleaving transactions");
+			goto error;
+		}
+
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL) {
+			void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "slab", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			row->body->iov_base = new_base;
+		}
+
+	} while (row->is_commit == 0);
+
+	return 0;
+error:
+	return -1;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -390,6 +509,8 @@ applier_subscribe(struct applier *applier)
 	/* Send SUBSCRIBE request */
 	struct ev_io *coio = &applier->io;
 	struct ibuf *ibuf = &applier->ibuf;
+	struct ibuf *row_buf = &applier->row_buf;
+	struct obuf *data_buf = &applier->data_buf;
 	struct xrow_header row;
 	struct vclock remote_vclock_at_subscribe;
 	struct tt_uuid cluster_id = uuid_nil;
@@ -510,36 +631,16 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
+		if (applier_read_tx(applier, row_buf, data_buf) != 0)
+			diag_raise();
 
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row);  /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct txn *txn = NULL;
+		struct xrow_header *first_row = (struct xrow_header *)row_buf->rpos;
+		struct xrow_header *last_row = (struct xrow_header *)row_buf->wpos - 1;
 
-		applier->lag = ev_now(loop()) - row.tm;
+		applier->lag = ev_now(loop()) - last_row->tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
 		/*
@@ -549,34 +650,51 @@ applier_subscribe(struct applier *applier)
 		 * that belong to the same server id.
 		 */
 		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = xstream_write(applier->subscribe_stream, &row);
-			if (res != 0) {
-				struct error *e = diag_last_error(diag_get());
-				/*
-				 * In case of ER_TUPLE_FOUND error and enabled
-				 * replication_skip_conflict configuration
-				 * option, skip applying the foreign row and
-				 * replace it with NOP in the local write ahead
-				 * log.
-				 */
-				if (e->type == &type_ClientError &&
+		if (vclock_get(&replicaset.vclock,
+			       first_row->replica_id) < first_row->lsn) {
+			struct xrow_header *row = first_row;
+			if (first_row != last_row)
+				txn = txn_begin(false);
+			int res = 0;
+			while (row <= last_row && res == 0) {
+				res = xstream_write(applier->subscribe_stream, row);
+				struct error *e;
+				if (res != 0 &&
+				    (e = diag_last_error(diag_get()))->type ==
+					    &type_ClientError &&
 				    box_error_code(e) == ER_TUPLE_FOUND &&
 				    replication_skip_conflict) {
+					/*
+					 * In case of ER_TUPLE_FOUND error and enabled
+					 * replication_skip_conflict configuration
+					 * option, skip applying the foreign row and
+					 * replace it with NOP in the local write ahead
+					 * log.
+					 */
 					diag_clear(diag_get());
-					struct xrow_header nop;
-					nop.type = IPROTO_NOP;
-					nop.bodycnt = 0;
-					nop.replica_id = row.replica_id;
-					nop.lsn = row.lsn;
-					res = xstream_write(applier->subscribe_stream, &nop);
+					row->type = IPROTO_NOP;
+					row->bodycnt = 0;
+					res = xstream_write(applier->subscribe_stream, row);
 				}
+				++row;
+			}
+			if (res == 0 && txn != NULL)
+				res = txn_commit(txn);
+
+			if (res != 0) {
+				txn_rollback();
+				obuf_reset(data_buf);
+				ibuf_reset(row_buf);
+				latch_unlock(latch);
+				diag_raise();
 			}
 			if (res != 0) {
 				latch_unlock(latch);
 				diag_raise();
 			}
 		}
+		obuf_reset(data_buf);
+		ibuf_reset(row_buf);
 		latch_unlock(latch);
 
 		if (applier->state == APPLIER_SYNC ||
@@ -601,6 +719,9 @@ applier_disconnect(struct applier *applier, enum applier_state state)
 	coio_close(loop(), &applier->io);
 	/* Clear all unparsed input. */
 	ibuf_reinit(&applier->ibuf);
+	ibuf_reinit(&applier->row_buf);
+	obuf_destroy(&applier->data_buf);
+	obuf_create(&applier->data_buf, &cord()->slabc, APPLIER_ROW_DATA_SIZE);
 	fiber_gc();
 }
 
@@ -743,6 +864,9 @@ applier_new(const char *uri, struct xstream *join_stream,
 	}
 	coio_create(&applier->io, -1);
 	ibuf_create(&applier->ibuf, &cord()->slabc, 1024);
+	ibuf_create(&applier->row_buf, &cord()->slabc,
+		    APPLIER_ROW_COUNT * sizeof(struct xrow_header));
+	obuf_create(&applier->data_buf, &cord()->slabc, APPLIER_ROW_DATA_SIZE);
 
 	/* uri_parse() sets pointers to applier->source buffer */
 	snprintf(applier->source, sizeof(applier->source), "%s", uri);
@@ -766,6 +890,8 @@ applier_delete(struct applier *applier)
 {
 	assert(applier->reader == NULL && applier->writer == NULL);
 	ibuf_destroy(&applier->ibuf);
+	ibuf_destroy(&applier->row_buf);
+	obuf_destroy(&applier->data_buf);
 	assert(applier->io.fd == -1);
 	trigger_destroy(&applier->on_state);
 	fiber_cond_destroy(&applier->resume_cond);
diff --git a/src/box/applier.h b/src/box/applier.h
index 5a9c40fc8..f2337fed0 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -36,6 +36,7 @@
 #include <tarantool_ev.h>
 
 #include <small/ibuf.h>
+#include <small/obuf.h>
 
 #include "fiber_cond.h"
 #include "trigger.h"
@@ -120,6 +121,10 @@ struct applier {
 	struct xstream *join_stream;
 	/** xstream to process rows during final JOIN and SUBSCRIBE */
 	struct xstream *subscribe_stream;
+	/** Array to store the current transaction rows. */
+	struct ibuf row_buf;
+	/** Data buffer to store the bodies of the current transaction's rows. */
+	struct obuf data_buf;
 };
 
 /**
diff --git a/src/box/txn.c b/src/box/txn.c
index 7f4e85b47..5e3c659b9 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -34,6 +34,7 @@
 #include "journal.h"
 #include <fiber.h>
 #include "xrow.h"
+#include "replication.h"
 
 double too_long_threshold;
 
@@ -141,6 +142,7 @@ txn_begin(bool is_autocommit)
 	/* Initialize members explicitly to save time on memset() */
 	stailq_create(&txn->stmts);
 	txn->n_rows = 0;
+	txn->n_remote_rows = 0;
 	txn->is_autocommit = is_autocommit;
 	txn->has_triggers  = false;
 	txn->is_aborted = false;
@@ -233,6 +235,9 @@ txn_commit_stmt(struct txn *txn, struct request *request)
 	if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
 		if (txn_add_redo(stmt, request) != 0)
 			goto fail;
+		if (stmt->row->replica_id != 0 &&
+		    stmt->row->replica_id != instance_id)
+			++txn->n_remote_rows;
 		++txn->n_rows;
 	}
 	/*
@@ -271,14 +276,20 @@ txn_write_to_wal(struct txn *txn)
 		return -1;
 
 	struct txn_stmt *stmt;
-	struct xrow_header **row = req->rows;
+	struct xrow_header **remote_row = req->rows;
+	struct xrow_header **local_row = req->rows + txn->n_remote_rows;
 	stailq_foreach_entry(stmt, &txn->stmts, next) {
 		if (stmt->row == NULL)
 			continue; /* A read (e.g. select) request */
-		*row++ = stmt->row;
+		if (stmt->row->replica_id != 0 &&
+		    stmt->row->replica_id != instance_id)
+			*remote_row++ = stmt->row;
+		else
+			*local_row++ = stmt->row;
 		req->approx_len += xrow_approx_len(stmt->row);
 	}
-	assert(row == req->rows + req->n_rows);
+	assert(remote_row == req->rows + txn->n_remote_rows);
+	assert(local_row == req->rows + req->n_rows);
 
 	ev_tstamp start = ev_monotonic_now(loop());
 	int64_t res = journal_write(req);
diff --git a/src/box/txn.h b/src/box/txn.h
index de5cb0de4..2791fdf73 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -142,6 +142,10 @@ struct txn {
 	struct stailq stmts;
 	/** Total number of WAL rows in this txn. */
 	int n_rows;
+	/**
+	 * Count of rows generated on a remote replica.
+	 */
+	int n_remote_rows;
 	/**
 	 * True if this transaction is running in autocommit mode
 	 * (statement end causes an automatic transaction commit).
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..009f84430
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,240 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- set conflict to third transaction
+box.space.test:delete({3})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+---
+- false
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+  - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+  - [6, 'r']
+...
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check last transaction applied without conflicting row
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- vclock should be increased but rows skipped
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+-- check restart does not change something
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..47003c644
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- set conflict to third transaction
+box.space.test:delete({3})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check last transaction applied without conflicting row
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- vclock should be increased but rows skipped
+box.space.test:select()
+
+-- check restart does not change something
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
-- 
2.20.1

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [tarantool-patches] [PATCH v5 1/2] Journal transaction boundaries
  2019-02-21 15:29 ` [tarantool-patches] [PATCH v5 1/2] Journal transaction boundaries Georgy Kirichenko
@ 2019-02-21 16:25   ` Vladimir Davydov
  0 siblings, 0 replies; 4+ messages in thread
From: Vladimir Davydov @ 2019-02-21 16:25 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Thu, Feb 21, 2019 at 06:29:16PM +0300, Georgy Kirichenko wrote:
> Append txn_id and is_commit to xrow_header structure, txn_id identifies
> transaction id on replica where transaction was started. As transaction id
> a lsn of the first row in the transaction is used. is_commit is set to true
> for the last row in a transaction.
> 
> As encoding/decoding rule assumed:
>  * txn_id encoded using transaction sequence number iproto field
>    as IPROTO_TSN = lsn - txn_id,
>  * is_commit packed into IPROTO_FLAGS field with a bit mask,
>  * txn_id and is_commit are encoded only for multi-row transactions.
>    So if we do not have txn_id after row decoding then this means that it
>    is a single row transaction.
> 
> These rules provide compatibility with previous xlog format as well
> as good compaction level.
> 
> Needed for: 2798
> ---
>  src/box/iproto_constants.c     |   8 +--
>  src/box/iproto_constants.h     |   7 +++
>  src/box/lua/xlog.c             |  10 +++
>  src/box/wal.c                  |   5 ++
>  src/box/xrow.c                 |  41 +++++++++++++
>  src/box/xrow.h                 |   4 +-
>  test/unit/xrow.cc              |   2 +
>  test/xlog/transaction.result   | 108 +++++++++++++++++++++++++++++++++
>  test/xlog/transaction.test.lua |  46 ++++++++++++++
>  9 files changed, 226 insertions(+), 5 deletions(-)
>  create mode 100644 test/xlog/transaction.result
>  create mode 100644 test/xlog/transaction.test.lua

Pushed this one to 2.1.

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [tarantool-patches] [PATCH v5 2/2] Transaction support for applier
  2019-02-21 15:29 ` [tarantool-patches] [PATCH v5 2/2] Transaction support for applier Georgy Kirichenko
@ 2019-03-01  9:50   ` Vladimir Davydov
  0 siblings, 0 replies; 4+ messages in thread
From: Vladimir Davydov @ 2019-03-01  9:50 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Thu, Feb 21, 2019 at 06:29:17PM +0300, Georgy Kirichenko wrote:
> Applier fetches incoming rows to form a transaction and then applies it.
> In case of replication all local changes are moved to a journal entry
> tail to form a separate transaction (like an autonomous transaction)
> to be able to replicate changes back. Applier assumes that transactions
> cannot be mixed in a replication stream.

Discussed with Kostja. Here's what he thinks we need to do to commit
this patch:

 1. Get rid of xstream in applier. Should be done in a separate patch.

 2. Instead of using obuf for requests and ibuf for headers, we should
    copy those to fiber->gc.

^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2019-03-01  9:50 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
     [not found] <cover.1550762885.git.georgy@tarantool.org>
2019-02-21 15:29 ` [tarantool-patches] [PATCH v5 1/2] Journal transaction boundaries Georgy Kirichenko
2019-02-21 16:25   ` Vladimir Davydov
2019-02-21 15:29 ` [tarantool-patches] [PATCH v5 2/2] Transaction support for applier Georgy Kirichenko
2019-03-01  9:50   ` Vladimir Davydov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox