[tarantool-patches] [PATCH 3/3] Transaction support for applier
Georgy Kirichenko
georgy at tarantool.org
Sun Mar 3 23:26:18 MSK 2019
The applier now fetches incoming rows until a whole transaction has been
read and then applies it as a unit. When a transaction is applied from
replication, all local changes are moved to the tail of the journal entry
so that they form a separate transaction (like an autonomous transaction)
and can be replicated back. Hence the applier assumes that transactions
are never interleaved in a replication stream.
Closes: #2798
Needed for: #980
---
src/box/applier.cc | 243 ++++++++++++++++++++------
src/box/txn.c | 21 ++-
src/box/txn.h | 4 +
test/replication/transaction.result | 240 +++++++++++++++++++++++++
test/replication/transaction.test.lua | 86 +++++++++
5 files changed, 534 insertions(+), 60 deletions(-)
create mode 100644 test/replication/transaction.result
create mode 100644 test/replication/transaction.test.lua
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 3222b041d..dfabbe5ab 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,12 @@
#include "session.h"
#include "cfg.h"
#include "box.h"
+#include "txn.h"
+
+/** Number of row headers applier_read_tx() allocates room for up front. */
+enum {
+	APPLIER_TX_INITIAL_ROW_COUNT = 16
+};
STRS(applier_state, applier_STATE);
@@ -380,6 +386,176 @@ applier_join(struct applier *applier)
applier_set_state(applier, APPLIER_READY);
}
+/**
+ * Read one transaction from the network using the applier input buffer.
+ * Row headers are kept in an array allocated on the fiber gc region and
+ * row bodies are copied to the same region: the input buffer can't hold
+ * them because rpos is advanced after each decoded xrow and the network
+ * input space behind it is going to be reused.
+ *
+ * On success return the count of transaction rows and store a pointer
+ * to the first row header in *rows. On failure return -1 (diag is set).
+ */
+static int
+applier_read_tx(struct applier *applier, struct xrow_header **rows)
+{
+	struct ev_io *coio = &applier->io;
+	struct ibuf *ibuf = &applier->ibuf;
+	int64_t tsn = 0;
+	int row_capacity = APPLIER_TX_INITIAL_ROW_COUNT;
+	struct xrow_header *first_row, *row;
+	first_row = (struct xrow_header *)region_alloc(&fiber()->gc,
+						       row_capacity *
+						       sizeof(struct xrow_header));
+	if (first_row == NULL) {
+		diag_set(OutOfMemory, sizeof(struct xrow_header) * row_capacity,
+			 "region", "struct xrow_header");
+		goto error;
+	}
+	row = first_row;
+
+	do {
+		if (row == first_row + row_capacity) {
+			/* The rows array is full: allocate a twice larger one. */
+			row = (struct xrow_header *)region_alloc(&fiber()->gc,
+						row_capacity *
+						sizeof(struct xrow_header) << 1);
+			if (row == NULL) {
+				diag_set(OutOfMemory,
+					 sizeof(struct xrow_header) *
+					 row_capacity << 1,
+					 "region", "struct xrow_header");
+				goto error;
+			}
+			/* The old array holds exactly row_capacity rows. */
+			memcpy(row, first_row, row_capacity *
+			       sizeof(struct xrow_header));
+			first_row = row;
+			row = first_row + row_capacity;
+			row_capacity <<= 1;
+		}
+
+		double timeout = replication_disconnect_timeout();
+		/*
+		 * There are no C versions of the coio xrow read
+		 * functions yet, so guard them with try-catch.
+		 */
+		try {
+			/*
+			 * Tarantool < 1.7.7 does not send periodic heartbeat
+			 * messages so we can't assume that if we haven't heard
+			 * from the master for quite a while the connection is
+			 * broken - the master might just be idle.
+			 */
+			if (applier->version_id < version_id(1, 7, 7))
+				coio_read_xrow(coio, ibuf, row);
+			else
+				coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+		} catch (...) {
+			goto error;
+		}
+
+		if (iproto_type_is_error(row->type)) {
+			xrow_decode_error(row);
+			goto error;
+		}
+
+		/* Replication request. */
+		if (row->replica_id == REPLICA_ID_NIL ||
+		    row->replica_id >= VCLOCK_MAX) {
+			/*
+			 * A safety net, this can only occur
+			 * if we're fed a strangely broken xlog.
+			 */
+			diag_set(ClientError, ER_UNKNOWN_REPLICA,
+				 int2str(row->replica_id),
+				 tt_uuid_str(&REPLICASET_UUID));
+			goto error;
+		}
+		if (row == first_row) {
+			/*
+			 * The first row defines the transaction id. To
+			 * enforce consistency, check that the first row
+			 * lsn matches it.
+			 */
+			tsn = row->tsn;
+			if (row->lsn != tsn) {
+				/* This row does not start a transaction. */
+				diag_set(ClientError, ER_PROTOCOL,
+					 "Not a first row in a transaction");
+				goto error;
+			}
+		}
+		if (tsn != row->tsn) {
+			/* We are not able to handle interleaving transactions. */
+			diag_set(ClientError, ER_UNSUPPORTED,
+				 "replication",
+				 "interleaving transactions");
+			goto error;
+		}
+
+		applier->lag = ev_now(loop()) - row->tm;
+		applier->last_row_time = ev_monotonic_now(loop());
+
+		if (row->bodycnt > 0) {
+			/* Save the row body to the gc region. */
+			void *new_base = region_alloc(&fiber()->gc,
+						      row->body->iov_len);
+			if (new_base == NULL) {
+				diag_set(OutOfMemory, row->body->iov_len,
+					 "region", "xrow_data");
+				goto error;
+			}
+			memcpy(new_base, row->body->iov_base, row->body->iov_len);
+			/* Point the row body at the saved copy. */
+			row->body->iov_base = new_base;
+		}
+		/* Move to the next slot, stop after the commit row. */
+	} while (!(row++)->is_commit);
+
+	*rows = first_row;
+	return row - first_row;
+error:
+	return -1;
+}
+
+/** Apply rows [first_row, last_row]. Return 0 or non-zero with diag set. */
+static int
+applier_apply_tx(struct xrow_header *first_row, struct xrow_header *last_row)
+{
+	struct txn *txn = NULL;
+	/* Several rows have to be applied atomically, in one txn. */
+	if (first_row != last_row && (txn = txn_begin(false)) == NULL)
+		return -1;
+	int res = 0;
+	for (struct xrow_header *row = first_row;
+	     row <= last_row && res == 0; ++row) {
+		res = apply_row(row);
+		if (res == 0 || !replication_skip_conflict)
+			continue;
+		struct error *e = diag_last_error(diag_get());
+		if (e->type != &type_ClientError ||
+		    box_error_code(e) != ER_TUPLE_FOUND)
+			continue;
+		/*
+		 * In case of ER_TUPLE_FOUND error and enabled
+		 * replication_skip_conflict configuration option,
+		 * skip applying the foreign row and replace it with
+		 * NOP in the local write ahead log.
+		 */
+		diag_clear(diag_get());
+		row->type = IPROTO_NOP;
+		row->bodycnt = 0;
+		res = apply_row(row);
+	}
+	if (res == 0 && txn != NULL)
+		res = txn_commit(txn);
+	/* On failure roll back the rows applied so far, if any. */
+	if (res != 0)
+		txn_rollback();
+	return res;
+}
+
/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -509,36 +685,14 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}
- /*
- * Tarantool < 1.7.7 does not send periodic heartbeat
- * messages so we can't assume that if we haven't heard
- * from the master for quite a while the connection is
- * broken - the master might just be idle.
- */
- if (applier->version_id < version_id(1, 7, 7)) {
- coio_read_xrow(coio, ibuf, &row);
- } else {
- double timeout = replication_disconnect_timeout();
- coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
- }
+ struct xrow_header *tx_rows;
+ int row_count = applier_read_tx(applier, &tx_rows);
+ if (row_count < 0)
+ diag_raise();
- if (iproto_type_is_error(row.type))
- xrow_decode_error_xc(&row); /* error */
- /* Replication request. */
- if (row.replica_id == REPLICA_ID_NIL ||
- row.replica_id >= VCLOCK_MAX) {
- /*
- * A safety net, this can only occur
- * if we're fed a strangely broken xlog.
- */
- tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
- int2str(row.replica_id),
- tt_uuid_str(&REPLICASET_UUID));
- }
-
- applier->lag = ev_now(loop()) - row.tm;
+ applier->lag = ev_now(loop()) - (tx_rows + row_count - 1)->tm;
applier->last_row_time = ev_monotonic_now(loop());
- struct replica *replica = replica_by_id(row.replica_id);
+ struct replica *replica = replica_by_id(tx_rows->replica_id);
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
/*
@@ -548,33 +702,12 @@ applier_subscribe(struct applier *applier)
* that belong to the same server id.
*/
latch_lock(latch);
- if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
- int res = apply_row(&row);
- if (res != 0) {
- struct error *e = diag_last_error(diag_get());
- /*
- * In case of ER_TUPLE_FOUND error and enabled
- * replication_skip_conflict configuration
- * option, skip applying the foreign row and
- * replace it with NOP in the local write ahead
- * log.
- */
- if (e->type == &type_ClientError &&
- box_error_code(e) == ER_TUPLE_FOUND &&
- replication_skip_conflict) {
- diag_clear(diag_get());
- struct xrow_header nop;
- nop.type = IPROTO_NOP;
- nop.bodycnt = 0;
- nop.replica_id = row.replica_id;
- nop.lsn = row.lsn;
- res = apply_row(&nop);
- }
- }
- if (res != 0) {
- latch_unlock(latch);
- diag_raise();
- }
+ if (vclock_get(&replicaset.vclock,
+ tx_rows->replica_id) < tx_rows->lsn &&
+ applier_apply_tx(tx_rows, tx_rows + row_count - 1) != 0) {
+ latch_unlock(latch);
+ fiber_gc();
+ diag_raise();
}
latch_unlock(latch);
diff --git a/src/box/txn.c b/src/box/txn.c
index 7900fb3ab..f6bf72d0c 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -34,6 +34,7 @@
#include "journal.h"
#include <fiber.h>
#include "xrow.h"
+#include "replication.h"
double too_long_threshold;
@@ -141,6 +142,7 @@ txn_begin(bool is_autocommit)
/* Initialize members explicitly to save time on memset() */
stailq_create(&txn->stmts);
txn->n_rows = 0;
+ txn->n_remote_rows = 0;
txn->is_autocommit = is_autocommit;
txn->has_triggers = false;
txn->is_aborted = false;
@@ -233,6 +235,9 @@ txn_commit_stmt(struct txn *txn, struct request *request)
if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
if (txn_add_redo(stmt, request) != 0)
goto fail;
+ if (stmt->row->replica_id != 0 &&
+ stmt->row->replica_id != instance_id)
+ ++txn->n_remote_rows;
++txn->n_rows;
}
/*
@@ -271,14 +276,20 @@ txn_write_to_wal(struct txn *txn)
return -1;
struct txn_stmt *stmt;
- struct xrow_header **row = req->rows;
+ struct xrow_header **remote_row = req->rows;
+ struct xrow_header **local_row = req->rows + txn->n_remote_rows;
stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->row == NULL)
continue; /* A read (e.g. select) request */
- *row++ = stmt->row;
+ if (stmt->row->replica_id != 0 &&
+ stmt->row->replica_id != instance_id)
+ *remote_row++ = stmt->row;
+ else
+ *local_row++ = stmt->row;
req->approx_len += xrow_approx_len(stmt->row);
}
- assert(row == req->rows + req->n_rows);
+ assert(remote_row == req->rows + txn->n_remote_rows);
+ assert(local_row == req->rows + req->n_rows);
ev_tstamp start = ev_monotonic_now(loop());
int64_t res = journal_write(req);
@@ -399,8 +410,6 @@ txn_rollback()
txn_stmt_unref_tuples(stmt);
TRASH(txn);
- /** Free volatile txn memory. */
- fiber_gc();
fiber_set_txn(fiber(), NULL);
}
@@ -480,6 +489,8 @@ box_txn_rollback()
return -1;
}
txn_rollback(); /* doesn't throw */
+ /** Free volatile txn memory. */
+ fiber_gc();
return 0;
}
diff --git a/src/box/txn.h b/src/box/txn.h
index de5cb0de4..2791fdf73 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -142,6 +142,10 @@ struct txn {
struct stailq stmts;
/** Total number of WAL rows in this txn. */
int n_rows;
+ /**
+ * Count of rows generated on a remote replica.
+ */
+ int n_remote_rows;
/**
* True if this transaction is running in autocommit mode
* (statement end causes an automatic transaction commit).
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..009f84430
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,240 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- make the third transaction conflict too; {3} is absent, delete is a no-op
+box.space.test:delete({3})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- the second transaction is still not applied ({4, 'r'} conflicts)
+v1[1] + 2 == box.info.vclock[1]
+---
+- false
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [4, 'r']
+ - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check that a restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [4, 'r']
+ - [6, 'r']
+...
+-- enable replication_skip_conflict and check that non-conflicting rows are applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check that both transactions are applied without their conflicting rows
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'r']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- create new conflicting rows while replication_skip_conflict is on
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- conflicting rows are skipped, local {8, 'r'} and {9, 'r'} are kept
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'r']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+ - [8, 'r']
+ - [9, 'r']
+...
+-- check that a restart does not change anything
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'r']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+ - [8, 'r']
+ - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..47003c644
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- make the third transaction conflict too; {3} is absent, delete is a no-op
+box.space.test:delete({3})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- the second transaction is still not applied ({4, 'r'} conflicts)
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check that a restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- enable replication_skip_conflict and check that non-conflicting rows are applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check that both transactions are applied without their conflicting rows
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- create new conflicting rows while replication_skip_conflict is on
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- conflicting rows are skipped, local {8, 'r'} and {9, 'r'} are kept
+box.space.test:select()
+
+-- check that a restart does not change anything
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
--
2.21.0
More information about the Tarantool-patches
mailing list