* [tarantool-patches] [PATCH v2 1/3] Applier gets rid of an xstream
2019-03-06 20:16 [tarantool-patches] [PATCH v2 0/3] Transaction boundaries for applier Georgy Kirichenko
@ 2019-03-06 20:16 ` Georgy Kirichenko
2019-03-07 9:31 ` Vladimir Davydov
2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 2/3] Put all new rows to the end of journal request Georgy Kirichenko
2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 3/3] Transaction support for applier Georgy Kirichenko
2 siblings, 1 reply; 11+ messages in thread
From: Georgy Kirichenko @ 2019-03-06 20:16 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Remove the xstream dependency and use the box interface directly to apply all
replication rows. This is a refactoring step before transactional replication.
Needed for: #2798
---
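To illustrate the motivation, here is a toy, self-contained C sketch (not
Tarantool code; all struct and function names below are made up for the
example). With an xstream-style callback the applier pushes rows one by one
through an opaque write() hook, so it cannot group them; once it calls the
apply function directly, a follow-up patch can wrap a batch of rows in a
single transaction.

/*
 * Toy model of the refactoring: callback indirection vs. a direct
 * call that leaves transaction boundaries up to the caller.
 */
#include <stdio.h>

struct row { int lsn; };

struct stream { int (*write)(struct row *row); };

static int
apply_one(struct row *row)
{
	printf("applied lsn %d\n", row->lsn);
	return 0;
}

int
main(void)
{
	struct row rows[3] = { {1}, {2}, {3} };
	struct stream s = { apply_one };

	/* Before: one opaque callback per row, no way to group rows. */
	for (int i = 0; i < 3; i++)
		s.write(&rows[i]);

	/* After: direct calls; a later patch can put txn_begin()
	 * before this loop and txn_commit() after it. */
	for (int i = 0; i < 3; i++)
		apply_one(&rows[i]);
	return 0;
}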
src/box/applier.cc | 69 ++++++++++++++++++++++++++++++++++++++--------
src/box/applier.h | 9 +-----
src/box/box.cc | 68 ++++++++-------------------------------------
3 files changed, 69 insertions(+), 77 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index e9addcb3e..a687d2bea 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -37,7 +37,6 @@
#include "fiber_cond.h"
#include "coio.h"
#include "coio_buf.h"
-#include "xstream.h"
#include "wal.h"
#include "xrow.h"
#include "replication.h"
@@ -48,6 +47,9 @@
#include "error.h"
#include "session.h"
#include "cfg.h"
+#include "schema.h"
+#include "txn.h"
+#include "box.h"
STRS(applier_state, applier_STATE);
@@ -167,6 +169,53 @@ applier_writer_f(va_list ap)
return 0;
}
+static int
+apply_initial_join_row(struct xrow_header *row)
+{
+ struct request request;
+ if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
+ return -1;
+ struct space *space = space_cache_find_xc(request.space_id);
+ /* no access checks here - applier always works with admin privs */
+ return space_apply_initial_join_row(space, &request);
+}
+
+/**
+ * Process a no-op request.
+ *
+ * A no-op request does not affect any space, but it
+ * promotes vclock and is written to WAL.
+ */
+static int
+process_nop(struct request *request)
+{
+ assert(request->type == IPROTO_NOP);
+ struct txn *txn = txn_begin_stmt(NULL);
+ if (txn == NULL)
+ return -1;
+ return txn_commit_stmt(txn, request);
+}
+
+static int
+apply_row(struct xrow_header *row)
+{
+ struct request request;
+ if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
+ return -1;
+ if (request.type == IPROTO_NOP) {
+ if (process_nop(&request) != 0)
+ return -1;
+ return 0;
+ }
+ struct space *space = space_cache_find(request.space_id);
+ if (space == NULL)
+ return -1;
+ if (box_process_rw(&request, space, NULL) != 0) {
+ say_error("error applying row: %s", request_str(&request));
+ return -1;
+ }
+ return 0;
+}
+
/**
* Connect to a remote host and authenticate the client.
*/
@@ -309,13 +358,13 @@ applier_join(struct applier *applier)
/*
* Receive initial data.
*/
- assert(applier->join_stream != NULL);
uint64_t row_count = 0;
while (true) {
coio_read_xrow(coio, ibuf, &row);
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
- xstream_write_xc(applier->join_stream, &row);
+ if (apply_initial_join_row(&row) != 0)
+ diag_raise();
if (++row_count % 100000 == 0)
say_info("%.1fM rows received", row_count / 1e6);
} else if (row.type == IPROTO_OK) {
@@ -357,7 +406,8 @@ applier_join(struct applier *applier)
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
vclock_follow_xrow(&replicaset.vclock, &row);
- xstream_write_xc(applier->subscribe_stream, &row);
+ if (apply_row(&row) != 0)
+ diag_raise();
if (++row_count % 100000 == 0)
say_info("%.1fM rows received", row_count / 1e6);
} else if (row.type == IPROTO_OK) {
@@ -385,8 +435,6 @@ applier_join(struct applier *applier)
static void
applier_subscribe(struct applier *applier)
{
- assert(applier->subscribe_stream != NULL);
-
/* Send SUBSCRIBE request */
struct ev_io *coio = &applier->io;
struct ibuf *ibuf = &applier->ibuf;
@@ -550,7 +598,7 @@ applier_subscribe(struct applier *applier)
*/
latch_lock(latch);
if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
- int res = xstream_write(applier->subscribe_stream, &row);
+ int res = apply_row(&row);
if (res != 0) {
struct error *e = diag_last_error(diag_get());
/*
@@ -569,7 +617,7 @@ applier_subscribe(struct applier *applier)
nop.bodycnt = 0;
nop.replica_id = row.replica_id;
nop.lsn = row.lsn;
- res = xstream_write(applier->subscribe_stream, &nop);
+ res = apply_row(&nop);
}
}
if (res != 0) {
@@ -731,8 +779,7 @@ applier_stop(struct applier *applier)
}
struct applier *
-applier_new(const char *uri, struct xstream *join_stream,
- struct xstream *subscribe_stream)
+applier_new(const char *uri)
{
struct applier *applier = (struct applier *)
calloc(1, sizeof(struct applier));
@@ -751,8 +798,6 @@ applier_new(const char *uri, struct xstream *join_stream,
assert(rc == 0 && applier->uri.service != NULL);
(void) rc;
- applier->join_stream = join_stream;
- applier->subscribe_stream = subscribe_stream;
applier->last_row_time = ev_monotonic_now(loop());
rlist_create(&applier->on_state);
fiber_cond_create(&applier->resume_cond);
diff --git a/src/box/applier.h b/src/box/applier.h
index d942b6fbb..5bff90031 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -45,8 +45,6 @@
#include "xrow.h"
-struct xstream;
-
enum { APPLIER_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
#define applier_STATE(_) \
@@ -116,10 +114,6 @@ struct applier {
bool is_paused;
/** Condition variable signaled to resume the applier. */
struct fiber_cond resume_cond;
- /** xstream to process rows during initial JOIN */
- struct xstream *join_stream;
- /** xstream to process rows during final JOIN and SUBSCRIBE */
- struct xstream *subscribe_stream;
};
/**
@@ -152,8 +146,7 @@ applier_stop(struct applier *applier);
* @error throws OutOfMemory exception if out of memory.
*/
struct applier *
-applier_new(const char *uri, struct xstream *join_stream,
- struct xstream *subscribe_stream);
+applier_new(const char *uri);
/**
* Destroy and delete a applier.
diff --git a/src/box/box.cc b/src/box/box.cc
index a62595421..190e5eae5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -122,10 +122,6 @@ static fiber_cond ro_cond;
*/
static bool is_orphan;
-/* Use the shared instance of xstream for all appliers */
-static struct xstream join_stream;
-static struct xstream subscribe_stream;
-
/**
* The pool of fibers in the transaction processor thread
* working on incoming messages from net, wal and other
@@ -203,22 +199,6 @@ box_process_rw(struct request *request, struct space *space,
return rc;
}
-/**
- * Process a no-op request.
- *
- * A no-op request does not affect any space, but it
- * promotes vclock and is written to WAL.
- */
-static int
-process_nop(struct request *request)
-{
- assert(request->type == IPROTO_NOP);
- struct txn *txn = txn_begin_stmt(NULL);
- if (txn == NULL)
- return -1;
- return txn_commit_stmt(txn, request);
-}
-
void
box_set_ro(bool ro)
{
@@ -305,35 +285,24 @@ recovery_journal_create(struct recovery_journal *journal, struct vclock *v)
journal->vclock = v;
}
-static inline void
-apply_row(struct xstream *stream, struct xrow_header *row)
+static void
+apply_wal_row(struct xstream *stream, struct xrow_header *row)
{
- (void) stream;
struct request request;
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
- if (request.type == IPROTO_NOP) {
- if (process_nop(&request) != 0)
+ if (request.type != IPROTO_NOP) {
+ struct space *space = space_cache_find_xc(request.space_id);
+ if (box_process_rw(&request, space, NULL) != 0) {
+ say_error("error applying row: %s", request_str(&request));
diag_raise();
- return;
- }
- struct space *space = space_cache_find_xc(request.space_id);
- if (box_process_rw(&request, space, NULL) != 0) {
- say_error("error applying row: %s", request_str(&request));
- diag_raise();
+ }
}
-}
-
-static void
-apply_wal_row(struct xstream *stream, struct xrow_header *row)
-{
- apply_row(stream, row);
-
struct wal_stream *xstream =
container_of(stream, struct wal_stream, base);
/**
- * Yield once in a while, but not too often,
- * mostly to allow signal handling to take place.
- */
+ * Yield once in a while, but not too often,
+ * mostly to allow signal handling to take place.
+ */
if (++xstream->rows % xstream->yield == 0)
fiber_sleep(0);
}
@@ -352,17 +321,6 @@ wal_stream_create(struct wal_stream *ctx, size_t wal_max_rows)
ctx->yield = (wal_max_rows >> 4) + 1;
}
-static void
-apply_initial_join_row(struct xstream *stream, struct xrow_header *row)
-{
- (void) stream;
- struct request request;
- xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
- struct space *space = space_cache_find_xc(request.space_id);
- /* no access checks here - applier always works with admin privs */
- space_apply_initial_join_row_xc(space, &request);
-}
-
/* {{{ configuration bindings */
static void
@@ -656,9 +614,7 @@ cfg_get_replication(int *p_count)
for (int i = 0; i < count; i++) {
const char *source = cfg_getarr_elem("replication", i);
- struct applier *applier = applier_new(source,
- &join_stream,
- &subscribe_stream);
+ struct applier *applier = applier_new(source);
if (applier == NULL) {
/* Delete created appliers */
while (--i >= 0)
@@ -2131,8 +2087,6 @@ box_cfg_xc(void)
box_set_replication_sync_lag();
box_set_replication_sync_timeout();
box_set_replication_skip_conflict();
- xstream_create(&join_stream, apply_initial_join_row);
- xstream_create(&subscribe_stream, apply_row);
struct gc_checkpoint *checkpoint = gc_last_checkpoint();
--
2.21.0
* [tarantool-patches] [PATCH v2 3/3] Transaction support for applier
2019-03-06 20:16 [tarantool-patches] [PATCH v2 0/3] Transaction boundaries for applier Georgy Kirichenko
2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 1/3] Applier gets rid of an xstream Georgy Kirichenko
2019-03-06 20:16 ` [tarantool-patches] [PATCH v2 2/3] Put all new rows to the end of journal request Georgy Kirichenko
@ 2019-03-06 20:16 ` Georgy Kirichenko
2019-03-07 10:38 ` Vladimir Davydov
2019-03-07 10:40 ` [tarantool-patches] " Konstantin Osipov
2 siblings, 2 replies; 11+ messages in thread
From: Georgy Kirichenko @ 2019-03-06 20:16 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
The applier now fetches incoming rows until it has a complete transaction and
then applies it. Rows are fetched and stored on the fiber gc region until the
last transaction row, the one with the is_commit flag set, has been received.
A fetched multi-row transaction is then applied within
txn_begin/txn_commit/txn_rollback boundaries. Single-row transactions cannot
yet be wrapped in such boundaries because DDL does not support non-autocommit
transactions.
Closes: #2798
Needed for: #980
---
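To make the control flow easier to review, here is a toy, self-contained C
sketch (not Tarantool code; every function and name below is an illustrative
stub): rows are collected until one carries is_commit, then the whole batch is
applied, and explicit begin/commit/rollback is used only for multi-row
batches, mirroring applier_read_tx()/applier_apply_tx() in the diff.

#include <stdbool.h>
#include <stdio.h>

struct row { long lsn; long tsn; bool is_commit; };

/* Stub standing in for apply_row(). */
static int
apply_row_stub(const struct row *row)
{
	printf("apply lsn %ld\n", row->lsn);
	return 0;
}

/* Apply rows[0..n): open a transaction only when n > 1, because DDL
 * does not support non-autocommit transactions yet. */
static int
apply_tx_stub(const struct row *rows, int n)
{
	bool use_txn = n > 1;
	if (use_txn)
		printf("txn_begin\n");
	for (int i = 0; i < n; i++) {
		if (apply_row_stub(&rows[i]) != 0) {
			printf("txn_rollback\n");
			return -1;
		}
	}
	if (use_txn)
		printf("txn_commit\n");
	return 0;
}

int
main(void)
{
	/* A replication stream: tsn groups the rows of one transaction
	 * and is_commit marks its last row. */
	struct row stream[3] = {
		{ 1, 1, true },		/* single-row transaction */
		{ 2, 2, false },
		{ 3, 2, true },		/* two-row transaction */
	};
	int start = 0;
	for (int i = 0; i < 3; i++) {
		if (!stream[i].is_commit)
			continue;
		if (apply_tx_stub(&stream[start], i - start + 1) != 0)
			break;
		start = i + 1;
	}
	return 0;
}

The real code keeps the collected rows on the fiber gc region instead of a
plain array, see applier_read_tx() below.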
src/box/applier.cc | 211 ++++++++++++++++------
test/replication/transaction.result | 240 ++++++++++++++++++++++++++
test/replication/transaction.test.lua | 86 +++++++++
3 files changed, 482 insertions(+), 55 deletions(-)
create mode 100644 test/replication/transaction.result
create mode 100644 test/replication/transaction.test.lua
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a687d2bea..f0a779aa7 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -429,6 +429,146 @@ applier_join(struct applier *applier)
applier_set_state(applier, APPLIER_READY);
}
+/**
+ * Helper struct to bind rows in a list.
+ */
+struct xrow_header_item {
+ struct stailq_entry next;
+ struct xrow_header row;
+};
+
+/**
+ * Read one transaction from network using applier's input buffer.
+ * Transaction rows are placed onto the fiber gc region.
+ * We cannot use the applier input buffer for that because its rpos is
+ * adjusted after each xrow is decoded and the corresponding network
+ * input space is going to be reused.
+ */
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows)
+{
+ struct ev_io *coio = &applier->io;
+ struct ibuf *ibuf = &applier->ibuf;
+ int64_t tsn = 0;
+
+ stailq_create(rows);
+ do {
+ struct xrow_header_item *item = (struct xrow_header_item *)
+ region_alloc(&fiber()->gc,
+ sizeof(struct xrow_header_item));
+ if (item == NULL)
+ tnt_raise(OutOfMemory, sizeof(struct xrow_header_item),
+ "region", "struct xrow_header_item");
+ stailq_add_tail(rows, &item->next);
+ struct xrow_header *row = &item->row;
+
+ double timeout = replication_disconnect_timeout();
+ /*
+ * Tarantool < 1.7.7 does not send periodic heartbeat
+ * messages so we can't assume that if we haven't heard
+ * from the master for quite a while the connection is
+ * broken - the master might just be idle.
+ */
+ if (applier->version_id < version_id(1, 7, 7))
+ coio_read_xrow(coio, ibuf, row);
+ else
+ coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+
+ if (iproto_type_is_error(row->type))
+ xrow_decode_error_xc(row);
+
+ /* Replication request. */
+ if (row->replica_id == REPLICA_ID_NIL ||
+ row->replica_id >= VCLOCK_MAX) {
+ /*
+ * A safety net, this can only occur
+ * if we're fed a strangely broken xlog.
+ */
+ tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
+ int2str(row->replica_id),
+ tt_uuid_str(&REPLICASET_UUID));
+ }
+ if (tsn == 0) {
+ /*
+ * Transaction id must be derived from the log sequence
+ * number of the first row in the transaction.
+ */
+ tsn = row->tsn;
+ if (row->lsn != tsn)
+ tnt_raise(ClientError, ER_PROTOCOL,
+ "Transaction id must be derived from "
+ "the lsn of the first row in the "
+ "transaction.");
+ }
+ if (tsn != row->tsn)
+ tnt_raise(ClientError, ER_UNSUPPORTED,
+ "replication",
+ "interleaving transactions");
+
+ applier->lag = ev_now(loop()) - row->tm;
+ applier->last_row_time = ev_monotonic_now(loop());
+
+ if (row->body->iov_base != NULL && row->is_commit == false) {
+ /* Save row bodies to gc region. */
+ void *new_base = region_alloc(&fiber()->gc,
+ row->body->iov_len);
+ if (new_base == NULL)
+ tnt_raise(OutOfMemory, row->body->iov_len,
+ "region", "xrow body");
+ memcpy(new_base, row->body->iov_base, row->body->iov_len);
+ /* Adjust row body pointers. */
+ row->body->iov_base = new_base;
+ }
+
+ } while (stailq_last_entry(rows, struct xrow_header_item,
+ next)->row.is_commit == 0);
+}
+
+/**
+ * Apply all rows in the rows queue as a single transaction.
+ *
+ * Return 0 for success or -1 in case of an error.
+ */
+static int
+applier_apply_tx(struct stailq *rows)
+{
+ int res = 0;
+ struct txn *txn = NULL;
+ struct xrow_header_item *item;
+ if (stailq_first(rows) != stailq_last(rows))
+ txn = txn_begin(false);
+ stailq_foreach_entry(item, rows, next) {
+ struct xrow_header *row = &item->row;
+ res = apply_row(row);
+ struct error *e;
+ if (res != 0 &&
+ (e = diag_last_error(diag_get()))->type ==
+ &type_ClientError &&
+ box_error_code(e) == ER_TUPLE_FOUND &&
+ replication_skip_conflict) {
+ /*
+ * In case of ER_TUPLE_FOUND error and enabled
+ * replication_skip_conflict configuration
+ * option, skip applying the foreign row and
+ * replace it with NOP in the local write ahead
+ * log.
+ */
+ diag_clear(diag_get());
+ row->type = IPROTO_NOP;
+ row->bodycnt = 0;
+ res = apply_row(row);
+ }
+ if (res != 0)
+ break;
+ }
+ if (res == 0 && txn != NULL)
+ res = txn_commit(txn);
+ if (res != 0)
+ txn_rollback();
+ return res;
+}
+
/**
* Execute and process SUBSCRIBE request (follow updates from a master).
*/
@@ -558,36 +698,18 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}
- /*
- * Tarantool < 1.7.7 does not send periodic heartbeat
- * messages so we can't assume that if we haven't heard
- * from the master for quite a while the connection is
- * broken - the master might just be idle.
- */
- if (applier->version_id < version_id(1, 7, 7)) {
- coio_read_xrow(coio, ibuf, &row);
- } else {
- double timeout = replication_disconnect_timeout();
- coio_read_xrow_timeout_xc(coio, ibuf, &row, timeout);
- }
-
- if (iproto_type_is_error(row.type))
- xrow_decode_error_xc(&row); /* error */
- /* Replication request. */
- if (row.replica_id == REPLICA_ID_NIL ||
- row.replica_id >= VCLOCK_MAX) {
- /*
- * A safety net, this can only occur
- * if we're fed a strangely broken xlog.
- */
- tnt_raise(ClientError, ER_UNKNOWN_REPLICA,
- int2str(row.replica_id),
- tt_uuid_str(&REPLICASET_UUID));
- }
+ struct stailq rows;
+ applier_read_tx(applier, &rows);
- applier->lag = ev_now(loop()) - row.tm;
+ struct xrow_header *first_row =
+ &stailq_first_entry(&rows, struct xrow_header_item,
+ next)->row;
+ applier->lag = ev_now(loop()) -
+ stailq_last_entry(&rows,
+ struct xrow_header_item,
+ next)->row.tm;
applier->last_row_time = ev_monotonic_now(loop());
- struct replica *replica = replica_by_id(row.replica_id);
+ struct replica *replica = replica_by_id(first_row->replica_id);
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
/*
@@ -597,33 +719,12 @@ applier_subscribe(struct applier *applier)
* that belong to the same server id.
*/
latch_lock(latch);
- if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
- int res = apply_row(&row);
- if (res != 0) {
- struct error *e = diag_last_error(diag_get());
- /*
- * In case of ER_TUPLE_FOUND error and enabled
- * replication_skip_conflict configuration
- * option, skip applying the foreign row and
- * replace it with NOP in the local write ahead
- * log.
- */
- if (e->type == &type_ClientError &&
- box_error_code(e) == ER_TUPLE_FOUND &&
- replication_skip_conflict) {
- diag_clear(diag_get());
- struct xrow_header nop;
- nop.type = IPROTO_NOP;
- nop.bodycnt = 0;
- nop.replica_id = row.replica_id;
- nop.lsn = row.lsn;
- res = apply_row(&nop);
- }
- }
- if (res != 0) {
- latch_unlock(latch);
- diag_raise();
- }
+ if (vclock_get(&replicaset.vclock, first_row->replica_id) <
+ first_row->lsn &&
+ applier_apply_tx(&rows) != 0) {
+ latch_unlock(latch);
+ fiber_gc();
+ diag_raise();
}
latch_unlock(latch);
diff --git a/test/replication/transaction.result b/test/replication/transaction.result
new file mode 100644
index 000000000..009f84430
--- /dev/null
+++ b/test/replication/transaction.result
@@ -0,0 +1,240 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+---
+...
+_ = s:create_index('pk')
+---
+...
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+---
+- [4, 'r']
+...
+v1 = box.info.vclock
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+-- create a two-row transaction with a conflicting second row
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+---
+...
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [4, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- set up a conflict for the third transaction
+box.space.test:delete({3})
+---
+...
+box.space.test:replace({6, 'r'})
+---
+- [6, 'r']
+...
+-- restart replication
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- replication is stopped by the third transaction
+v1[1] + 2 == box.info.vclock[1]
+---
+- false
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [4, 'r']
+ - [6, 'r']
+...
+-- check replication status
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+-- check restart does not help
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [4, 'r']
+ - [6, 'r']
+...
+-- enable skipping of conflicting rows and check that non-conflicting rows were applied
+replication = box.cfg.replication
+---
+...
+box.cfg{replication = {}, replication_skip_conflict = true}
+---
+...
+box.cfg{replication = replication}
+---
+...
+-- check last transaction applied without conflicting row
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'r']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+---
+- [8, 'r']
+...
+box.space.test:replace({9, 'r'})
+---
+- [9, 'r']
+...
+-- issue a conflicting tx
+test_run:cmd("switch default")
+---
+- true
+...
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+---
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- vclock should be increased but rows skipped
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'r']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+ - [8, 'r']
+ - [9, 'r']
+...
+-- check that restart does not change anything
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:select()
+---
+- - [1, 'm']
+ - [2, 'm']
+ - [3, 'm']
+ - [4, 'r']
+ - [5, 'm']
+ - [6, 'r']
+ - [7, 'm']
+ - [8, 'r']
+ - [9, 'r']
+...
+box.info.replication[1].upstream.status
+---
+- follow
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+s:drop()
+---
+...
diff --git a/test/replication/transaction.test.lua b/test/replication/transaction.test.lua
new file mode 100644
index 000000000..47003c644
--- /dev/null
+++ b/test/replication/transaction.test.lua
@@ -0,0 +1,86 @@
+env = require('test_run')
+test_run = env.new()
+box.schema.user.grant('guest', 'replication')
+
+s = box.schema.space.create('test', {engine = test_run:get_cfg('engine')})
+_ = s:create_index('pk')
+
+-- transaction w/o conflict
+box.begin() s:insert({1, 'm'}) s:insert({2, 'm'}) box.commit()
+
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+test_run:cmd("switch replica")
+
+-- insert a conflicting row
+box.space.test:replace({4, 'r'})
+v1 = box.info.vclock
+
+test_run:cmd("switch default")
+-- create a two-row transaction with a conflicting second row
+box.begin() s:insert({3, 'm'}) s:insert({4, 'm'}) box.commit()
+-- create a third transaction
+box.begin() s:insert({5, 'm'}) s:insert({6, 'm'}) s:insert({7, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- nothing was applied
+v1[1] == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+-- set up a conflict for the third transaction
+box.space.test:delete({3})
+box.space.test:replace({6, 'r'})
+-- restart replication
+replication = box.cfg.replication
+box.cfg{replication = {}}
+box.cfg{replication = replication}
+-- replication is stopped by the third transaction
+v1[1] + 2 == box.info.vclock[1]
+box.space.test:select()
+-- check replication status
+box.info.replication[1].upstream.status
+box.info.replication[1].upstream.message
+
+-- check restart does not help
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+-- enable skipping of conflicting rows and check that non-conflicting rows were applied
+replication = box.cfg.replication
+box.cfg{replication = {}, replication_skip_conflict = true}
+box.cfg{replication = replication}
+
+-- check last transaction applied without conflicting row
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+-- make some new conflicting rows with skip-conflicts
+box.space.test:replace({8, 'r'})
+box.space.test:replace({9, 'r'})
+
+-- issue a conflicting tx
+test_run:cmd("switch default")
+box.begin() s:insert({8, 'm'}) s:insert({9, 'm'}) box.commit()
+
+test_run:cmd("switch replica")
+-- vclock should be increased but rows skipped
+box.space.test:select()
+
+-- check that restart does not change anything
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+
+box.space.test:select()
+box.info.replication[1].upstream.status
+
+test_run:cmd("switch default")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+
+box.schema.user.revoke('guest', 'replication')
+s:drop()
--
2.21.0