[Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries while recovery or join
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Sat Nov 23 16:46:04 MSK 2019
Thanks for the patch!
See 13 comments below.
On 19/11/2019 17:04, Georgy Kirichenko wrote:
> Do not start a transaction for each local journal or final join row
> but follow transaction boundaries instead.
1. Why don't we already have boundaries on join? I thought
that transactional replication was already done.
>
> Part of #980
> ---
> src/box/applier.cc | 92 +++++++++++++-------------
> src/box/box.cc | 72 ++++++++++++++------
> test/xlog/panic_on_broken_lsn.result | 9 ++-
> test/xlog/panic_on_broken_lsn.test.lua | 7 +-
> 4 files changed, 107 insertions(+), 73 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 294765195..d00b1b04a 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -478,27 +477,29 @@ applier_join(struct applier *applier)
> * Receive final data.
> */
> while (true) {
> - if (coio_read_xrow(coio, ibuf, &row) < 0)
> - diag_raise();
> - applier->last_row_time = ev_monotonic_now(loop());
> - if (iproto_type_is_dml(row.type)) {
> - vclock_follow_xrow(&replicaset.vclock, &row);
> - if (apply_final_join_row(&row) != 0)
> - diag_raise();
> - if (++row_count % 100000 == 0)
> - say_info("%.1fM rows received", row_count / 1e6);
> - } else if (row.type == IPROTO_OK) {
> - /*
> - * Current vclock. This is not used now,
> - * ignore.
> - */
> + struct stailq rows;
> + applier_read_tx(applier, &rows);
> + struct xrow_header *first_row =
> + &(stailq_first_entry(&rows, struct applier_tx_row,
> + next)->row);
> + if (first_row->type == IPROTO_OK) {
> + if (applier->version_id < version_id(1, 7, 0)) {
> + /*
> + * This is the start vclock if the
> + * server is 1.6. Since we have
> + * not initialized replication
> + * vclock yet, do it now. In 1.7+
> + * this vclock is not used.
> + */
> + xrow_decode_vclock_xc(first_row, &replicaset.vclock);
> + }
> break; /* end of stream */
> - } else if (iproto_type_is_error(row.type)) {
> - xrow_decode_error_xc(&row); /* rethrow error */
> - } else {
> - tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
> - (uint32_t) row.type);
> }
> + if (applier_apply_tx(&rows) != 0)
> + diag_raise();
> + if (ibuf_used(ibuf) == 0)
> + ibuf_reset(ibuf);
2. I propose moving the ibuf reset and fiber_gc() into
applier_apply_tx(). It is confusing that the loop does
not touch ibuf or fiber->gc directly anywhere, yet you reset
them on each iteration.
And why do you reset the ibuf only when used == 0? It is filled
with the transaction's data, right? After applier_apply_tx() the
data should not be needed anymore, regardless of what is in
the ibuf.
> + fiber_gc();
> }
> say_info("final data received");
>
> @@ -532,6 +523,9 @@ applier_read_tx_row(struct applier *applier)
> struct xrow_header *row = &tx_row->row;
>
> double timeout = replication_disconnect_timeout();
> + /* We check timeout only in case of subscribe. */
> + if (applier->state == APPLIER_FINAL_JOIN)
> + timeout = TIMEOUT_INFINITY;
3. Why? And how is it related to the transactionality of
recovery and join?
> /*
> * Tarantool < 1.7.7 does not send periodic heartbeat
> * messages so we can't assume that if we haven't heard
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 71822551e..9464eee63 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -286,6 +286,8 @@ struct wal_stream {
> struct xstream base;
> /** How many rows have been recovered so far. */
> size_t rows;
> + /** Current transaction.*/
4. Please put a space before '*/'.
> + struct txn *txn;
> };
>
> /**
> @@ -321,7 +323,8 @@ recovery_journal_write(struct journal *base,
> }
>
> static inline void
> -recovery_journal_create(struct recovery_journal *journal, const struct vclock *v)
> +recovery_journal_create(struct recovery_journal *journal,
> + const struct vclock *v)
5. Unnecessary change.
> {
> journal_create(&journal->base, recovery_journal_write, NULL);
> vclock_copy(&journal->vclock, v);
> @@ -330,33 +333,44 @@ recovery_journal_create(struct recovery_journal *journal, const struct vclock *v
> static int
> apply_wal_row(struct xstream *stream, struct xrow_header *row)
> {
> + struct wal_stream *wal_stream =
> + container_of(stream, struct wal_stream, base);
> + if (wal_stream->txn == NULL) {
> + wal_stream->txn = txn_begin();
> + if (wal_stream->txn == NULL)
> + return -1;
> + }
> struct request request;
> xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> + int rc = 0;
> if (request.type != IPROTO_NOP) {
> struct space *space = space_cache_find_xc(request.space_id);
> - if (box_process_rw(&request, space, NULL) != 0) {
> + rc = box_process_rw(&request, space, NULL);
> + if (rc != 0)
> say_error("error applying row: %s", request_str(&request));
> - return -1;
6. So now you can apply a row, get an error on it,
and then commit the broken transaction below.
Please don't.
> - }
> } else {
> - struct txn *txn = txn_begin();
> - if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 ||
> - txn_commit_stmt(txn, &request) != 0) {
> - txn_rollback(txn);
> + struct txn *txn = in_txn();
> + rc = txn_begin_stmt(txn, NULL);
7. Why do you need this empty statement?
> + if (rc == 0)
> + rc = txn_commit_stmt(txn, &request);
> + }
> + if (row->is_commit) {
> + if (txn_commit(wal_stream->txn) != 0) {
> + wal_stream->txn = NULL;
> return -1;
> }
> - if (txn_commit(txn) != 0)
> - return -1;
> + wal_stream->txn = NULL;
> }
> - struct wal_stream *xstream =
> - container_of(stream, struct wal_stream, base);
> /**
> * Yield once in a while, but not too often,
> * mostly to allow signal handling to take place.
> */
> - if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
> + if (++(wal_stream->rows) > WAL_ROWS_PER_YIELD &&
> + wal_stream->txn == NULL) {
> + wal_stream->rows -= WAL_ROWS_PER_YIELD;
8. Why did you change this?
> fiber_sleep(0);
> - return 0;
> + }
> + return rc;
> }
>
> static void
> @@ -1917,6 +1946,9 @@ local_recovery(const struct tt_uuid *instance_uuid,
>
> struct wal_stream wal_stream;
> wal_stream_create(&wal_stream);
> + auto wal_stream_guard = make_scoped_guard([&]{
> + wal_stream_destroy(&wal_stream);
9. You ignore the wal_stream_destroy() error here.
> + });
>
> struct recovery *recovery;
> recovery = recovery_new(cfg_gets("wal_dir"),
> @@ -1925,13 +1957,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
> if (recovery == NULL)
> diag_raise();
>
> - /*
> - * Make sure we report the actual recovery position
> - * in box.info while local recovery is in progress.
> - */
> - box_vclock = &recovery->vclock;
10. So now you don't report it?
> auto guard = make_scoped_guard([&]{
> - box_vclock = &replicaset.vclock;
> recovery_stop_local(recovery);
> recovery_delete(recovery);
> });
> @@ -1981,12 +2007,12 @@ local_recovery(const struct tt_uuid *instance_uuid,
> */
> memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
>
> - vclock_copy(&replicaset.vclock, checkpoint_vclock);
> struct recovery_journal journal;
> recovery_journal_create(&journal, &recovery->vclock);
> journal_set(&journal.base);
>
> engine_begin_final_recovery_xc();
> +
11. Unnecessary change.
> diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result
> index 1e62680eb..e209374b6 100644
> --- a/test/xlog/panic_on_broken_lsn.result
> +++ b/test/xlog/panic_on_broken_lsn.result
> @@ -141,8 +141,11 @@ test_run:cmd("setopt delimiter ';'")
> _ = fiber.create(function()
> test_run:wait_cond(function() return box.info.replication[2] ~= nil end)
> lsn = box.info.vclock[1]
> - box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
> + box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 2)
> + box.begin()
> box.space.test:auto_increment{'v1'}
> + box.space.test:auto_increment{'v1'}
> + box.commit()
12. Why?
If you want to test transactional recovery/join, then please
add a new test case. Try not to change the old ones unless
they break or the change is trivial.
If this is not a test for transactional recovery/join, then
please add one. I think this is testable, since it is not
just a refactoring.
> box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
> end);
> ---
> @@ -164,9 +167,9 @@ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
> - ok
> ...
> -- Check that log contains the mention of broken LSN and the request printout
> -grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn)
> +grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn + 1)
13. Why +1?
More information about the Tarantool-patches
mailing list