From: Vladislav Shpilevoy
To: Georgy Kirichenko, tarantool-patches@dev.tarantool.org
Date: Sat, 23 Nov 2019 14:46:04 +0100
Message-ID: <16fc5166-8ff5-54f7-efa7-14cecf55916e@tarantool.org>
Subject: Re: [Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries while recovery or join

Thanks for the patch!

See 13 comments below.

On 19/11/2019 17:04, Georgy Kirichenko wrote:
> Do not start a transaction for each local journal or final join row
> but follow transaction boundaries instead.

1. Why don't we already have transaction boundaries on join? I thought
transactional replication was already done.

>
> Part of #980
> ---
>  src/box/applier.cc                     | 92 +++++++++++++-------------
>  src/box/box.cc                         | 72 ++++++++++++++------
>  test/xlog/panic_on_broken_lsn.result   |  9 ++-
>  test/xlog/panic_on_broken_lsn.test.lua |  7 +-
>  4 files changed, 107 insertions(+), 73 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 294765195..d00b1b04a 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -478,27 +477,29 @@ applier_join(struct applier *applier)
>  	 * Receive final data.
>  	 */
>  	while (true) {
> -		if (coio_read_xrow(coio, ibuf, &row) < 0)
> -			diag_raise();
> -		applier->last_row_time = ev_monotonic_now(loop());
> -		if (iproto_type_is_dml(row.type)) {
> -			vclock_follow_xrow(&replicaset.vclock, &row);
> -			if (apply_final_join_row(&row) != 0)
> -				diag_raise();
> -			if (++row_count % 100000 == 0)
> -				say_info("%.1fM rows received", row_count / 1e6);
> -		} else if (row.type == IPROTO_OK) {
> -			/*
> -			 * Current vclock. This is not used now,
> -			 * ignore.
> -			 */
> +		struct stailq rows;
> +		applier_read_tx(applier, &rows);
> +		struct xrow_header *first_row =
> +			&(stailq_first_entry(&rows, struct applier_tx_row,
> +					    next)->row);
> +		if (first_row->type == IPROTO_OK) {
> +			if (applier->version_id < version_id(1, 7, 0)) {
> +				/*
> +				 * This is the start vclock if the
> +				 * server is 1.6. Since we have
> +				 * not initialized replication
> +				 * vclock yet, do it now. In 1.7+
> +				 * this vclock is not used.
> +				 */
> +				xrow_decode_vclock_xc(first_row, &replicaset.vclock);
> +			}
>  			break; /* end of stream */
> -		} else if (iproto_type_is_error(row.type)) {
> -			xrow_decode_error_xc(&row); /* rethrow error */
> -		} else {
> -			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
> -				  (uint32_t) row.type);
>  		}
> +		if (applier_apply_tx(&rows) != 0)
> +			diag_raise();
> +		if (ibuf_used(ibuf) == 0)
> +			ibuf_reset(ibuf);

2. I propose moving the ibuf reset and fiber_gc() into applier_apply_tx().
As written, it is confusing: the loop body does not touch ibuf or
fiber->gc directly anywhere, yet you reset them on every iteration.

And why do you reset the ibuf only when used == 0? It is filled with the
transaction's data, right? After applier_apply_tx() the data should not
be needed anymore, regardless of what is left in the ibuf.

> +		fiber_gc();
>  	}
>  	say_info("final data received");
>
> @@ -532,6 +523,9 @@ applier_read_tx_row(struct applier *applier)
>  	struct xrow_header *row = &tx_row->row;
>
>  	double timeout = replication_disconnect_timeout();
> +	/* We check timeout only in case of subscribe. */
> +	if (applier->state == APPLIER_FINAL_JOIN)
> +		timeout = TIMEOUT_INFINITY;

3. Why? And how is this related to the transactionality of recovery and
join?

>  	/*
>  	 * Tarantool < 1.7.7 does not send periodic heartbeat
>  	 * messages so we can't assume that if we haven't heard
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 71822551e..9464eee63 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -286,6 +286,8 @@ struct wal_stream {
>  	struct xstream base;
>  	/** How many rows have been recovered so far. */
>  	size_t rows;
> +	/** Current transaction.*/

4. Please put a space before '*/'.

> +	struct txn *txn;
>  };
>
>  /**
> @@ -321,7 +323,8 @@ recovery_journal_write(struct journal *base,
>  }
>
>  static inline void
> -recovery_journal_create(struct recovery_journal *journal, const struct vclock *v)
> +recovery_journal_create(struct recovery_journal *journal,
> +			const struct vclock *v)

5. Unnecessary change.

>  {
>  	journal_create(&journal->base, recovery_journal_write, NULL);
>  	vclock_copy(&journal->vclock, v);
> @@ -330,33 +333,44 @@ recovery_journal_create(struct recovery_journal *journal, const struct vclock *v
>  static int
>  apply_wal_row(struct xstream *stream, struct xrow_header *row)
>  {
> +	struct wal_stream *wal_stream =
> +		container_of(stream, struct wal_stream, base);
> +	if (wal_stream->txn == NULL) {
> +		wal_stream->txn = txn_begin();
> +		if (wal_stream->txn == NULL)
> +			return -1;
> +	}
>  	struct request request;
>  	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> +	int rc = 0;
>  	if (request.type != IPROTO_NOP) {
>  		struct space *space = space_cache_find_xc(request.space_id);
> -		if (box_process_rw(&request, space, NULL) != 0) {
> +		rc = box_process_rw(&request, space, NULL);
> +		if (rc != 0)
>  			say_error("error applying row: %s", request_str(&request));
> -			return -1;

6. So now you can apply a row, get an error on it, and then commit the
broken transaction below anyway. Please don't.
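Regarding comment 6, here is a minimal model of the control flow I would expect, as a C sketch. It is not Tarantool code: the sketch_* types and functions are hypothetical stand-ins for txn_begin(), box_process_rw(), txn_commit() and txn_rollback(). Rows accumulate in one open transaction until the row with is_commit set, and an error on any row rolls back the whole transaction instead of falling through to the commit.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical row: a value to apply, the commit flag, and a forced error. */
struct sketch_row {
	int value;
	bool is_commit;
	bool fails;
};

/* Hypothetical storage: durable state plus one open transaction. */
struct sketch_store {
	int committed;  /* state visible after commits */
	int pending;    /* effects of the open transaction */
	bool in_txn;    /* is a transaction currently open? */
};

/* Drop everything the open transaction did. */
static void
sketch_rollback(struct sketch_store *s)
{
	s->pending = 0;
	s->in_txn = false;
}

/*
 * Apply one row inside the current transaction: open it on the first
 * row, commit it on the row flagged is_commit. On error, roll back the
 * whole transaction instead of committing it.
 */
static int
sketch_apply_row(struct sketch_store *s, const struct sketch_row *row)
{
	if (!s->in_txn) {
		s->in_txn = true;
		s->pending = 0;
	}
	if (row->fails) {
		sketch_rollback(s);
		return -1;
	}
	s->pending += row->value;
	if (row->is_commit) {
		s->committed += s->pending;
		s->pending = 0;
		s->in_txn = false;
	}
	return 0;
}

/* Apply a stream of rows, stopping at the first failed transaction. */
static int
sketch_apply_stream(struct sketch_store *s, const struct sketch_row *rows,
		    size_t count)
{
	assert(s != NULL);
	for (size_t i = 0; i < count; i++) {
		if (sketch_apply_row(s, &rows[i]) != 0)
			return -1;
	}
	return 0;
}
```

In the patch this would presumably mean rolling back wal_stream->txn and resetting it to NULL as soon as rc != 0, as the old code did with txn_rollback(txn), rather than waiting for the is_commit row.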
> -		}
>  	} else {
> -		struct txn *txn = txn_begin();
> -		if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 ||
> -		    txn_commit_stmt(txn, &request) != 0) {
> -			txn_rollback(txn);
> +		struct txn *txn = in_txn();
> +		rc = txn_begin_stmt(txn, NULL);

7. Why do you need this empty statement?

> +		if (rc == 0)
> +			rc = txn_commit_stmt(txn, &request);
> +	}
> +	if (row->is_commit) {
> +		if (txn_commit(wal_stream->txn) != 0) {
> +			wal_stream->txn = NULL;
>  			return -1;
>  		}
> -		if (txn_commit(txn) != 0)
> -			return -1;
> +		wal_stream->txn = NULL;
>  	}
> -	struct wal_stream *xstream =
> -		container_of(stream, struct wal_stream, base);
>  	/**
>  	 * Yield once in a while, but not too often,
>  	 * mostly to allow signal handling to take place.
>  	 */
> -	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
> +	if (++(wal_stream->rows) > WAL_ROWS_PER_YIELD &&
> +	    wal_stream->txn == NULL) {
> +		wal_stream->rows -= WAL_ROWS_PER_YIELD;

8. Why did you change this?

>  		fiber_sleep(0);
> -	return 0;
> +	}
> +	return rc;
>  }
>
>  static void
> @@ -1917,6 +1946,9 @@ local_recovery(const struct tt_uuid *instance_uuid,
>
>  	struct wal_stream wal_stream;
>  	wal_stream_create(&wal_stream);
> +	auto wal_stream_guard = make_scoped_guard([&]{
> +		wal_stream_destroy(&wal_stream);

9. You ignore the wal_stream_destroy() error here.

> +	});
>
>  	struct recovery *recovery;
>  	recovery = recovery_new(cfg_gets("wal_dir"),
> @@ -1925,13 +1957,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	if (recovery == NULL)
>  		diag_raise();
>
> -	/*
> -	 * Make sure we report the actual recovery position
> -	 * in box.info while local recovery is in progress.
> -	 */
> -	box_vclock = &recovery->vclock;

10. So box.info no longer reports the actual recovery position while
local recovery is in progress?
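On comment 8: as far as I can tell from the diff, the new condition yields only when no transaction is open (wal_stream->txn == NULL), i.e. right after a commit, whereas the old `% WAL_ROWS_PER_YIELD == 0` check could fire in the middle of a multi-row transaction. A toy C simulation comparing the two rules follows. It models rows only by their is_commit flag; yield_stats, simulate_modulo() and simulate_deferred() are hypothetical names, not Tarantool functions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Result of replaying a row stream under one yield rule. */
struct yield_stats {
	size_t yields;      /* how many times fiber_sleep(0) would run */
	size_t inside_txn;  /* yields taken while a transaction is still open */
};

/* Old rule: yield on every per_yield-th row, wherever it falls. */
static struct yield_stats
simulate_modulo(const bool *is_commit, size_t count, size_t per_yield)
{
	assert(per_yield > 0);
	struct yield_stats st = {0, 0};
	size_t rows = 0;
	for (size_t i = 0; i < count; i++) {
		/* The transaction stays open until its is_commit row. */
		bool txn_open = !is_commit[i];
		if (++rows % per_yield == 0) {
			st.yields++;
			if (txn_open)
				st.inside_txn++;
		}
	}
	return st;
}

/*
 * Rule from the patch: count every row, but yield only when the
 * counter exceeds per_yield and no transaction is open, then subtract
 * per_yield from the counter. inside_txn stays 0 by construction.
 */
static struct yield_stats
simulate_deferred(const bool *is_commit, size_t count, size_t per_yield)
{
	assert(per_yield > 0);
	struct yield_stats st = {0, 0};
	size_t rows = 0;
	for (size_t i = 0; i < count; i++) {
		bool txn_open = !is_commit[i];
		if (++rows > per_yield && !txn_open) {
			rows -= per_yield;
			st.yields++;
		}
	}
	return st;
}
```

Note that the patched rule subtracts only one WAL_ROWS_PER_YIELD per yield, so after a transaction much longer than the threshold, each of the following commits yields until the counter drops back to WAL_ROWS_PER_YIELD or below.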
>  	auto guard = make_scoped_guard([&]{
> -		box_vclock = &replicaset.vclock;
>  		recovery_stop_local(recovery);
>  		recovery_delete(recovery);
>  	});
> @@ -1981,12 +2007,12 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	 */
>  	memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock);
>
> -	vclock_copy(&replicaset.vclock, checkpoint_vclock);
>  	struct recovery_journal journal;
>  	recovery_journal_create(&journal, &recovery->vclock);
>  	journal_set(&journal.base);
>
>  	engine_begin_final_recovery_xc();
> +

11. Unnecessary change.

> diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result
> index 1e62680eb..e209374b6 100644
> --- a/test/xlog/panic_on_broken_lsn.result
> +++ b/test/xlog/panic_on_broken_lsn.result
> @@ -141,8 +141,11 @@ test_run:cmd("setopt delimiter ';'")
>  _ = fiber.create(function()
>      test_run:wait_cond(function() return box.info.replication[2] ~= nil end)
>      lsn = box.info.vclock[1]
> -    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
> +    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 2)
> +    box.begin()
>      box.space.test:auto_increment{'v1'}
> +    box.space.test:auto_increment{'v1'}
> +    box.commit()

12. Why? If you want to test transactional recovery/join, please add a
new test case. Try not to change the old ones unless they break or the
change is trivial. If this is not meant as a test for transactional
recovery/join, then please add one. I think this is testable, since the
patch is not just a refactoring.

>      box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
>  end);
>  ---
> @@ -164,9 +167,9 @@ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
>  - ok
>  ...
>  -- Check that log contains the mention of broken LSN and the request printout
> -grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn)
> +grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn + 1)

13. Why +1?