From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp45.i.mail.ru (smtp45.i.mail.ru [94.100.177.105]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 13A6243D679 for ; Tue, 19 Nov 2019 19:05:04 +0300 (MSK) From: Georgy Kirichenko Date: Tue, 19 Nov 2019 19:04:57 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH 6/6] recovery: follow transaction boundaries during recovery or join List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org Do not start a separate transaction for each row read from the local journal or received during final join; follow the transaction boundaries instead. Part of #980 --- src/box/applier.cc | 92 +++++++++++++------------- src/box/box.cc | 72 ++++++++++++++------ test/xlog/panic_on_broken_lsn.result | 9 ++- test/xlog/panic_on_broken_lsn.test.lua | 7 +- 4 files changed, 107 insertions(+), 73 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 294765195..d00b1b04a 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -268,23 +268,6 @@ apply_row(struct xrow_header *row) return 0; } -static int -apply_final_join_row(struct xrow_header *row) -{ - struct txn *txn = txn_begin(); - if (txn == NULL) - return -1; - if (apply_row(row) != 0) { - txn_rollback(txn); - fiber_gc(); - return -1; - } - if (txn_commit(txn) != 0) - return -1; - fiber_gc(); - return 0; -} - /** * Connect to a remote host and authenticate the client. */ @@ -391,6 +374,22 @@ done: applier_set_state(applier, APPLIER_READY); } +/** + * A helper struct to link xrow objects in a list. 
+ */ +struct applier_tx_row { + /* Next transaction row. */ + struct stailq_entry next; + /* xrow_header struct for the current transaction row.
*/ + struct xrow_header row; +}; + +static void +applier_read_tx(struct applier *applier, struct stailq *rows); + +static int +applier_apply_tx(struct stailq *rows); + /** * Execute and process JOIN request (bootstrap the instance). */ @@ -478,27 +477,29 @@ applier_join(struct applier *applier) * Receive final data. */ while (true) { - if (coio_read_xrow(coio, ibuf, &row) < 0) - diag_raise(); - applier->last_row_time = ev_monotonic_now(loop()); - if (iproto_type_is_dml(row.type)) { - vclock_follow_xrow(&replicaset.vclock, &row); - if (apply_final_join_row(&row) != 0) - diag_raise(); - if (++row_count % 100000 == 0) - say_info("%.1fM rows received", row_count / 1e6); - } else if (row.type == IPROTO_OK) { - /* - * Current vclock. This is not used now, - * ignore. - */ + struct stailq rows; + applier_read_tx(applier, &rows); + struct xrow_header *first_row = + &(stailq_first_entry(&rows, struct applier_tx_row, + next)->row); + if (first_row->type == IPROTO_OK) { + if (applier->version_id < version_id(1, 7, 0)) { + /* + * This is the start vclock if the + * server is 1.6. Since we have + * not initialized replication + * vclock yet, do it now. In 1.7+ + * this vclock is not used. + */ + xrow_decode_vclock_xc(first_row, &replicaset.vclock); + } break; /* end of stream */ - } else if (iproto_type_is_error(row.type)) { - xrow_decode_error_xc(&row); /* rethrow error */ - } else { - tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, - (uint32_t) row.type); } + if (applier_apply_tx(&rows) != 0) + diag_raise(); + if (ibuf_used(ibuf) == 0) + ibuf_reset(ibuf); + fiber_gc(); } say_info("final data received"); @@ -506,16 +507,6 @@ applier_join(struct applier *applier) applier_set_state(applier, APPLIER_READY); } -/** - * A helper struct to link xrow objects in a list. - */ -struct applier_tx_row { - /* Next transaction row. */ - struct stailq_entry next; - /* xrow_header struct for the current transaction row. 
*/ - struct xrow_header row; -}; - static struct applier_tx_row * applier_read_tx_row(struct applier *applier) { @@ -532,6 +523,9 @@ applier_read_tx_row(struct applier *applier) struct xrow_header *row = &tx_row->row; double timeout = replication_disconnect_timeout(); + /* We check timeout only in case of subscribe. */ + if (applier->state == APPLIER_FINAL_JOIN) + timeout = TIMEOUT_INFINITY; /* * Tarantool < 1.7.7 does not send periodic heartbeat * messages so we can't assume that if we haven't heard @@ -568,6 +562,12 @@ applier_read_tx(struct applier *applier, struct stailq *rows) struct applier_tx_row *tx_row = applier_read_tx_row(applier); struct xrow_header *row = &tx_row->row; + if (row->type == IPROTO_OK) { + stailq_add_tail(rows, &tx_row->next); + assert(tx_row->row.is_commit); + break; + } + if (iproto_type_is_error(row->type)) xrow_decode_error_xc(row); diff --git a/src/box/box.cc b/src/box/box.cc index 71822551e..9464eee63 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -286,6 +286,8 @@ struct wal_stream { struct xstream base; /** How many rows have been recovered so far. 
*/ size_t rows; + /** Current transaction.*/ + struct txn *txn; }; /** @@ -321,7 +323,8 @@ recovery_journal_write(struct journal *base, } static inline void -recovery_journal_create(struct recovery_journal *journal, const struct vclock *v) +recovery_journal_create(struct recovery_journal *journal, + const struct vclock *v) { journal_create(&journal->base, recovery_journal_write, NULL); vclock_copy(&journal->vclock, v); @@ -330,33 +333,44 @@ recovery_journal_create(struct recovery_journal *journal, const struct vclock *v static int apply_wal_row(struct xstream *stream, struct xrow_header *row) { + struct wal_stream *wal_stream = + container_of(stream, struct wal_stream, base); + if (wal_stream->txn == NULL) { + wal_stream->txn = txn_begin(); + if (wal_stream->txn == NULL) + return -1; + } struct request request; xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); + int rc = 0; if (request.type != IPROTO_NOP) { struct space *space = space_cache_find_xc(request.space_id); - if (box_process_rw(&request, space, NULL) != 0) { + rc = box_process_rw(&request, space, NULL); + if (rc != 0) say_error("error applying row: %s", request_str(&request)); - return -1; - } } else { - struct txn *txn = txn_begin(); - if (txn == NULL || txn_begin_stmt(txn, NULL) != 0 || - txn_commit_stmt(txn, &request) != 0) { - txn_rollback(txn); + struct txn *txn = in_txn(); + rc = txn_begin_stmt(txn, NULL); + if (rc == 0) + rc = txn_commit_stmt(txn, &request); + } + if (row->is_commit) { + if (txn_commit(wal_stream->txn) != 0) { + wal_stream->txn = NULL; return -1; } - if (txn_commit(txn) != 0) - return -1; + wal_stream->txn = NULL; } - struct wal_stream *xstream = - container_of(stream, struct wal_stream, base); /** * Yield once in a while, but not too often, * mostly to allow signal handling to take place. 
*/ - if (++xstream->rows % WAL_ROWS_PER_YIELD == 0) + if (++(wal_stream->rows) > WAL_ROWS_PER_YIELD && + wal_stream->txn == NULL) { + wal_stream->rows -= WAL_ROWS_PER_YIELD; fiber_sleep(0); - return 0; + } + return rc; } static void @@ -364,6 +378,21 @@ wal_stream_create(struct wal_stream *ctx) { xstream_create(&ctx->base, apply_wal_row); ctx->rows = 0; + ctx->txn = NULL; +} + +static int +wal_stream_destroy(struct wal_stream *ctx) +{ + if (ctx->txn != NULL) { + /* The last processed row does not have a commit flag set. */ + txn_rollback(ctx->txn); + ctx->txn = NULL; + diag_set(ClientError, ER_UNSUPPORTED, + "recovery", "not finished transactions"); + return -1; + } + return 0; } /* {{{ configuration bindings */ @@ -1917,6 +1946,9 @@ local_recovery(const struct tt_uuid *instance_uuid, struct wal_stream wal_stream; wal_stream_create(&wal_stream); + auto wal_stream_guard = make_scoped_guard([&]{ + wal_stream_destroy(&wal_stream); + }); struct recovery *recovery; recovery = recovery_new(cfg_gets("wal_dir"), @@ -1925,13 +1957,7 @@ local_recovery(const struct tt_uuid *instance_uuid, if (recovery == NULL) diag_raise(); - /* - * Make sure we report the actual recovery position - * in box.info while local recovery is in progress. 
- */ - box_vclock = &recovery->vclock; auto guard = make_scoped_guard([&]{ - box_vclock = &replicaset.vclock; recovery_stop_local(recovery); recovery_delete(recovery); }); @@ -1981,12 +2007,12 @@ local_recovery(const struct tt_uuid *instance_uuid, */ memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock); - vclock_copy(&replicaset.vclock, checkpoint_vclock); struct recovery_journal journal; recovery_journal_create(&journal, &recovery->vclock); journal_set(&journal.base); engine_begin_final_recovery_xc(); + if (recover_remaining_wals(recovery, &wal_stream.base, NULL, false) != 0) diag_raise(); engine_end_recovery_xc(); @@ -2015,7 +2041,9 @@ local_recovery(const struct tt_uuid *instance_uuid, box_sync_replication(false); } recovery_finalize(recovery); - + wal_stream_guard.is_active = false; + if (wal_stream_destroy(&wal_stream)) + diag_raise(); /* * We must enable WAL before finalizing engine recovery, * because an engine may start writing to WAL right after diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result index 1e62680eb..e209374b6 100644 --- a/test/xlog/panic_on_broken_lsn.result +++ b/test/xlog/panic_on_broken_lsn.result @@ -141,8 +141,11 @@ test_run:cmd("setopt delimiter ';'") _ = fiber.create(function() test_run:wait_cond(function() return box.info.replication[2] ~= nil end) lsn = box.info.vclock[1] - box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1) + box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 2) + box.begin() box.space.test:auto_increment{'v1'} + box.space.test:auto_increment{'v1'} + box.commit() box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false) end); --- @@ -164,9 +167,9 @@ box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1) - ok ... 
-- Check that log contains the mention of broken LSN and the request printout -grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn) +grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn + 1) --- -- '{type: ''INSERT'', replica_id: 1, space_id: 9000, index_id: 0, tuple: [2, "v1"]}' +- '{type: ''INSERT'', replica_id: 1, space_id: 9000, index_id: 0, tuple: [3, "v1"]}' ... test_run:cmd('cleanup server replica') --- diff --git a/test/xlog/panic_on_broken_lsn.test.lua b/test/xlog/panic_on_broken_lsn.test.lua index 80cccd918..a1d62cee5 100644 --- a/test/xlog/panic_on_broken_lsn.test.lua +++ b/test/xlog/panic_on_broken_lsn.test.lua @@ -67,8 +67,11 @@ test_run:cmd("setopt delimiter ';'") _ = fiber.create(function() test_run:wait_cond(function() return box.info.replication[2] ~= nil end) lsn = box.info.vclock[1] - box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1) + box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 2) + box.begin() box.space.test:auto_increment{'v1'} + box.space.test:auto_increment{'v1'} + box.commit() box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false) end); test_run:cmd("setopt delimiter ''"); @@ -78,7 +81,7 @@ test_run:cmd('start server replica with crash_expected=True') box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1) -- Check that log contains the mention of broken LSN and the request printout -grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn) +grep_broken_lsn(fio.pathjoin(fio.cwd(), 'replica.log'), lsn + 1) test_run:cmd('cleanup server replica') test_run:cmd('delete server replica') -- 2.24.0