* [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries
[not found] <cover.1550001848.git.georgy@tarantool.org>
@ 2019-02-12 20:04 ` Georgy Kirichenko
2019-02-15 13:15 ` Vladimir Davydov
2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 2/2] Transaction support for applier Georgy Kirichenko
1 sibling, 1 reply; 6+ messages in thread
From: Georgy Kirichenko @ 2019-02-12 20:04 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Append txn_id and txn_commit to the xrow_header structure. txn_id identifies
a transaction on the replica where the transaction was started; the lsn of the
first row in the transaction is used as the transaction id. txn_commit is set
to true for the last row in a transaction, so a transaction can be committed
either by its last row or by an additional NOP request with txn_commit set;
likewise, a transaction can be started with a NOP carrying the corresponding
txn_id. In case of replication, all local changes are moved to the tail of the
journal entry to form a separate transaction (like an autonomous transaction),
so that they can be replicated back.
The following encoding/decoding rules are assumed:
* txn_id and txn_commit are encoded only for multi-row transactions.
So if a row is decoded without txn_id, it belongs to a single-row
transaction.
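* The TXN_ID field is delta-encoded as lsn - txn_id (0 for the first row of
a multi-row transaction), so a decoder restores txn_id as lsn - TXN_ID.
* txn_commit is packed into the TXN_FLAGS field as the TXN_FLAG_COMMIT bit.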
These rules keep compatibility with the previous xlog format and give
a compact encoding.
Needed for: #2798
---
src/box/iproto_constants.c | 4 ++--
src/box/iproto_constants.h | 7 +++++++
src/box/txn.c | 21 +++++++++++++++++----
src/box/txn.h | 2 ++
src/box/wal.c | 28 ++++++++++++++++++++++++----
src/box/xrow.c | 36 ++++++++++++++++++++++++++++++++++++
src/box/xrow.h | 4 +++-
test/unit/xrow.cc | 2 ++
8 files changed, 93 insertions(+), 11 deletions(-)
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index 7fd295775..4d2e21752 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
/* 0x05 */ MP_UINT, /* IPROTO_SCHEMA_VERSION */
/* 0x06 */ MP_UINT, /* IPROTO_SERVER_VERSION */
/* 0x07 */ MP_UINT, /* IPROTO_GROUP_ID */
+ /* 0x08 */ MP_UINT, /* IPROTO_TXN_ID */
+ /* 0x09 */ MP_UINT, /* IPROTO_TXN_FLAGS */
/* }}} */
/* {{{ unused */
- /* 0x08 */ MP_UINT,
- /* 0x09 */ MP_UINT,
/* 0x0a */ MP_UINT,
/* 0x0b */ MP_UINT,
/* 0x0c */ MP_UINT,
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 728514297..fd80e3111 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -49,6 +49,11 @@ enum {
XLOG_FIXHEADER_SIZE = 19
};
+enum {
+ /** Set for the last xrow in a transaction. */
+ TXN_FLAG_COMMIT = 0x01,
+};
+
enum iproto_key {
IPROTO_REQUEST_TYPE = 0x00,
IPROTO_SYNC = 0x01,
@@ -60,6 +65,8 @@ enum iproto_key {
IPROTO_SCHEMA_VERSION = 0x05,
IPROTO_SERVER_VERSION = 0x06,
IPROTO_GROUP_ID = 0x07,
+ IPROTO_TXN_ID = 0x08,
+ IPROTO_TXN_FLAGS = 0x09,
/* Leave a gap for other keys in the header. */
IPROTO_SPACE_ID = 0x10,
IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/txn.c b/src/box/txn.c
index 7f4e85b47..de0152706 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -34,6 +34,7 @@
#include "journal.h"
#include <fiber.h>
#include "xrow.h"
+#include "replication.h"
double too_long_threshold;
@@ -150,6 +151,7 @@ txn_begin(bool is_autocommit)
txn->engine = NULL;
txn->engine_tx = NULL;
txn->psql_txn = NULL;
+ txn->remote_row_count = 0;
/* fiber_on_yield/fiber_on_stop initialized by engine on demand */
fiber_set_txn(fiber(), txn);
return txn;
@@ -231,6 +233,10 @@ txn_commit_stmt(struct txn *txn, struct request *request)
* stmt->space can be NULL for IRPOTO_NOP.
*/
if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
+ if (request->header &&
+ request->header->replica_id != instance_id &&
+ request->header->replica_id != 0)
+ ++txn->remote_row_count;
if (txn_add_redo(stmt, request) != 0)
goto fail;
++txn->n_rows;
@@ -271,14 +277,21 @@ txn_write_to_wal(struct txn *txn)
return -1;
struct txn_stmt *stmt;
- struct xrow_header **row = req->rows;
+ struct xrow_header **remote_row = req->rows;
+ struct xrow_header **local_row = req->rows + txn->remote_row_count;
stailq_foreach_entry(stmt, &txn->stmts, next) {
if (stmt->row == NULL)
- continue; /* A read (e.g. select) request */
- *row++ = stmt->row;
+ /* A read (e.g. select) request */
+ continue;
+ if (stmt->row->replica_id == instance_id ||
+ stmt->row->replica_id == 0)
+ *local_row++ = stmt->row;
+ else
+ *remote_row++ = stmt->row;
req->approx_len += xrow_approx_len(stmt->row);
}
- assert(row == req->rows + req->n_rows);
+ assert(remote_row == req->rows + txn->remote_row_count);
+ assert(local_row == req->rows + req->n_rows);
ev_tstamp start = ev_monotonic_now(loop());
int64_t res = journal_write(req);
diff --git a/src/box/txn.h b/src/box/txn.h
index de5cb0de4..143f21715 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -180,6 +180,8 @@ struct txn {
/** Commit and rollback triggers */
struct rlist on_commit, on_rollback;
struct sql_txn *psql_txn;
+ /** Count of remote rows. */
+ uint32_t remote_row_count;
};
/* Pointer to the current transaction (if any) */
diff --git a/src/box/wal.c b/src/box/wal.c
index cdcaabc00..0ea3be68d 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -896,10 +896,11 @@ wal_writer_begin_rollback(struct wal_writer *writer)
cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback);
}
-static void
-wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
+static int
+wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
struct xrow_header **end)
{
+ struct xrow_header **row = begin;
/** Assign LSN to all local rows. */
for ( ; row < end; row++) {
if ((*row)->replica_id == 0) {
@@ -909,6 +910,21 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
vclock_follow_xrow(vclock, *row);
}
}
+ while (begin < end && begin[0]->replica_id != instance_id)
+ ++begin;
+ /* Setup txn_id and tnx_replica_id for locally generated rows. */
+ row = begin;
+ while (row < end) {
+ if (row[0]->replica_id != instance_id) {
+ diag_set(ClientError, ER_UNSUPPORTED,
+ "Interleaved transactions");
+ return -1;
+ }
+ row[0]->txn_id = begin[0]->lsn;
+ row[0]->txn_commit = row == end - 1 ? 1 : 0;
+ ++row;
+ }
+ return 0;
}
static void
@@ -979,7 +995,9 @@ wal_write_to_disk(struct cmsg *msg)
struct journal_entry *entry;
struct stailq_entry *last_committed = NULL;
stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
- wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
+ if (wal_assign_lsn(&vclock, entry->rows,
+ entry->rows + entry->n_rows) < 0)
+ goto done;
entry->res = vclock_sum(&vclock);
rc = xlog_write_entry(l, entry);
if (rc < 0)
@@ -1173,7 +1191,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
struct journal_entry *entry)
{
struct wal_writer *writer = (struct wal_writer *) journal;
- wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
+ if (wal_assign_lsn(&writer->vclock, entry->rows,
+ entry->rows + entry->n_rows) != 0)
+ return -1;
vclock_copy(&replicaset.vclock, &writer->vclock);
return vclock_sum(&writer->vclock);
}
diff --git a/src/box/xrow.c b/src/box/xrow.c
index fec8873d0..29fa75de4 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -102,6 +102,8 @@ error:
if (mp_typeof(**pos) != MP_MAP)
goto error;
+ bool txn_is_set = false;
+ uint32_t txn_flags = 0;
uint32_t size = mp_decode_map(pos);
for (uint32_t i = 0; i < size; i++) {
@@ -133,12 +135,32 @@ error:
case IPROTO_SCHEMA_VERSION:
header->schema_version = mp_decode_uint(pos);
break;
+ case IPROTO_TXN_ID:
+ txn_is_set = true;
+ header->txn_id = mp_decode_uint(pos);
+ break;
+ case IPROTO_TXN_FLAGS:
+ txn_flags = mp_decode_uint(pos);
+ header->txn_commit = txn_flags & TXN_FLAG_COMMIT;
+ if ((txn_flags & ~TXN_FLAG_COMMIT) != 0)
+ /* Unknow flags. */
+ goto error;
+ break;
default:
/* unknown header */
mp_next(pos);
}
}
assert(*pos <= end);
+ if (!txn_is_set) {
+ /*
+ * Transaction id is not set so it is a single statement
+ * transaction.
+ */
+ header->txn_commit = true;
+ }
+ header->txn_id = header->lsn + header->txn_id;
+
/* Nop requests aren't supposed to have a body. */
if (*pos < end && header->type != IPROTO_NOP) {
const char *body = *pos;
@@ -223,6 +245,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
d = mp_encode_double(d, header->tm);
map_size++;
}
+ if (header->txn_id != 0) {
+ if (header->txn_id != header->lsn || header->txn_commit == 0) {
+ /* Encode txn id for multi row transaction members. */
+ d = mp_encode_uint(d, IPROTO_TXN_ID);
+ d = mp_encode_uint(d, header->lsn - header->txn_id);
+ map_size++;
+ }
+ if (header->txn_commit && header->txn_id != header->lsn) {
+ /* Setup last row for multi row transaction. */
+ d = mp_encode_uint(d, IPROTO_TXN_FLAGS);
+ d = mp_encode_uint(d, TXN_FLAG_COMMIT);
+ map_size++;
+ }
+ }
assert(d <= data + XROW_HEADER_LEN_MAX);
mp_encode_map(data, map_size);
out->iov_len = d - (char *) out->iov_base;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 719add4f0..bc4c4a2d7 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -47,7 +47,7 @@ enum {
XROW_HEADER_IOVMAX = 1,
XROW_BODY_IOVMAX = 2,
XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
- XROW_HEADER_LEN_MAX = 40,
+ XROW_HEADER_LEN_MAX = 52,
XROW_BODY_LEN_MAX = 128,
IPROTO_HEADER_LEN = 28,
/** 7 = sizeof(iproto_body_bin). */
@@ -63,6 +63,8 @@ struct xrow_header {
uint64_t sync;
int64_t lsn; /* LSN must be signed for correct comparison */
double tm;
+ int64_t txn_id;
+ bool txn_commit;
int bodycnt;
uint32_t schema_version;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index 022d1f998..bc99285de 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -215,6 +215,8 @@ test_xrow_header_encode_decode()
header.lsn = 400;
header.tm = 123.456;
header.bodycnt = 0;
+ header.txn_id = header.lsn;
+ header.txn_commit = true;
uint64_t sync = 100500;
struct iovec vec[1];
is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
--
2.20.1
^ permalink raw reply [flat|nested] 6+ messages in thread
* [tarantool-patches] [PATCH v3 2/2] Transaction support for applier
[not found] <cover.1550001848.git.georgy@tarantool.org>
2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries Georgy Kirichenko
@ 2019-02-12 20:04 ` Georgy Kirichenko
2019-02-18 9:36 ` Vladimir Davydov
1 sibling, 1 reply; 6+ messages in thread
From: Georgy Kirichenko @ 2019-02-12 20:04 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
The applier fetches incoming rows to form a transaction and then applies it.
The implementation assumes that transactions are not interleaved in a
replication stream. Distributed transactions are not supported yet.
Closes: #2798
Needed for: #980
---
src/box/applier.cc | 185 +++++++++++++++-----
test/replication/transaction.result | 240 ++++++++++++++++++++++++++
test/replication/transaction.test.lua | 86 +++++++++
3 files changed, 471 insertions(+), 40 deletions(-)
create mode 100644 test/replication/transaction.result
create mode 100644 test/replication/transaction.test.lua
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 7f37fe2ee..59c33bb84 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -48,6 +48,7 @@
#include "error.h"
#include "session.h"
#include "cfg.h"
+#include "txn.h"
STRS(applier_state, applier_STATE);
@@ -378,6 +379,105 @@ applier_join(struct applier *applier)
applier_set_state(applier, APPLIER_READY);
}
+/**
+ * Read one transaction from network.
+ * Transaction rows are placed into row_buf as an array, row's bodies are
+ * placed into obuf because it is not allowed to relocate row's bodies.
+ * Also we could not use applier input buffer because rpos adjusted after xrow
+ * decoding and corresponding space going to reuse.
+ *
+ * Note: current implementation grants that transaction could not be mixed, so
+ * we read each transaction from first xrow until xrow with txn_last = true.
+ */
+static int64_t
+applier_read_tx(struct applier *applier, struct ibuf *row_buf,
+ struct obuf *data_buf)
+{
+ struct xrow_header *row;
+ struct ev_io *coio = &applier->io;
+ struct ibuf *ibuf = &applier->ibuf;
+ int64_t txn_id = 0;
+
+ do {
+ row = (struct xrow_header *)ibuf_alloc(row_buf,
+ sizeof(struct xrow_header));
+ if (row == NULL) {
+ diag_set(OutOfMemory, sizeof(struct xrow_header),
+ "slab", "struct xrow_header");
+ goto error;
+ }
+
+ double timeout = replication_disconnect_timeout();
+ try {
+ /* TODO: we should have a C version of this function. */
+ coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+ } catch (...) {
+ goto error;
+ }
+
+ if (iproto_type_is_error(row->type)) {
+ xrow_decode_error(row);
+ goto error;
+ }
+
+ /* Replication request. */
+ if (row->replica_id == REPLICA_ID_NIL ||
+ row->replica_id >= VCLOCK_MAX) {
+ /*
+ * A safety net, this can only occur
+ * if we're fed a strangely broken xlog.
+ */
+ diag_set(ClientError, ER_UNKNOWN_REPLICA,
+ int2str(row->replica_id),
+ tt_uuid_str(&REPLICASET_UUID));
+ goto error;
+ }
+ if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
+ /*
+ * First row in a transaction. In order to enforce
+ * consistency check that first row lsn and replica id
+ * match with transaction.
+ */
+ txn_id = row->txn_id;
+ if (row->lsn != txn_id) {
+ /* There is not a first row in the transactions. */
+ diag_set(ClientError, ER_PROTOCOL,
+ "Not a first row in a transaction");
+ goto error;
+ }
+ }
+ if (txn_id != row->txn_id) {
+ /* We are not able to handle interleaving transactions. */
+ diag_set(ClientError, ER_UNSUPPORTED,
+ "replications",
+ "interleaving transactions");
+ goto error;
+ }
+
+
+ applier->lag = ev_now(loop()) - row->tm;
+ applier->last_row_time = ev_monotonic_now(loop());
+
+ if (row->body->iov_base != NULL) {
+ void *new_base = obuf_alloc(data_buf, row->body->iov_len);
+ if (new_base == NULL) {
+ diag_set(OutOfMemory, row->body->iov_len,
+ "slab", "xrow_data");
+ goto error;
+ }
+ memcpy(new_base, row->body->iov_base, row->body->iov_len);
+ row->body->iov_base = new_base;
+ }
+
+ } while (row->txn_commit == 0);
+
+ return 0;
+error:
+ ibuf_reset(row_buf);
+ obuf_reset(data_buf);
+ return -1;
+}
+
/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -392,6 +492,10 @@ applier_subscribe(struct applier *applier)
struct xrow_header row;
struct vclock remote_vclock_at_subscribe;
struct tt_uuid cluster_id = uuid_nil;
+ struct ibuf row_buf;
+ struct obuf data_buf;
+ ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
+ obuf_create(&data_buf, &cord()->slabc, 0x10000);
xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
&replicaset.vclock);
@@ -501,36 +605,16 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}
- /*
- * Tarantool < 1.7.7 does not send periodic heartbeat
- * messages so we can't assume that if we haven't heard
- * from the master for quite a while the connection is
- * broken - the master might just be idle.
- */
- if (applier->version_id < version_id(1, 7, 7)) {
- coio_read_xrow(coio, ibuf, &row);
- } else {
- double timeout = replication_disconnect_timeout();
- coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
- }
+ if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
+ diag_raise();
- if (iproto_type_is_error(row.type))
- xrow_decode_error_xc(&row); /* error */
- /* Replication request. */
- if (row.replica_id == REPLICA_ID_NIL ||
- row.replica_id >= VCLOCK_MAX) {
- /*
- * A safety net, this can only occur
- * if we're fed a strangely broken xlog.
- */
- tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
- int2str(row.replica_id),
- tt_uuid_str(&REPLICASET_UUID));
- }
+ struct txn *txn = NULL;
+ struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
+ struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
- applier->lag = ev_now(loop()) - row.tm;
+ applier->lag = ev_now(loop()) - last_row->tm;
applier->last_row_time = ev_monotonic_now(loop());
- struct replica *replica = replica_by_id(row.replica_id);
+ struct replica *replica = replica_by_id(first_row->replica_id);
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
/*
@@ -540,24 +624,45 @@ applier_subscribe(struct applier *applier)
* that belong to the same server id.
*/
latch_lock(latch);
- if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
- int res = xstream_write(applier->subscribe_stream, &row);
- if (res != 0) {
- struct error *e = diag_last_error(diag_get());
- /**
- * Silently skip ER_TUPLE_FOUND error if such
- * option is set in config.
- */
- if (e->type == &type_ClientError &&
+ if (vclock_get(&replicaset.vclock,
+ first_row->replica_id) < first_row->lsn) {
+ struct xrow_header *row = first_row;
+ if (first_row != last_row)
+ txn = txn_begin(false);
+ int res = 0;
+ while (row <= last_row && res == 0) {
+ res = xstream_write(applier->subscribe_stream, row);
+ struct error *e;
+ if (res != 0 &&
+ (e = diag_last_error(diag_get()))->type ==
+ &type_ClientError &&
box_error_code(e) == ER_TUPLE_FOUND &&
- replication_skip_conflict)
+ replication_skip_conflict) {
+ /**
+ * Silently skip ER_TUPLE_FOUND error
+ * if such option is set in config.
+ */
diag_clear(diag_get());
- else {
- latch_unlock(latch);
- diag_raise();
+ row->type = IPROTO_NOP;
+ row->bodycnt = 0;
+ res = xstream_write(applier->subscribe_stream,
+ row);
}
+ ++row;
+ }
+ if (res == 0 && txn != NULL)
+ res = txn_commit(txn);
+
+ if (res != 0) {
+ txn_rollback();
+ obuf_reset(&data_buf);
+ ibuf_reset(&row_buf);
+ latch_unlock(latch);
+ diag_raise();
}
}
+ obuf_reset(&data_buf);
+ ibuf_reset(&row_buf);
latch_unlock(latch);
if (applier->state == APPLIER_SYNC ||
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..009f84430
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,240 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- set conflict to third transaction
+box.space.test:delete({3})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+---
+- false
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [4, 'r']
+ - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [4, 'r']
+ - [6, 'r']
+...
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check last transaction applied without conflicting row
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'r']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- vclock should be increased but rows skipped
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'r']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+ - [8, 'r']
+ - [9, 'r']
+...
+-- check restart does not change something
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'r']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+ - [8, 'r']
+ - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..47003c644
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- set conflict to third transaction
+box.space.test:delete({3})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check last transaction applied without conflicting row
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- vclock should be increased but rows skipped
+box.space.test:select()
+
+-- check restart does not change something
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
--
2.20.1
^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries
2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries Georgy Kirichenko
@ 2019-02-15 13:15 ` Vladimir Davydov
2019-02-19 14:59 ` [tarantool-patches] " Konstantin Osipov
0 siblings, 1 reply; 6+ messages in thread
From: Vladimir Davydov @ 2019-02-15 13:15 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Tue, Feb 12, 2019 at 11:04:31PM +0300, Georgy Kirichenko wrote:
> Append txn_id and txn_commit to xrow_header structure, txn_id identifies
> transaction id on replica where transaction was started. As transaction id
> a lsn of the first row in the transaction is used. txn_commit is set to true
> if it is the last row in a transaction, so we could commit transaction by the
> last row or by additional NOP requests with txn_commit set as well as
> start transaction with NOP and corresponding txn_id. In case of replication
> all local changes moved to an journal entry tail to form a separate transaction
> (like autonomous transaction) to be able to replicate changes back.
>
> As encoding/deconding rule assumed:
> * txn_id and txn_commit are encoded only for multi-row transactions.
> So if we do not have txn_id while row decoding then this means that it
> is a single row transaction.
> * TXN_ID field is differential encoded as lsn - txn_id value
> * txn_commit packed into TXN_FLAGS field
>
> These rules provide compatibility with previous xlog format as well
> as good compaction level.
>
> Needed for: 2798
> ---
> src/box/iproto_constants.c | 4 ++--
> src/box/iproto_constants.h | 7 +++++++
> src/box/txn.c | 21 +++++++++++++++++----
> src/box/txn.h | 2 ++
> src/box/wal.c | 28 ++++++++++++++++++++++++----
> src/box/xrow.c | 36 ++++++++++++++++++++++++++++++++++++
> src/box/xrow.h | 4 +++-
> test/unit/xrow.cc | 2 ++
> 8 files changed, 93 insertions(+), 11 deletions(-)
>
> diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
> index 7fd295775..4d2e21752 100644
> --- a/src/box/iproto_constants.c
> +++ b/src/box/iproto_constants.c
> @@ -41,11 +41,11 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
> /* 0x05 */ MP_UINT, /* IPROTO_SCHEMA_VERSION */
> /* 0x06 */ MP_UINT, /* IPROTO_SERVER_VERSION */
> /* 0x07 */ MP_UINT, /* IPROTO_GROUP_ID */
> + /* 0x08 */ MP_UINT, /* IPROTO_TXN_ID */
> + /* 0x09 */ MP_UINT, /* IPROTO_TXN_FLAGS */
I don't quite like the name, because we encode not a txn id, but the
statement number in a txn. Let's rename it to IPROTO_TXN_STMT_NO or
IPROTO_TXN_STMT_ID or IPROTO_TXN_STMT_SEQ?
xrow_header::txn_id is OK though.
> /* }}} */
>
> /* {{{ unused */
> - /* 0x08 */ MP_UINT,
> - /* 0x09 */ MP_UINT,
> /* 0x0a */ MP_UINT,
> /* 0x0b */ MP_UINT,
> /* 0x0c */ MP_UINT,
You forgot to patch iproto_key_strs.
Please add a test that ensures the new headers are written
to xlogs. You could use the xlog reader for that.
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 728514297..fd80e3111 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -49,6 +49,11 @@ enum {
> XLOG_FIXHEADER_SIZE = 19
> };
>
/* IPROTO_TXN_FLAGS bits. */
> +enum {
> + /** Set for the last xrow in a transaction. */
> + TXN_FLAG_COMMIT = 0x01,
IPROTO_TXN_COMMIT?
> +};
> +
> enum iproto_key {
> IPROTO_REQUEST_TYPE = 0x00,
> IPROTO_SYNC = 0x01,
> @@ -60,6 +65,8 @@ enum iproto_key {
> IPROTO_SCHEMA_VERSION = 0x05,
> IPROTO_SERVER_VERSION = 0x06,
> IPROTO_GROUP_ID = 0x07,
> + IPROTO_TXN_ID = 0x08,
> + IPROTO_TXN_FLAGS = 0x09,
> /* Leave a gap for other keys in the header. */
> IPROTO_SPACE_ID = 0x10,
> IPROTO_INDEX_ID = 0x11,
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 7f4e85b47..de0152706 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -34,6 +34,7 @@
> #include "journal.h"
> #include <fiber.h>
> #include "xrow.h"
> +#include "replication.h"
>
> double too_long_threshold;
>
> @@ -150,6 +151,7 @@ txn_begin(bool is_autocommit)
> txn->engine = NULL;
> txn->engine_tx = NULL;
> txn->psql_txn = NULL;
> + txn->remote_row_count = 0;
Nit: let's rename it to n_remote_rows to match n_rows and keep the two
member initializers together.
> /* fiber_on_yield/fiber_on_stop initialized by engine on demand */
> fiber_set_txn(fiber(), txn);
> return txn;
> @@ -231,6 +233,10 @@ txn_commit_stmt(struct txn *txn, struct request *request)
> * stmt->space can be NULL for IRPOTO_NOP.
> */
> if (stmt->space == NULL || !space_is_temporary(stmt->space)) {
> + if (request->header &&
> + request->header->replica_id != instance_id &&
> + request->header->replica_id != 0)
> + ++txn->remote_row_count;
Nit: if we moved this after txn_add_redo(), then we wouldn't have to
check if request->header is set.
> if (txn_add_redo(stmt, request) != 0)
> goto fail;
> ++txn->n_rows;
> @@ -271,14 +277,21 @@ txn_write_to_wal(struct txn *txn)
> return -1;
>
> struct txn_stmt *stmt;
> - struct xrow_header **row = req->rows;
> + struct xrow_header **remote_row = req->rows;
> + struct xrow_header **local_row = req->rows + txn->remote_row_count;
> stailq_foreach_entry(stmt, &txn->stmts, next) {
> if (stmt->row == NULL)
> - continue; /* A read (e.g. select) request */
> - *row++ = stmt->row;
> + /* A read (e.g. select) request */
> + continue;
Nit: pointless change, please remove.
> + if (stmt->row->replica_id == instance_id ||
> + stmt->row->replica_id == 0)
> + *local_row++ = stmt->row;
> + else
> + *remote_row++ = stmt->row;
This piece of code looks nice, but it definitely needs a comment explaining
what we do and why.
Anyway, we should add a test for this change. Maybe it's even worth
submitting this change in a separate patch.
> req->approx_len += xrow_approx_len(stmt->row);
> }
> - assert(row == req->rows + req->n_rows);
> + assert(remote_row == req->rows + txn->remote_row_count);
> + assert(local_row == req->rows + req->n_rows);
>
> ev_tstamp start = ev_monotonic_now(loop());
> int64_t res = journal_write(req);
> diff --git a/src/box/txn.h b/src/box/txn.h
> index de5cb0de4..143f21715 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -180,6 +180,8 @@ struct txn {
> /** Commit and rollback triggers */
> struct rlist on_commit, on_rollback;
> struct sql_txn *psql_txn;
> + /** Count of remote rows. */
> + uint32_t remote_row_count;
Nit: please use int rather than uint32_t and move the definition after
n_rows, because those are closely related.
> };
>
> /* Pointer to the current transaction (if any) */
> diff --git a/src/box/wal.c b/src/box/wal.c
> index cdcaabc00..0ea3be68d 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -896,10 +896,11 @@ wal_writer_begin_rollback(struct wal_writer *writer)
> cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback);
> }
>
> -static void
> -wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> +static int
> +wal_assign_lsn(struct vclock *vclock, struct xrow_header **begin,
> struct xrow_header **end)
> {
> + struct xrow_header **row = begin;
> /** Assign LSN to all local rows. */
> for ( ; row < end; row++) {
> if ((*row)->replica_id == 0) {
> @@ -909,6 +910,21 @@ wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> vclock_follow_xrow(vclock, *row);
> }
> }
> + while (begin < end && begin[0]->replica_id != instance_id)
> + ++begin;
> + /* Setup txn_id and tnx_replica_id for locally generated rows. */
> + row = begin;
> + while (row < end) {
> + if (row[0]->replica_id != instance_id) {
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "Interleaved transactions");
> + return -1;
Do we really need to bother about it here, in WAL? IMO a check in
applier would be enough.
> + }
> + row[0]->txn_id = begin[0]->lsn;
> + row[0]->txn_commit = row == end - 1 ? 1 : 0;
> + ++row;
Why can't we do this while we are iterating over rows just a few lines
above, assigning LSNs?
> + }
> + return 0;
> }
>
> static void
> @@ -979,7 +995,9 @@ wal_write_to_disk(struct cmsg *msg)
> struct journal_entry *entry;
> struct stailq_entry *last_committed = NULL;
> stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
> - wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
> + if (wal_assign_lsn(&vclock, entry->rows,
> + entry->rows + entry->n_rows) < 0)
> + goto done;
> entry->res = vclock_sum(&vclock);
> rc = xlog_write_entry(l, entry);
> if (rc < 0)
> @@ -1173,7 +1191,9 @@ wal_write_in_wal_mode_none(struct journal *journal,
> struct journal_entry *entry)
> {
> struct wal_writer *writer = (struct wal_writer *) journal;
> - wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
> + if (wal_assign_lsn(&writer->vclock, entry->rows,
> + entry->rows + entry->n_rows) != 0)
> + return -1;
> vclock_copy(&replicaset.vclock, &writer->vclock);
> return vclock_sum(&writer->vclock);
> }
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index fec8873d0..29fa75de4 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -102,6 +102,8 @@ error:
>
> if (mp_typeof(**pos) != MP_MAP)
> goto error;
> + bool txn_is_set = false;
> + uint32_t txn_flags = 0;
>
> uint32_t size = mp_decode_map(pos);
> for (uint32_t i = 0; i < size; i++) {
> @@ -133,12 +135,32 @@ error:
> case IPROTO_SCHEMA_VERSION:
> header->schema_version = mp_decode_uint(pos);
> break;
> + case IPROTO_TXN_ID:
> + txn_is_set = true;
> + header->txn_id = mp_decode_uint(pos);
> + break;
> + case IPROTO_TXN_FLAGS:
> + txn_flags = mp_decode_uint(pos);
> + header->txn_commit = txn_flags & TXN_FLAG_COMMIT;
> + if ((txn_flags & ~TXN_FLAG_COMMIT) != 0)
> + /* Unknow flags. */
> + goto error;
We silently ignore unknown headers. I think we can silently ignore
unknown flags as well.
> + break;
> default:
> /* unknown header */
> mp_next(pos);
> }
> }
> assert(*pos <= end);
> + if (!txn_is_set) {
> + /*
> + * Transaction id is not set so it is a single statement
> + * transaction.
> + */
> + header->txn_commit = true;
> + }
> + header->txn_id = header->lsn + header->txn_id;
> +
> /* Nop requests aren't supposed to have a body. */
> if (*pos < end && header->type != IPROTO_NOP) {
> const char *body = *pos;
> @@ -223,6 +245,20 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
> d = mp_encode_double(d, header->tm);
> map_size++;
> }
> + if (header->txn_id != 0) {
> + if (header->txn_id != header->lsn || header->txn_commit == 0) {
Nit: txn_commit is a bool so
s/header->txn_commit == 0/!header->txn_commit
> + /* Encode txn id for multi row transaction members. */
> + d = mp_encode_uint(d, IPROTO_TXN_ID);
> + d = mp_encode_uint(d, header->lsn - header->txn_id);
> + map_size++;
> + }
> + if (header->txn_commit && header->txn_id != header->lsn) {
> + /* Setup last row for multi row transaction. */
> + d = mp_encode_uint(d, IPROTO_TXN_FLAGS);
> + d = mp_encode_uint(d, TXN_FLAG_COMMIT);
> + map_size++;
> + }
> + }
> assert(d <= data + XROW_HEADER_LEN_MAX);
> mp_encode_map(data, map_size);
> out->iov_len = d - (char *) out->iov_base;
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 719add4f0..bc4c4a2d7 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -47,7 +47,7 @@ enum {
> XROW_HEADER_IOVMAX = 1,
> XROW_BODY_IOVMAX = 2,
> XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
> - XROW_HEADER_LEN_MAX = 40,
> + XROW_HEADER_LEN_MAX = 52,
> XROW_BODY_LEN_MAX = 128,
> IPROTO_HEADER_LEN = 28,
> /** 7 = sizeof(iproto_body_bin). */
> @@ -63,6 +63,8 @@ struct xrow_header {
> uint64_t sync;
> int64_t lsn; /* LSN must be signed for correct comparison */
> double tm;
> + int64_t txn_id;
> + bool txn_commit;
Please add a comment explaining how these are mapped to IPROTO headers.
>
> int bodycnt;
> uint32_t schema_version;
> diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
> index 022d1f998..bc99285de 100644
> --- a/test/unit/xrow.cc
> +++ b/test/unit/xrow.cc
> @@ -215,6 +215,8 @@ test_xrow_header_encode_decode()
> header.lsn = 400;
> header.tm = 123.456;
> header.bodycnt = 0;
> + header.txn_id = header.lsn;
> + header.txn_commit = true;
> uint64_t sync = 100500;
> struct iovec vec[1];
> is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [tarantool-patches] [PATCH v3 2/2] Transaction support for applier
2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 2/2] Transaction support for applier Georgy Kirichenko
@ 2019-02-18 9:36 ` Vladimir Davydov
0 siblings, 0 replies; 6+ messages in thread
From: Vladimir Davydov @ 2019-02-18 9:36 UTC (permalink / raw)
To: Georgy Kirichenko; +Cc: tarantool-patches
On Tue, Feb 12, 2019 at 11:04:32PM +0300, Georgy Kirichenko wrote:
> Applier fetch incoming rows to form a transaction and then apply it.
> Implementation assumes that transaction could not mix in a
> replication stream. Also distributed transaction are not supported yet.
>
> Closes: #2798
> Needed for: #980
> ---
> src/box/applier.cc | 185 +++++++++++++++-----
> test/replication/transaction.result | 240 ++++++++++++++++++++++++++
> test/replication/transaction.test.lua | 86 +++++++++
> 3 files changed, 471 insertions(+), 40 deletions(-)
> create mode 100644 test/replication/transaction.result
> create mode 100644 test/replication/transaction.test.lua
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 7f37fe2ee..59c33bb84 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -48,6 +48,7 @@
> #include "error.h"
> #include "session.h"
> #include "cfg.h"
> +#include "txn.h"
>
> STRS(applier_state, applier_STATE);
>
> @@ -378,6 +379,105 @@ applier_join(struct applier *applier)
> applier_set_state(applier, APPLIER_READY);
> }
>
> +/**
> + * Read one transaction from network.
> + * Transaction rows are placed into row_buf as an array, row's bodies are
> + * placed into obuf because it is not allowed to relocate row's bodies.
> + * Also we could not use applier input buffer because rpos adjusted after xrow
> + * decoding and corresponding space going to reuse.
> + *
> + * Note: current implementation grants that transaction could not be mixed, so
> + * we read each transaction from first xrow until xrow with txn_last = true.
> + */
> +static int64_t
> +applier_read_tx(struct applier *applier, struct ibuf *row_buf,
> + struct obuf *data_buf)
> +{
> + struct xrow_header *row;
> + struct ev_io *coio = &applier->io;
> + struct ibuf *ibuf = &applier->ibuf;
> + int64_t txn_id = 0;
> +
> + do {
> + row = (struct xrow_header *)ibuf_alloc(row_buf,
> + sizeof(struct xrow_header));
Nit: the line's too long for no reason, and there are more lines like that.
Please fix them where you can without making the code look ugly.
> + if (row == NULL) {
> + diag_set(OutOfMemory, sizeof(struct xrow_header),
> + "slab", "struct xrow_header");
> + goto error;
> + }
> +
> + double timeout = replication_disconnect_timeout();
> + try {
> + /* TODO: we should have a C version of this function. */
> + coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
Nit: IMO it's better to use guards to free resources, if any.
> + } catch (...) {
> + goto error;
> + }
> +
> + if (iproto_type_is_error(row->type)) {
> + xrow_decode_error(row);
> + goto error;
> + }
> +
> + /* Replication request. */
> + if (row->replica_id == REPLICA_ID_NIL ||
> + row->replica_id >= VCLOCK_MAX) {
> + /*
> + * A safety net, this can only occur
> + * if we're fed a strangely broken xlog.
> + */
> + diag_set(ClientError, ER_UNKNOWN_REPLICA,
> + int2str(row->replica_id),
> + tt_uuid_str(&REPLICASET_UUID));
> + goto error;
> + }
> + if (ibuf_used(row_buf) == sizeof(struct xrow_header)) {
> + /*
> + * First row in a transaction. In order to enforce
> + * consistency check that first row lsn and replica id
> + * match with transaction.
> + */
> + txn_id = row->txn_id;
> + if (row->lsn != txn_id) {
> + /* There is not a first row in the transactions. */
> + diag_set(ClientError, ER_PROTOCOL,
> + "Not a first row in a transaction");
> + goto error;
> + }
> + }
> + if (txn_id != row->txn_id) {
> + /* We are not able to handle interleaving transactions. */
> + diag_set(ClientError, ER_UNSUPPORTED,
> + "replications",
> + "interleaving transactions");
> + goto error;
> + }
> +
> +
> + applier->lag = ev_now(loop()) - row->tm;
> + applier->last_row_time = ev_monotonic_now(loop());
> +
> + if (row->body->iov_base != NULL) {
> + void *new_base = obuf_alloc(data_buf, row->body->iov_len);
> + if (new_base == NULL) {
> + diag_set(OutOfMemory, row->body->iov_len,
> + "slab", "xrow_data");
> + goto error;
> + }
> + memcpy(new_base, row->body->iov_base, row->body->iov_len);
> + row->body->iov_base = new_base;
So we first read a row into an ibuf, then copy it to an obuf. I understand
that you do this because xrow_header::body has pointers in it. Still, it
looks rather awkward. Maybe we'd better temporarily fix up those
pointers to store relative offsets instead?
> + }
> +
> + } while (row->txn_commit == 0);
> +
> + return 0;
> +error:
> + ibuf_reset(row_buf);
> + obuf_reset(data_buf);
obuf_reset, ibuf_reset don't free up memory. You must use obuf_destroy,
ibuf_destroy.
> + return -1;
> +}
> +
> /**
> * Execute and process SUBSCRIBE request (follow updates from a master).
> */
> @@ -392,6 +492,10 @@ applier_subscribe(struct applier *applier)
> struct xrow_header row;
> struct vclock remote_vclock_at_subscribe;
> struct tt_uuid cluster_id = uuid_nil;
> + struct ibuf row_buf;
> + struct obuf data_buf;
IMO the buffers had better be part of the applier struct. Then you
wouldn't need to use a label to free them up in applier_read_tx and
hence could simply let the exception thrown by coio_read_xrow_timeout_xc
travel up the stack without wrapping it in that awkward try-catch block.
> + ibuf_create(&row_buf, &cord()->slabc, 32 * sizeof(struct xrow_header));
> + obuf_create(&data_buf, &cord()->slabc, 0x10000);
This constant looks like magic to me.
>
> xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
> &replicaset.vclock);
> @@ -501,36 +605,16 @@ applier_subscribe(struct applier *applier)
> applier_set_state(applier, APPLIER_FOLLOW);
> }
>
> - /*
> - * Tarantool < 1.7.7 does not send periodic heartbeat
> - * messages so we can't assume that if we haven't heard
> - * from the master for quite a while the connection is
> - * broken - the master might just be idle.
> - */
> - if (applier->version_id < version_id(1, 7, 7)) {
> - coio_read_xrow(coio, ibuf, &row);
You silently dropped this branch. Please leave it be.
> - } else {
> - double timeout = replication_disconnect_timeout();
> - coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
> - }
> + if (applier_read_tx(applier, &row_buf, &data_buf) != 0)
> + diag_raise();
>
> - if (iproto_type_is_error(row.type))
> - xrow_decode_error_xc(&row); /* error */
> - /* Replication request. */
> - if (row.replica_id == REPLICA_ID_NIL ||
> - row.replica_id >= VCLOCK_MAX) {
> - /*
> - * A safety net, this can only occur
> - * if we're fed a strangely broken xlog.
> - */
> - tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
> - int2str(row.replica_id),
> - tt_uuid_str(&REPLICASET_UUID));
> - }
> + struct txn *txn = NULL;
> + struct xrow_header *first_row = (struct xrow_header *)row_buf.rpos;
> + struct xrow_header *last_row = (struct xrow_header *)row_buf.wpos - 1;
>
> - applier->lag = ev_now(loop()) - row.tm;
> + applier->lag = ev_now(loop()) - last_row->tm;
> applier->last_row_time = ev_monotonic_now(loop());
> - struct replica *replica = replica_by_id(row.replica_id);
> + struct replica *replica = replica_by_id(first_row->replica_id);
> struct latch *latch = (replica ? &replica->order_latch :
> &replicaset.applier.order_latch);
> /*
> @@ -540,24 +624,45 @@ applier_subscribe(struct applier *applier)
> * that belong to the same server id.
> */
> latch_lock(latch);
> - if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> - int res = xstream_write(applier->subscribe_stream, &row);
> - if (res != 0) {
> - struct error *e = diag_last_error(diag_get());
> - /**
> - * Silently skip ER_TUPLE_FOUND error if such
> - * option is set in config.
> - */
> - if (e->type == &type_ClientError &&
> + if (vclock_get(&replicaset.vclock,
> + first_row->replica_id) < first_row->lsn) {
> + struct xrow_header *row = first_row;
> + if (first_row != last_row)
> + txn = txn_begin(false);
We have a nice level of abstraction implemented by xstream, but now you
bluntly break it by calling txn_begin/commit directly. Please come up
with a better solution, e.g. you could extend the xstream interface with
a write_tx method.
> + int res = 0;
> + while (row <= last_row && res == 0) {
> + res = xstream_write(applier->subscribe_stream, row);
> + struct error *e;
> + if (res != 0 &&
> + (e = diag_last_error(diag_get()))->type ==
> + &type_ClientError &&
> box_error_code(e) == ER_TUPLE_FOUND &&
> - replication_skip_conflict)
> + replication_skip_conflict) {
> + /**
> + * Silently skip ER_TUPLE_FOUND error
> + * if such option is set in config.
> + */
> diag_clear(diag_get());
> - else {
> - latch_unlock(latch);
> - diag_raise();
> + row->type = IPROTO_NOP;
> + row->bodycnt = 0;
> + res = xstream_write(applier->subscribe_stream,
> + row);
> }
> + ++row;
> + }
> + if (res == 0 && txn != NULL)
> + res = txn_commit(txn);
> +
> + if (res != 0) {
> + txn_rollback();
> + obuf_reset(&data_buf);
> + ibuf_reset(&row_buf);
> + latch_unlock(latch);
> + diag_raise();
> }
> }
> + obuf_reset(&data_buf);
> + ibuf_reset(&row_buf);
> latch_unlock(latch);
>
> if (applier->state == APPLIER_SYNC ||
> diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
> new file mode 100644
> index 000000000..47003c644
> --- /dev/null
> +++ b/test/replication/transaction.test.lua
> @@ -0,0 +1,86 @@
> +env = require('test_run')
> +test_run = env.new()
> +box.schema.user.grant('guest', 'replication')
> +
> +s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
> +_ = s:create_index('pk')
> +
> +-- transaction w/o conflict
> +box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
Whenever you add a test, please write a few words about what it does and
which ticket's resolution it is supposed to verify.
^ permalink raw reply [flat|nested] 6+ messages in thread
* [tarantool-patches] Re: [PATCH v3 1/2] Journal transaction boundaries
2019-02-15 13:15 ` Vladimir Davydov
@ 2019-02-19 14:59 ` Konstantin Osipov
0 siblings, 0 replies; 6+ messages in thread
From: Konstantin Osipov @ 2019-02-19 14:59 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/02/15 16:17]:
> > + /* 0x08 */ MP_UINT, /* IPROTO_TXN_ID */
> > + /* 0x09 */ MP_UINT, /* IPROTO_TXN_FLAGS */
>
> I don't quite like the name, because we encode not a txn id, but the
> statement number in a txn. Let's rename it to IPROTO_TXN_STMT_NO or
> IPROTO_TXN_STMT_ID or IPROTO_TXN_STMT_SEQ?
How about IPROTO_TSN and IPROTO_FLAGS?
> > +enum {
> > + /** Set for the last xrow in a transaction. */
> > + TXN_FLAG_COMMIT = 0x01,
>
> IPROTO_TXN_COMMIT?
IPROTO_FLAG_COMMIT?
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 6+ messages in thread
* [tarantool-patches] [PATCH v3 2/2] Transaction support for applier
2019-03-10 20:21 [tarantool-patches] [PATCH v3 0/2] Transaction boundaries " Georgy Kirichenko
@ 2019-03-10 20:21 ` Georgy Kirichenko
0 siblings, 0 replies; 6+ messages in thread
From: Georgy Kirichenko @ 2019-03-10 20:21 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
The applier fetches incoming rows to form a transaction and then applies it.
Rows are fetched and stored on the fiber gc region until the last transaction
row (the one with is_commit set) has been fetched. Once fetched, a multi-row
transaction is applied within txn_begin/txn_commit/txn_rollback boundaries.
For now, a single-row transaction cannot be applied within such boundaries
because DDL does not support non-autocommit transactions.
Closes: #2798
Needed for: #980
---
src/box/applier.cc | 219 +++++++++++++++++------
test/replication/transaction.result | 242 ++++++++++++++++++++++++++
test/replication/transaction.test.lua | 86 +++++++++
3 files changed, 492 insertions(+), 55 deletions(-)
create mode 100644 test/replication/transaction.result
create mode 100644 test/replication/transaction.test.lua
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 94c07aac7..5af132377 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -426,6 +426,159 @@ applier_join(struct applier *applier)
applier_set_state(applier, APPLIER_READY);
}
+/**
+ * Helper struct to bind rows in a list.
+ */
+struct applier_tx_row {
+ /* Next transaction row. */
+ struct stailq_entry next;
+ /* xrow_header struct for the current transaction row. */
+ struct xrow_header row;
+};
+
+static struct applier_tx_row *
+applier_read_tx_row(struct applier *applier)
+{
+ struct ev_io *coio = &applier->io;
+ struct ibuf *ibuf = &applier->ibuf;
+
+ struct applier_tx_row *tx_row = (struct applier_tx_row *)
+ region_alloc(&fiber()->gc, sizeof(struct applier_tx_row));
+
+ if (tx_row == NULL)
+ tnt_raise(OutOfMemory, sizeof(struct applier_tx_row),
+ "region", "struct applier_tx_row");
+
+ struct xrow_header *row = &tx_row->row;
+
+ double timeout = replication_disconnect_timeout();
+ /*
+ * Tarantool < 1.7.7 does not send periodic heartbeat
+ * messages so we can't assume that if we haven't heard
+ * from the master for quite a while the connection is
+ * broken - the master might just be idle.
+ */
+ if (applier->version_id < version_id(1, 7, 7))
+ coio_read_xrow(coio, ibuf, row);
+ else
+ coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+
+ applier->lag = ev_now(loop()) - row->tm;
+ applier->last_row_time = ev_monotonic_now(loop());
+ return tx_row;
+}
+
+/**
+ * Read one transaction from network using applier's input buffer.
+ * Transaction rows are placed onto fiber gc region.
+ * We could not use applier input buffer for that because rpos is adjusted
+ * after each xrow decoding and corresponding network input space is going
+ * to be reused.
+ */
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows)
+{
+ int64_t tsn = 0;
+
+ stailq_create(rows);
+ do {
+ struct applier_tx_row *tx_row = applier_read_tx_row(applier);
+ struct xrow_header *row = &tx_row->row;
+
+ if (iproto_type_is_error(row->type))
+ xrow_decode_error_xc(row);
+
+ /* Replication request. */
+ if (row->replica_id == REPLICA_ID_NIL ||
+ row->replica_id >= VCLOCK_MAX) {
+ /*
+ * A safety net, this can only occur
+ * if we're fed a strangely broken xlog.
+ */
+ tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
+ int2str(row->replica_id),
+ tt_uuid_str(&REPLICASET_UUID));
+ }
+ if (tsn == 0) {
+ /*
+ * Transaction id must be derived from the log sequence
+ * number of the first row in the transaction.
+ */
+ tsn = row->tsn;
+ if (row->lsn != tsn)
+ tnt_raise(ClientError, ER_PROTOCOL,
+ "Transaction id must be derived from "
+ "the lsn of the first row in the "
+ "transaction.");
+ }
+ if (tsn != row->tsn)
+ tnt_raise(ClientError, ER_UNSUPPORTED,
+ "replication",
+ "interleaving transactions");
+
+ assert(row->bodycnt <= 1);
+ if (row->bodycnt == 1 && !row->is_commit) {
+ /* Save row body to gc region. */
+ void *new_base = region_alloc(&fiber()->gc,
+ row->body->iov_len);
+ if (new_base == NULL)
+ tnt_raise(OutOfMemory, row->body->iov_len,
+ "region", "xrow body");
+ memcpy(new_base, row->body->iov_base, row->body->iov_len);
+ /* Adjust row body pointers. */
+ row->body->iov_base = new_base;
+ }
+ stailq_add_tail(rows, &tx_row->next);
+
+ } while (!stailq_last_entry(rows, struct applier_tx_row,
+ next)->row.is_commit);
+}
+
+/**
+ * Apply all rows in the rows queue as a single transaction.
+ *
+ * Return 0 for success or -1 in case of an error.
+ */
+static int
+applier_apply_tx(struct stailq *rows)
+{
+ int res = 0;
+ struct txn *txn = NULL;
+ struct applier_tx_row *item;
+ if (stailq_first(rows) != stailq_last(rows) &&
+ (txn = txn_begin(false)) == NULL)
+ diag_raise();
+ stailq_foreach_entry(item, rows, next) {
+ struct xrow_header *row = &item->row;
+ res = apply_row(row);
+ if (res != 0) {
+ struct error *e = diag_last_error(diag_get());
+ /*
+ * In case of ER_TUPLE_FOUND error and enabled
+ * replication_skip_conflict configuration
+ * option, skip applying the foreign row and
+ * replace it with NOP in the local write ahead
+ * log.
+ */
+ if (e->type == &type_ClientError &&
+ box_error_code(e) == ER_TUPLE_FOUND &&
+ replication_skip_conflict) {
+ diag_clear(diag_get());
+ row->type = IPROTO_NOP;
+ row->bodycnt = 0;
+ res = apply_row(row);
+ }
+ }
+ if (res != 0)
+ break;
+ }
+ if (res == 0 && txn != NULL)
+ res = txn_commit(txn);
+ if (res != 0)
+ txn_rollback();
+ return res;
+}
+
/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -555,36 +708,14 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}
- /*
- * Tarantool < 1.7.7 does not send periodic heartbeat
- * messages so we can't assume that if we haven't heard
- * from the master for quite a while the connection is
- * broken - the master might just be idle.
- */
- if (applier->version_id < version_id(1, 7, 7)) {
- coio_read_xrow(coio, ibuf, &row);
- } else {
- double timeout = replication_disconnect_timeout();
- coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
- }
-
- if (iproto_type_is_error(row.type))
- xrow_decode_error_xc(&row); /* error */
- /* Replication request. */
- if (row.replica_id == REPLICA_ID_NIL ||
- row.replica_id >= VCLOCK_MAX) {
- /*
- * A safety net, this can only occur
- * if we're fed a strangely broken xlog.
- */
- tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
- int2str(row.replica_id),
- tt_uuid_str(&REPLICASET_UUID));
- }
+ struct stailq rows;
+ applier_read_tx(applier, &rows);
- applier->lag = ev_now(loop()) - row.tm;
+ struct xrow_header *first_row =
+ &stailq_first_entry(&rows, struct applier_tx_row,
+ next)->row;
applier->last_row_time = ev_monotonic_now(loop());
- struct replica *replica = replica_by_id(row.replica_id);
+ struct replica *replica = replica_by_id(first_row->replica_id);
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
/*
@@ -594,33 +725,11 @@ applier_subscribe(struct applier *applier)
* that belong to the same server id.
*/
latch_lock(latch);
- if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
- int res = apply_row(&row);
- if (res != 0) {
- struct error *e = diag_last_error(diag_get());
- /*
- * In case of ER_TUPLE_FOUND error and enabled
- * replication_skip_conflict configuration
- * option, skip applying the foreign row and
- * replace it with NOP in the local write ahead
- * log.
- */
- if (e->type == &type_ClientError &&
- box_error_code(e) == ER_TUPLE_FOUND &&
- replication_skip_conflict) {
- diag_clear(diag_get());
- struct xrow_header nop;
- nop.type = IPROTO_NOP;
- nop.bodycnt = 0;
- nop.replica_id = row.replica_id;
- nop.lsn = row.lsn;
- res = apply_row(&nop);
- }
- }
- if (res != 0) {
- latch_unlock(latch);
- diag_raise();
- }
+ if (vclock_get(&replicaset.vclock, first_row->replica_id) <
+ first_row->lsn &&
+ applier_apply_tx(&rows) != 0) {
+ latch_unlock(latch);
+ diag_raise();
}
latch_unlock(latch);
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..8c2ac6ee4
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,242 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- set conflict to third transaction
+_ = box.space.test:delete({4})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'm']
+ - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'm']
+ - [6, 'r']
+...
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check last transaction applied without conflicting row
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'm']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- vclock should be increased but rows skipped
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'm']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+ - [8, 'r']
+ - [9, 'r']
+...
+-- check restart does not change something
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'm']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+ - [8, 'r']
+ - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..f25a4737d
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with conflicting second
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- set conflict to third transaction
+_ = box.space.test:delete({4})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- replication stopped of third transaction
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- set skip conflict rows and check that non-conflicting were applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check last transaction applied without conflicting row
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- vclock should be increased but rows skipped
+box.space.test:select()
+
+-- check restart does not change something
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
--
2.21.0
^ permalink raw reply [flat|nested] 6+ messages in thread
end of thread, other threads:[~2019-03-10 20:21 UTC | newest]
Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
[not found] <cover.1550001848.git.georgy@tarantool.org>
2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 1/2] Journal transaction boundaries Georgy Kirichenko
2019-02-15 13:15 ` Vladimir Davydov
2019-02-19 14:59 ` [tarantool-patches] " Konstantin Osipov
2019-02-12 20:04 ` [tarantool-patches] [PATCH v3 2/2] Transaction support for applier Georgy Kirichenko
2019-02-18 9:36 ` Vladimir Davydov
2019-03-10 20:21 [tarantool-patches] [PATCH v3 0/2] Transaction boundaries " Georgy Kirichenko
2019-03-10 20:21 ` [tarantool-patches] [PATCH v3 2/2] Transaction support " Georgy Kirichenko
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox