[Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries during recovery and join
Georgy Kirichenko
georgy at tarantool.org
Tue Nov 19 19:04:57 MSK 2019
Do not start a separate transaction for each row read from the local
journal or received during final join; follow the transaction
boundaries instead.
Part of #980
---
src/box/applier.cc | 92 +++++++++++++-------------
src/box/box.cc | 72 ++++++++++++++------
test/xlog/panic_on_broken_lsn.result | 9 ++-
test/xlog/panic_on_broken_lsn.test.lua | 7 +-
4 files changed, 107 insertions(+), 73 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 294765195..d00b1b04a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -268,23 +268,6 @@ apply_row(struct xrow_header *row)
return 0;
}
-static int
-apply_final_join_row(struct xrow_header *row)
-{
- struct txn *txn = txn_begin();
- if (txn == NULL)
- return -1;
- if (apply_row(row) != 0) {
- txn_rollback(txn);
- fiber_gc();
- return -1;
- }
- if (txn_commit(txn) != 0)
- return -1;
- fiber_gc();
- return 0;
-}
-
/**
* Connect to a remote host and authenticate the client.
*/
@@ -391,6 +374,22 @@ done:
applier_set_state(applier, APPLIER_READY);
}
+/**
+ * A helper struct to link xrow objects in a list.
+ */
+struct applier_tx_row {
+ /* Next transaction row. */
+ struct stailq_entry next;
+ /* xrow_header struct for the current transaction row. */
+ struct xrow_header row;
+};
+
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows);
+
+static int
+applier_apply_tx(struct stailq *rows);
+
/**
* Execute and process JOIN request (bootstrap the instance).
*/
@@ -478,27 +477,29 @@ applier_join(struct applier *applier)
* Receive final data.
*/
while (true) {
- if (coio_read_xrow(coio, ibuf, &row) < 0)
- diag_raise();
- applier->last_row_time = ev_monotonic_now(loop());
- if (iproto_type_is_dml(row.type)) {
- vclock_follow_xrow(&replicaset.vclock, &row);
- if (apply_final_join_row(&row) != 0)
- diag_raise();
- if (++row_count % 100000 == 0)
- say_info("%.1fM rows received", row_count / 1e6);
- } else if (row.type == IPROTO_OK) {
- /*
- * Current vclock. This is not used now,
- * ignore.
- */
+ struct stailq rows;
+ applier_read_tx(applier, &rows);
+ struct xrow_header *first_row =
+ &(stailq_first_entry(&rows, struct applier_tx_row,
+ next)->row);
+ if (first_row->type == IPROTO_OK) {
+ if (applier->version_id < version_id(1, 7, 0)) {
+ /*
+ * This is the start vclock if the
+ * server is 1.6. Since we have
+ * not initialized replication
+ * vclock yet, do it now. In 1.7+
+ * this vclock is not used.
+ */
+ xrow_decode_vclock_xc(first_row, &replicaset.vclock);
+ }
break; /* end of stream */
- } else if (iproto_type_is_error(row.type)) {
- xrow_decode_error_xc(&row); /* rethrow error */
- } else {
- tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
- (uint32_t) row.type);
}
+ if (applier_apply_tx(&rows) != 0)
+ diag_raise();
+ if (ibuf_used(ibuf) == 0)
+ ibuf_reset(ibuf);
+ fiber_gc();
}
say_info("final data received");
@@ -506,16 +507,6 @@ applier_join(struct applier *applier)
applier_set_state(applier, APPLIER_READY);
}
-/**
- * A helper struct to link xrow objects in a list.
- */
-struct applier_tx_row {
- /* Next transaction row. */
- struct stailq_entry next;
- /* xrow_header struct for the current transaction row. */
- struct xrow_header row;
-};
-
static struct applier_tx_row *
applier_read_tx_row(struct applier *applier)
{
@@ -532,6 +523,9 @@ applier_read_tx_row(struct applier *applier)
struct xrow_header *row = &tx_row->row;
double timeout = replication_disconnect_timeout();
+ /* We check timeout only in case of subscribe. */
+ if (applier->state == APPLIER_FINAL_JOIN)
+ timeout = TIMEOUT_INFINITY;
/*
* Tarantool < 1.7.7 does not send periodic heartbeat
* messages so we can't assume that if we haven't heard
@@ -568,6 +562,12 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
struct applier_tx_row *tx_row = applier_read_tx_row(applier);
struct xrow_header *row = &tx_row->row;
+ if (row->type == IPROTO_OK) {
+ stailq_add_tail(rows, &tx_row->next);
+ assert(tx_row->row.is_commit);
+ break;
+ }
+
if (iproto_type_is_error(row->type))
xrow_decode_error_xc(row);
diff --git a/src/box/box.cc b/src/box/box.cc
index 71822551e..9464eee63 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -286,6 +286,8 @@ struct wal_stream {
struct xstream base;
/** How many rows have been recovered so far. */
size_t rows;
+ /** Current transaction.*/
+ struct txn *txn;
};
/**
@@ -321,7 +323,8 @@ recovery_journal_write(struct journal *base,
}
static inline void
-recovery_journal_create(struct recovery_journal *journal, const struct vclock *v)
+recovery_journal_create(struct recovery_journal *journal,
+ const struct vclock *v)
{
journal_create(&journal->base, recovery_journal_write, NULL);
vclock_copy(&journal->vclock, v);
@@ -330,33 +333,44 @@ recovery_journal_create(struct recovery_journal *journal, const struct vclock *v
static int
apply_wal_row(struct xstream *stream, struct xrow_header *row)
{
+ struct wal_stream *wal_stream =
+ container_of(stream, struct wal_stream, base);
+ if (wal_stream->txn == NULL) {
+ wal_stream->txn = txn_begin();
+ if (wal_stream->txn == NULL)
+ return -1;
+ }
struct request request;
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
+ int rc = 0;
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
- if (box_process_rw(&request, space, NULL) != 0) {
+ rc = box_process_rw(&request, space, NULL);
+ if (rc != 0)
say_error("error applying row: %s", request_str(&request));
- return -1;
- }
} else {
- struct txn *txn = txn_begin();
- if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 ||
- txn_commit_stmt(txn, &request) != 0) {
- txn_rollback(txn);
+ struct txn *txn = in_txn();
+ rc = txn_begin_stmt(txn, NULL);
+ if (rc == 0)
+ rc = txn_commit_stmt(txn, &request);
+ }
+ if (row->is_commit) {
+ if (txn_commit(wal_stream->txn) != 0) {
+ wal_stream->txn = NULL;
return -1;
}
- if (txn_commit(txn) != 0)
- return -1;
+ wal_stream->txn = NULL;
}
- struct wal_stream *xstream =
- container_of(stream, struct wal_stream, base);
/**
* Yield once in a while, but not too often,
* mostly to allow signal handling to take place.
*/
- if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
+ if (++(wal_stream->rows) > WAL_ROWS_PER_YIELD &&
+ wal_stream->txn == NULL) {
+ wal_stream->rows -= WAL_ROWS_PER_YIELD;
fiber_sleep(0);
- return 0;
+ }
+ return rc;
}
static void
@@ -364,6 +378,21 @@ wal_stream_create(struct wal_stream *ctx)
{
xstream_create(&ctx->base, apply_wal_row);
ctx->rows = 0;
+ ctx->txn = NULL;
+}
+
+static int
+wal_stream_destroy(struct wal_stream *ctx)
+{
+ if (ctx->txn != NULL) {
+ /* The last processed row does not have a commit flag set. */
+ txn_rollback(ctx->txn);
+ ctx->txn = NULL;
+ diag_set(ClientError, ER_UNSUPPORTED,
+ "recovery", "not finished transactions");
+ return -1;
+ }
+ return 0;
}
/* {{{ configuration bindings */
@@ -1917,6 +1946,9 @@ local_recovery(const struct tt_uuid *instance_uuid,
struct wal_stream wal_stream;
wal_stream_create(&wal_stream);
+ auto wal_stream_guard = make_scoped_guard([&]{
+ wal_stream_destroy(&wal_stream);
+ });
struct recovery *recovery;
recovery = recovery_new(cfg_gets("wal_dir"),
@@ -1925,13 +1957,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
if (recovery == NULL)
diag_raise();
- /*
- * Make sure we report the actual recovery position
- * in box.info while local recovery is in progress.
- */
- box_vclock = &recovery->vclock;
auto guard = make_scoped_guard([&]{
- box_vclock = &replicaset.vclock;
recovery_stop_local(recovery);
recovery_delete(recovery);
});
@@ -1981,12 +2007,12 @@ local_recovery(const struct tt_uuid *instance_uuid,
*/
memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
- vclock_copy(&replicaset.vclock, checkpoint_vclock);
struct recovery_journal journal;
recovery_journal_create(&journal, &recovery->vclock);
journal_set(&journal.base);
engine_begin_final_recovery_xc();
+
if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0)
diag_raise();
engine_end_recovery_xc();
@@ -2015,7 +2041,9 @@ local_recovery(const struct tt_uuid *instance_uuid,
box_sync_replication(false);
}
recovery_finalize(recovery);
-
+ wal_stream_guard.is_active = false;
+ if (wal_stream_destroy(&wal_stream))
+ diag_raise();
/*
* We must enable WAL before finalizing engine recovery,
* because an engine may start writing to WAL right after
diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result
index 1e62680eb..e209374b6 100644
--- a/test/xlog/panic_on_broken_lsn.result
+++ b/test/xlog/panic_on_broken_lsn.result
@@ -141,8 +141,11 @@ test_run:cmd("setopt delimiter ';'")
_ = fiber.create(function()
test_run:wait_cond(function() return box.info.replication[2] ~= nil end)
lsn = box.info.vclock[1]
- box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 2)
+ box.begin()
box.space.test:auto_increment{'v1'}
+ box.space.test:auto_increment{'v1'}
+ box.commit()
box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
end);
---
@@ -164,9 +167,9 @@ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
- ok
...
-- Check that log contains the mention of broken LSN and the request printout
-grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn)
+grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn + 1)
---
-- '{type: ''INSERT'', replica_id: 1, space_id: 9000, index_id: 0, tuple: [2, "v1"]}'
+- '{type: ''INSERT'', replica_id: 1, space_id: 9000, index_id: 0, tuple: [3, "v1"]}'
...
test_run:cmd('cleanup server replica')
---
diff --git a/test/xlog/panic_on_broken_lsn.test.lua b/test/xlog/panic_on_broken_lsn.test.lua
index 80cccd918..a1d62cee5 100644
--- a/test/xlog/panic_on_broken_lsn.test.lua
+++ b/test/xlog/panic_on_broken_lsn.test.lua
@@ -67,8 +67,11 @@ test_run:cmd("setopt delimiter ';'")
_ = fiber.create(function()
test_run:wait_cond(function() return box.info.replication[2] ~= nil end)
lsn = box.info.vclock[1]
- box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
+ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 2)
+ box.begin()
box.space.test:auto_increment{'v1'}
+ box.space.test:auto_increment{'v1'}
+ box.commit()
box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
end);
test_run:cmd("setopt delimiter ''");
@@ -78,7 +81,7 @@ test_run:cmd('start server replica with crash_expected=True')
box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
-- Check that log contains the mention of broken LSN and the request printout
-grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn)
+grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn + 1)
test_run:cmd('cleanup server replica')
test_run:cmd('delete server replica')
--
2.24.0
More information about the Tarantool-patches
mailing list