Tarantool development patches archive
 help / color / mirror / Atom feed
From: Georgy Kirichenko <georgy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: Georgy Kirichenko <georgy@tarantool.org>
Subject: [tarantool-patches] [PATCH v2 3/3] Transaction support for applier
Date: Wed,  6 Mar 2019 23:16:18 +0300	[thread overview]
Message-ID: <9fca821d3c43ee5ee928a0ca800d6bd0823cfd06.1551902962.git.georgy@tarantool.org> (raw)
In-Reply-To: <cover.1551902962.git.georgy@tarantool.org>

Applier fetch incoming rows to form a transaction and then apply it.
Rows are fetched and stored on fiber gc region until last transaction row
with is_commit was fetched. After fetch a multi row transaction is going to be
applied into txn_begin/txn_commit/txn_rolback boundaries. At this time
we could not apply single row transaction in such boundaries because of
ddl which does not support non auto commit transactions.

Closes: #2798
Needed for: #980
---
 src/box/applier.cc                    | 211 ++++++++++++++++------
 test/replication/transaction.result   | 240 ++++++++++++++++++++++++++
 test/replication/transaction.test.lua |  86 +++++++++
 3 files changed, 482 insertions(+), 55 deletions(-)
 create mode 100644 test/replication/transaction.result
 create mode 100644 test/replication/transaction.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index a687d2bea..f0a779aa7 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -429,6 +429,146 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+/**
+ * Helper struct to bind rows in a list.
+ */
+struct xrow_header_item {
+	struct stailq_entry next;
+	struct xrow_header row;
+};
+
+/**
+ * Read one transaction from network using applier's input buffer.
+ * Transaction rows are placed onto fiber gc region.
+ * We could not use applier input buffer for that because rpos is adjusted
+ * after each xrow decoding and corresponding network input space is going
+ * to be reused.
+ */
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows)
+{
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t tsn = 0;
+
+	stailq_create(rows);
+	do {
+		struct xrow_header_item *item = (struct xrow_header_item *)
+			region_alloc(&fiber()->gc,
+				     sizeof(struct xrow_header_item));
+		if (item == NULL)
+			tnt_raise(OutOfMemory, sizeof(struct xrow_header_item),
+				  "region", "struct xrow_header_item");
+		stailq_add_tail(rows, &item->next);
+		struct xrow_header *row = &item->row;
+
+		double timeout = replication_disconnect_timeout();
+		/*
+		 * Tarantool < 1.7.7 does not send periodic heartbeat
+		 * messages so we can't assume that if we haven't heard
+		 * from the master for quite a while the connection is
+		 * broken - the master might just be idle.
+		 */
+		if (applier->version_id < version_id(1, 7, 7))
+			coio_read_xrow(coio, ibuf, row);
+		else
+			coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+
+		if (iproto_type_is_error(row->type))
+			xrow_decode_error_xc(row);
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
+				  int2str(row->replica_id),
+				  tt_uuid_str(&REPLICASET_UUID));
+		}
+		if (tsn == 0) {
+			/*
+			 * Transaction id must be derived from the log sequence
+			 * number of the first row in the transaction.
+			 */
+			tsn = row->tsn;
+			if (row->lsn != tsn)
+				tnt_raise(ClientError, ER_PROTOCOL,
+					  "Transaction id must be derived from "
+					  "the lsn of the first row in the "
+					  "transaction.");
+		}
+		if (tsn != row->tsn)
+			tnt_raise(ClientError, ER_UNSUPPORTED,
+				  "replication",
+				  "interleaving transactions");
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->body->iov_base != NULL && row->is_commit == false) {
+			/* Save row bodies to gc region. */
+			void *new_base = region_alloc(&fiber()->gc,
+						      row->body->iov_len);
+			if (new_base == NULL)
+				tnt_raise(OutOfMemory, row->body->iov_len,
+					  "region", "xrow body");
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			/* Adjust row body pointers. */
+			row->body->iov_base = new_base;
+		}
+
+	} while (stailq_last_entry(rows, struct xrow_header_item,
+				   next)->row.is_commit == 0);
+}
+
+/**
+ * Apply all rows in the rows queue as a single transaction.
+ *
+ * Return 0 for success or -1 in case of an error.
+ */
+static int
+applier_apply_tx(struct stailq *rows)
+{
+	int res = 0;
+	struct txn *txn = NULL;
+	struct xrow_header_item *item;
+	if (stailq_first(rows) != stailq_last(rows))
+		txn = txn_begin(false);
+	stailq_foreach_entry(item, rows, next) {
+		struct xrow_header *row = &item->row;
+		res = apply_row(row);
+		struct error *e;
+		if (res != 0 &&
+		    (e = diag_last_error(diag_get()))->type ==
+			    &type_ClientError &&
+		    box_error_code(e) == ER_TUPLE_FOUND &&
+		    replication_skip_conflict) {
+			/*
+			 * In case of ER_TUPLE_FOUND error and enabled
+			 * replication_skip_conflict configuration
+			 * option, skip applying the foreign row and
+			 * replace it with NOP in the local write ahead
+			 * log.
+			 */
+			diag_clear(diag_get());
+			(row)->type = IPROTO_NOP;
+			(row)->bodycnt = 0;
+			res = apply_row(row);
+		}
+		++row;
+		if (res != 0)
+			break;
+	}
+	if (res == 0 && txn != NULL)
+		res = txn_commit(txn);
+	if (res != 0)
+		txn_rollback();
+	return res;
+}
+
 /**
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
@@ -558,36 +698,18 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Tarantool < 1.7.7 does not send periodic heartbeat
-		 * messages so we can't assume that if we haven't heard
-		 * from the master for quite a while the connection is
-		 * broken - the master might just be idle.
-		 */
-		if (applier->version_id < version_id(1, 7, 7)) {
-			coio_read_xrow(coio, ibuf, &row);
-		} else {
-			double timeout = replication_disconnect_timeout();
-			coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
-		}
-
-		if (iproto_type_is_error(row.type))
-			xrow_decode_error_xc(&row);  /* error */
-		/* Replication request. */
-		if (row.replica_id == REPLICA_ID_NIL ||
-		    row.replica_id >= VCLOCK_MAX) {
-			/*
-			 * A safety net, this can only occur
-			 * if we're fed a strangely broken xlog.
-			 */
-			tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
-				  int2str(row.replica_id),
-				  tt_uuid_str(&REPLICASET_UUID));
-		}
+		struct stailq rows;
+		applier_read_tx(applier, &rows);
 
-		applier->lag = ev_now(loop()) - row.tm;
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct xrow_header_item,
+					    next)->row;
+		applier->lag = ev_now(loop()) -
+			       stailq_last_entry(&rows,
+						 struct xrow_header_item,
+						 next)->row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-		struct replica *replica = replica_by_id(row.replica_id);
+		struct replica *replica = replica_by_id(first_row->replica_id);
 		struct latch *latch = (replica ? &replica->order_latch :
 				       &replicaset.applier.order_latch);
 		/*
@@ -597,33 +719,12 @@ applier_subscribe(struct applier *applier)
 		 * that belong to the same server id.
 		 */
 		latch_lock(latch);
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			int res = apply_row(&row);
-			if (res != 0) {
-				struct error *e = diag_last_error(diag_get());
-				/*
-				 * In case of ER_TUPLE_FOUND error and enabled
-				 * replication_skip_conflict configuration
-				 * option, skip applying the foreign row and
-				 * replace it with NOP in the local write ahead
-				 * log.
-				 */
-				if (e->type == &type_ClientError &&
-				    box_error_code(e) == ER_TUPLE_FOUND &&
-				    replication_skip_conflict) {
-					diag_clear(diag_get());
-					struct xrow_header nop;
-					nop.type = IPROTO_NOP;
-					nop.bodycnt = 0;
-					nop.replica_id = row.replica_id;
-					nop.lsn = row.lsn;
-					res = apply_row(&nop);
-				}
-			}
-			if (res != 0) {
-				latch_unlock(latch);
-				diag_raise();
-			}
+		if (vclock_get(&replicaset.vclock, first_row->replica_id) <
+		    first_row->lsn &&
+		    applier_apply_tx(&rows) != 0) {
+			latch_unlock(latch);
+			fiber_gc();
+			diag_raise();
 		}
 		latch_unlock(latch);
 
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..009f84430
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,240 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- set conflict to third transaction
+box.space.test:delete({3})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+---
+- false
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+  - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [4, 'r']
+  - [6, 'r']
+...
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check last transaction applied without conflicting row
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- vclock should be increased but rows skipped
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+-- check restart does not change something
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+  - [2, 'm']
+  - [3, 'm']
+  - [4, 'r']
+  - [5, 'm']
+  - [6, 'r']
+  - [7, 'm']
+  - [8, 'r']
+  - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..47003c644
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- set conflict to third transaction
+box.space.test:delete({3})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check last transaction applied without conflicting row
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- vclock should be increased but rows skipped
+box.space.test:select()
+
+-- check restart does not change something
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
-- 
2.21.0

  parent reply	other threads:[~2019-03-06 20:16 UTC|newest]

Thread overview: 11+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2019-03-06 20:16 [tarantool-patches] [PATCH v2 0/3] Transaction boundaries " Georgy Kirichenko
2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 1/3] Applier gets rid of a xstream Georgy Kirichenko
2019-03-07  9:31   ` Vladimir Davydov
2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 2/3] Put all new rows to the end of journal request Georgy Kirichenko
2019-03-07  9:46   ` Vladimir Davydov
2019-03-07 10:38   ` [tarantool-patches] " Konstantin Osipov
2019-03-07 10:53     ` Vladimir Davydov
2019-03-07 11:22       ` Konstantin Osipov
2019-03-06 20:16 ` Georgy Kirichenko [this message]
2019-03-07 10:38   ` [tarantool-patches] [PATCH v2 3/3] Transaction support for applier Vladimir Davydov
2019-03-07 10:40   ` [tarantool-patches] " Konstantin Osipov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=9fca821d3c43ee5ee928a0ca800d6bd0823cfd06.1551902962.git.georgy@tarantool.org \
    --to=georgy@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --subject='Re: [tarantool-patches] [PATCH v2 3/3] Transaction support for applier' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox