To: Serge Petrenko, tarantool-patches@dev.tarantool.org, gorcunov@gmail.com, korablev@tarantool.org
From: Vladislav Shpilevoy via Tarantool-patches
Reply-To: Vladislav Shpilevoy
Date: Sat, 3 Apr 2021 15:18:15 +0200
Message-ID: <0bce3f47-8305-b7ee-df91-28620c887433@tarantool.org>
In-Reply-To: <9c9b3077-0579-d57b-c829-242d6fdd2a3c@tarantool.org>
Subject: Re: [Tarantool-patches] [PATCH 2/3] recovery: make it transactional

Hi! Thanks for the review!

> I ran the tests locally and replication suite fails occasionally with
> ```
> [009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:539 E> error at request: {type: 'INSERT', replica_id: 0, lsn: 7, space_id: 517, index_id: 0, tuple: [21]}
> [009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:481 E> XlogError: found a first row in a transaction with LSN/TSN mismatch
> [009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
> [009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
> ```

Thanks for noticing! There was a bug with transactions starting from a local
row: a local row takes its LSN from vclock[0], while a global row takes its
LSN from vclock[instance_id] (instance_id > 0), so the first row's LSN does
not have to match the TSN. I fixed it by checking LSN == TSN only for the
first global row. If all the rows are local, I check that the TSN matches the
first row's LSN (there is a small worked example at the very end of this
letter). Here is the diff.

====================
diff --git a/src/box/box.cc b/src/box/box.cc
index f70a2bd0e..67b44c053 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -338,12 +338,22 @@ struct wal_stream {
         struct xstream base;
         /** Current transaction ID. 0 when no transaction. */
         int64_t tsn;
+        /**
+         * LSN of the first row saved to check TSN and LSN match in case all
+         * rows of the tx appeared to be local.
+         */
+        int64_t first_row_lsn;
         /**
          * Flag whether there is a pending yield to do when the current
          * transaction is finished. It can't always be done right away because
          * would abort the current transaction if it is memtx.
          */
         bool has_yield;
+        /**
+         * True if any row in the transaction was global. Saved to check if TSN
+         * matches LSN of a first global row.
+         */
+        bool has_global_row;
         /** How many rows have been recovered so far. */
         size_t rows;
 };
@@ -484,12 +494,9 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
                         diag_set(XlogError, "found a row without TSN");
                         goto end_diag_request;
                 }
-                if (row->tsn != row->lsn) {
-                        diag_set(XlogError, "found a first row in a "
-                                 "transaction with LSN/TSN mismatch");
-                        goto end_diag_request;
-                }
                 stream->tsn = row->tsn;
+                stream->first_row_lsn = row->lsn;
+                stream->has_global_row = false;
                 /*
                  * Rows are not stacked into a list like during replication,
                  * because recovery does not yield while reading the rows. All
@@ -510,6 +517,15 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
         } else {
                 txn = in_txn();
         }
+        /* Ensure TSN is equal to LSN of the first global row. */
+        if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
+                if (row->tsn != row->lsn) {
+                        diag_set(XlogError, "found a first global row in a "
+                                 "transaction with LSN/TSN mismatch");
+                        goto end_diag_request;
+                }
+                stream->has_global_row = true;
+        }
         assert(wal_stream_has_tx(stream));
         /* Nops might appear at least after before_replace skipping rows. */
         if (request.type != IPROTO_NOP) {
@@ -526,8 +542,26 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
         assert(txn != NULL);
         if (!row->is_commit)
                 return 0;
-
+        /*
+         * For fully local transactions the TSN check won't work like for global
+         * transactions, because it is not known if there are global rows until
+         * commit arrives.
+         */
+        if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
+                diag_set(XlogError, "fully local transaction's TSN does not "
+                         "match LSN of the first row");
+                return -1;
+        }
         stream->tsn = 0;
+        /*
+         * During local recovery the commit procedure should be async, otherwise
+         * the only fiber processing recovery will get stuck on the first
+         * synchronous tx it meets until confirm timeout is reached and the tx
+         * is rolled back, yielding an error.
+         * Moreover, txn_commit_try_async() doesn't hurt at all during local
+         * recovery, since journal_write is faked at this stage and returns
+         * immediately.
+         */
         if (txn_commit_try_async(txn) != 0) {
                 /* Commit fail automatically leads to rollback. */
                 assert(in_txn() == NULL);
@@ -555,20 +589,15 @@ end_diag_request:
 static void
 wal_stream_try_yield(struct wal_stream *stream)
 {
-        bool needs_yield = (stream->rows % WAL_ROWS_PER_YIELD == 0);
-        if (wal_stream_has_tx(stream)) {
-                /*
-                 * Save the yield. Otherwise it would happen only on rows which
-                 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
-                 * transaction, which is probably a very rare coincidence.
-                 */
-                stream->has_yield = true;
-                return;
-        }
-        if (stream->has_yield)
-                stream->has_yield = false;
-        else if (!needs_yield)
+        /*
+         * Save the yield. Otherwise it would happen only on rows which
+         * are a multiple of WAL_ROWS_PER_YIELD and are last in their
+         * transaction, which is probably a very rare coincidence.
+         */
+        stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
+        if (wal_stream_has_tx(stream) || !stream->has_yield)
                 return;
+        stream->has_yield = false;
         fiber_sleep(0);
 }
@@ -605,7 +634,9 @@ wal_stream_create(struct wal_stream *ctx)
 {
         xstream_create(&ctx->base, wal_stream_apply_row);
         ctx->tsn = 0;
+        ctx->first_row_lsn = 0;
         ctx->has_yield = false;
+        ctx->has_global_row = false;
         ctx->rows = 0;
 }
diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
index a64eadd9c..73f903ca7 100644
--- a/test/replication/gh-5874-qsync-txn-recovery.result
+++ b/test/replication/gh-5874-qsync-txn-recovery.result
@@ -95,6 +95,31 @@ box.commit()
  | ---
  | ...
+--
+-- First row might be for a local space and its LSN won't match TSN. Need to be
+-- ok with that.
+--
+loc = box.schema.create_space('loc', {is_local = true, engine = engine})
+ | ---
+ | ...
+_ = loc:create_index('pk')
+ | ---
+ | ...
+box.begin() \
+loc:replace{1} \
+async:replace{9} \
+box.commit()
+ | ---
+ | ...
+
+-- All local.
+box.begin() \
+loc:replace{2} \
+loc:replace{3} \
+box.commit()
+ | ---
+ | ...
+
 test_run:cmd('restart server default')
  |
 async = box.space.async
@@ -103,12 +128,16 @@ async = box.space.async
 sync = box.space.sync
  | ---
  | ...
+loc = box.space.loc
+ | ---
+ | ...
 async:select()
  | ---
  | - - [1]
  |   - [3]
  |   - [4]
  |   - [6]
+ |   - [9]
  | ...
 sync:select()
  | ---
@@ -116,9 +145,18 @@ sync:select()
  |   - [2]
  |   - [3]
  | ...
+loc:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
 async:drop()
  | ---
  | ...
 sync:drop()
  | ---
  | ...
+loc:drop()
+ | ---
+ | ...
diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
index efcf727cc..f35eb68de 100644
--- a/test/replication/gh-5874-qsync-txn-recovery.test.lua
+++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
@@ -55,10 +55,30 @@ async:replace{8} \
 do_skip = false \
 box.commit()
 
+--
+-- First row might be for a local space and its LSN won't match TSN. Need to be
+-- ok with that.
+--
+loc = box.schema.create_space('loc', {is_local = true, engine = engine})
+_ = loc:create_index('pk')
+box.begin() \
+loc:replace{1} \
+async:replace{9} \
+box.commit()
+
+-- All local.
+box.begin() \
+loc:replace{2} \
+loc:replace{3} \
+box.commit()
+
 test_run:cmd('restart server default')
 async = box.space.async
 sync = box.space.sync
+loc = box.space.loc
 async:select()
 sync:select()
+loc:select()
 async:drop()
 sync:drop()
+loc:drop()
====================

> Consider this diff. It looks simpler IMO, but feel free to ignore.

Applied your diff, see above. The full patch:

====================
diff --git a/changelogs/unreleased/qsync-multi-statement-recovery b/changelogs/unreleased/qsync-multi-statement-recovery
new file mode 100644
index 000000000..c902cbe24
--- /dev/null
+++ b/changelogs/unreleased/qsync-multi-statement-recovery
@@ -0,0 +1,5 @@
+## bugfix/replication
+
+* Fix recovery of a rolled back multi-statement synchronous transaction which
+  could lead to the transaction being applied partially, and to recovery errors.
+  It happened in case the transaction worked with non-sync spaces (gh-5874).
diff --git a/src/box/box.cc b/src/box/box.cc
index 4da274976..67b44c053 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -334,7 +334,26 @@ box_set_orphan(bool orphan)
 }
 
 struct wal_stream {
+        /** Base class. */
         struct xstream base;
+        /** Current transaction ID. 0 when no transaction. */
+        int64_t tsn;
+        /**
+         * LSN of the first row saved to check TSN and LSN match in case all
+         * rows of the tx appeared to be local.
+         */
+        int64_t first_row_lsn;
+        /**
+         * Flag whether there is a pending yield to do when the current
+         * transaction is finished. It can't always be done right away because
+         * would abort the current transaction if it is memtx.
+         */
+        bool has_yield;
+        /**
+         * True if any row in the transaction was global. Saved to check if TSN
+         * matches LSN of a first global row.
+         */
+        bool has_global_row;
         /** How many rows have been recovered so far. */
         size_t rows;
 };
@@ -379,47 +398,245 @@ recovery_journal_create(struct vclock *v)
         journal_set(&journal.base);
 }
 
+/**
+ * Drop the stream to the initial state. It is supposed to be done when an error
+ * happens. Because in case of force recovery the stream will continue getting
+ * tuples. For that it must stay in a valid state and must handle them somehow.
+ *
+ * Now the stream simply drops the current transaction like it never happened,
+ * even if its commit-row wasn't met yet. Should be good enough for
+ * force-recovery when the consistency is already out of the game.
+ */
 static void
-apply_wal_row(struct xstream *stream, struct xrow_header *row)
+wal_stream_abort(struct wal_stream *stream)
+{
+        struct txn *tx = in_txn();
+        if (tx != NULL)
+                txn_rollback(tx);
+        stream->tsn = 0;
+        fiber_gc();
+}
+
+/**
+ * The wrapper exists only for the debug purposes, to ensure tsn being non-0 is
+ * in sync with the fiber's txn being non-NULL. It has nothing to do with the
+ * journal content, and therefore can use assertions instead of rigorous error
+ * checking even in release.
+ */
+static bool
+wal_stream_has_tx(const struct wal_stream *stream)
+{
+        bool has = stream->tsn != 0;
+        assert(has == (in_txn() != NULL));
+        return has;
+}
+
+static int
+wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
+{
+        assert(iproto_type_is_synchro_request(row->type));
+        if (wal_stream_has_tx(stream)) {
+                diag_set(XlogError, "found synchro request in a transaction");
+                return -1;
+        }
+        struct synchro_request syn_req;
+        if (xrow_decode_synchro(row, &syn_req) != 0) {
+                say_error("couldn't decode a synchro request");
+                return -1;
+        }
+        txn_limbo_process(&txn_limbo, &syn_req);
+        return 0;
+}
+
+static int
+wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
+{
+        assert(iproto_type_is_raft_request(row->type));
+        if (wal_stream_has_tx(stream)) {
+                diag_set(XlogError, "found raft request in a transaction");
+                return -1;
+        }
+        struct raft_request raft_req;
+        /* Vclock is never persisted in WAL by Raft. */
+        if (xrow_decode_raft(row, &raft_req, NULL) != 0) {
+                say_error("couldn't decode a raft request");
+                return -1;
+        }
+        box_raft_recover(&raft_req);
+        return 0;
+}
+
+/**
+ * Rows of the same transaction are wrapped into begin/commit. Mostly for the
+ * sake of synchronous replication, when the log can contain rolled back
+ * transactions, which must be entirely reverted during recovery when ROLLBACK
+ * records are met. Row-by-row recovery wouldn't work for multi-statement
+ * synchronous transactions.
+ */
+static int
+wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
 {
         struct request request;
-        if (iproto_type_is_synchro_request(row->type)) {
-                struct synchro_request syn_req;
-                if (xrow_decode_synchro(row, &syn_req) != 0)
-                        diag_raise();
-                txn_limbo_process(&txn_limbo, &syn_req);
-                return;
+        uint64_t req_type = dml_request_key_map(row->type);
+        if (xrow_decode_dml(row, &request, req_type) != 0) {
+                say_error("couldn't decode a DML request");
+                return -1;
         }
-        if (iproto_type_is_raft_request(row->type)) {
-                struct raft_request raft_req;
-                /* Vclock is never persisted in WAL by Raft. */
-                if (xrow_decode_raft(row, &raft_req, NULL) != 0)
-                        diag_raise();
-                box_raft_recover(&raft_req);
-                return;
+        /*
+         * Note that all the information which came from the log is validated
+         * and the errors are handled. Not asserted or paniced. That is for the
+         * sake of force recovery, which must be able to recover just everything
+         * what possible instead of terminating the instance.
+         */
+        struct txn *txn;
+        if (stream->tsn == 0) {
+                if (row->tsn == 0) {
+                        diag_set(XlogError, "found a row without TSN");
+                        goto end_diag_request;
+                }
+                stream->tsn = row->tsn;
+                stream->first_row_lsn = row->lsn;
+                stream->has_global_row = false;
+                /*
+                 * Rows are not stacked into a list like during replication,
+                 * because recovery does not yield while reading the rows. All
+                 * the yields are controlled by the stream, and therefore no
+                 * need to wait for all the rows to start a transaction. Can
+                 * start now, apply the rows, and make a yield after commit if
+                 * necessary. Helps to avoid a lot of copying.
+                 */
+                txn = txn_begin();
+                if (txn == NULL) {
+                        say_error("couldn't begin a recovery transaction");
+                        return -1;
+                }
+        } else if (row->tsn != stream->tsn) {
+                diag_set(XlogError, "found a next transaction with the "
+                         "previous one not yet committed");
+                goto end_diag_request;
+        } else {
+                txn = in_txn();
+        }
+        /* Ensure TSN is equal to LSN of the first global row. */
+        if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
+                if (row->tsn != row->lsn) {
+                        diag_set(XlogError, "found a first global row in a "
+                                 "transaction with LSN/TSN mismatch");
+                        goto end_diag_request;
+                }
+                stream->has_global_row = true;
         }
-        xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
+        assert(wal_stream_has_tx(stream));
+        /* Nops might appear at least after before_replace skipping rows. */
         if (request.type != IPROTO_NOP) {
-                struct space *space = space_cache_find_xc(request.space_id);
+                struct space *space = space_cache_find(request.space_id);
+                if (space == NULL) {
+                        say_error("couldn't find space by ID");
+                        goto end_diag_request;
+                }
                 if (box_process_rw(&request, space, NULL) != 0) {
-                        say_error("error applying row: %s", request_str(&request));
-                        diag_raise();
+                        say_error("couldn't apply the request");
+                        goto end_diag_request;
                 }
         }
-        struct wal_stream *xstream =
-                container_of(stream, struct wal_stream, base);
-        /**
-         * Yield once in a while, but not too often,
-         * mostly to allow signal handling to take place.
+        assert(txn != NULL);
+        if (!row->is_commit)
+                return 0;
+        /*
+         * For fully local transactions the TSN check won't work like for global
+         * transactions, because it is not known if there are global rows until
+         * commit arrives.
         */
-        if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
-                fiber_sleep(0);
+        if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
+                diag_set(XlogError, "fully local transaction's TSN does not "
+                         "match LSN of the first row");
+                return -1;
+        }
+        stream->tsn = 0;
+        /*
+         * During local recovery the commit procedure should be async, otherwise
+         * the only fiber processing recovery will get stuck on the first
+         * synchronous tx it meets until confirm timeout is reached and the tx
+         * is rolled back, yielding an error.
+         * Moreover, txn_commit_try_async() doesn't hurt at all during local
+         * recovery, since journal_write is faked at this stage and returns
+         * immediately.
+         */
+        if (txn_commit_try_async(txn) != 0) {
+                /* Commit fail automatically leads to rollback. */
+                assert(in_txn() == NULL);
+                say_error("couldn't commit a recovery transaction");
+                return -1;
+        }
+        assert(in_txn() == NULL);
+        fiber_gc();
+        return 0;
+
+end_diag_request:
+        /*
+         * The label must be used only for the errors related directly to the
+         * request. Errors like txn_begin() fail has nothing to do with it, and
+         * therefore don't log the request as the fault reason.
+         */
+        say_error("error at request: %s", request_str(&request));
+        return -1;
+}
+
+/**
+ * Yield once in a while, but not too often, mostly to allow signal handling to
+ * take place.
+ */
+static void
+wal_stream_try_yield(struct wal_stream *stream)
+{
+        /*
+         * Save the yield. Otherwise it would happen only on rows which
+         * are a multiple of WAL_ROWS_PER_YIELD and are last in their
+         * transaction, which is probably a very rare coincidence.
+         */
+        stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
+        if (wal_stream_has_tx(stream) || !stream->has_yield)
+                return;
+        stream->has_yield = false;
+        fiber_sleep(0);
+}
+
+static void
+wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
+{
+        struct wal_stream *stream =
+                container_of(base, struct wal_stream, base);
+        /*
+         * Account all rows, even non-DML, and even leading to an error. Because
+         * still need to yield sometimes.
+         */
+        ++stream->rows;
+        if (iproto_type_is_synchro_request(row->type)) {
+                if (wal_stream_apply_synchro_row(stream, row) != 0)
+                        goto end_error;
+        } else if (iproto_type_is_raft_request(row->type)) {
+                if (wal_stream_apply_raft_row(stream, row) != 0)
+                        goto end_error;
+        } else if (wal_stream_apply_dml_row(stream, row) != 0) {
+                goto end_error;
+        }
+        wal_stream_try_yield(stream);
+        return;
+
+end_error:
+        wal_stream_abort(stream);
+        wal_stream_try_yield(stream);
+        diag_raise();
 }
 
 static void
 wal_stream_create(struct wal_stream *ctx)
 {
-        xstream_create(&ctx->base, apply_wal_row);
+        xstream_create(&ctx->base, wal_stream_apply_row);
+        ctx->tsn = 0;
+        ctx->first_row_lsn = 0;
+        ctx->has_yield = false;
+        ctx->has_global_row = false;
         ctx->rows = 0;
 }
 
@@ -2797,9 +3014,13 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
         struct wal_stream wal_stream;
         wal_stream_create(&wal_stream);
+        auto stream_guard = make_scoped_guard([&]{
+                wal_stream_abort(&wal_stream);
+        });
 
         struct recovery *recovery;
-        recovery = recovery_new(wal_dir(), cfg_geti("force_recovery"),
+        bool is_force_recovery = cfg_geti("force_recovery");
+        recovery = recovery_new(wal_dir(), is_force_recovery,
                                 checkpoint_vclock);
 
         /*
@@ -2861,6 +3082,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
 
         engine_begin_final_recovery_xc();
         recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
+        if (wal_stream_has_tx(&wal_stream)) {
+                wal_stream_abort(&wal_stream);
+                diag_set(XlogError, "found a not finished transaction "
+                         "in the log");
+                if (!is_force_recovery)
+                        diag_raise();
+                diag_log();
+        }
         engine_end_recovery_xc();
         /*
          * Leave hot standby mode, if any, only after
@@ -2880,6 +3109,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
                 }
                 recovery_stop_local(recovery);
                 recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
+                if (wal_stream_has_tx(&wal_stream)) {
+                        wal_stream_abort(&wal_stream);
+                        diag_set(XlogError, "found a not finished transaction "
+                                 "in the log in hot standby mode");
+                        if (!is_force_recovery)
+                                diag_raise();
+                        diag_log();
+                }
                 /*
                  * Advance replica set vclock to reflect records
                  * applied in hot standby mode.
@@ -2888,6 +3125,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
                 box_listen();
                 box_sync_replication(false);
         }
+        stream_guard.is_active = false;
         recovery_finalize(recovery);
         is_local_recovery = false;
 
diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
new file mode 100644
index 000000000..73f903ca7
--- /dev/null
+++ b/test/replication/gh-5874-qsync-txn-recovery.result
@@ -0,0 +1,162 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+--
+-- gh-5874: synchronous transactions should be recovered as whole units, not row
+-- by row. So as to be able to roll them back when ROLLBACK is recovered
+-- afterwards.
+--
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+ | ---
+ | ...
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
+ | ---
+ | ...
+engine = test_run:get_cfg('engine')
+ | ---
+ | ...
+async = box.schema.create_space('async', {engine = engine})
+ | ---
+ | ...
+_ = async:create_index('pk')
+ | ---
+ | ...
+sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
+ | ---
+ | ...
+_ = sync:create_index('pk')
+ | ---
+ | ...
+
+-- The transaction fails, but is written to the log anyway.
+box.begin() async:insert{1} sync:insert{1} box.commit()
+ | ---
+ | - error: Quorum collection for a synchronous transaction is timed out
+ | ...
+-- Ok, the previous txn is rolled back.
+_ = async:insert{1}
+ | ---
+ | ...
+box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
+ | ---
+ | ...
+_ = sync:insert{1}
+ | ---
+ | ...
+-- Try multi-statement sync txn to see how it recovers.
+box.begin() sync:insert{2} sync:insert{3} box.commit()
+ | ---
+ | ...
+
+-- See if NOP multi-statement recovery works fine.
+--
+-- Start with NOP.
+do_skip = false
+ | ---
+ | ...
+_ = async:before_replace(function() \
+    if do_skip then \
+        return nil \
+    end \
+end)
+ | ---
+ | ...
+box.begin() \
+do_skip = true \
+async:replace{2} \
+do_skip = false \
+async:replace{3} \
+box.commit()
+ | ---
+ | ...
+
+-- NOP in the middle.
+box.begin() \
+async:replace{4} \
+do_skip = true \
+async:replace{5} \
+do_skip = false \
+async:replace{6} \
+box.commit()
+ | ---
+ | ...
+
+-- All NOP.
+box.begin() \
+do_skip = true \
+async:replace{7} \
+async:replace{8} \
+do_skip = false \
+box.commit()
+ | ---
+ | ...
+
+--
+-- First row might be for a local space and its LSN won't match TSN. Need to be
+-- ok with that.
+--
+loc = box.schema.create_space('loc', {is_local = true, engine = engine})
+ | ---
+ | ...
+_ = loc:create_index('pk')
+ | ---
+ | ...
+box.begin() \
+loc:replace{1} \
+async:replace{9} \
+box.commit()
+ | ---
+ | ...
+
+-- All local.
+box.begin() \
+loc:replace{2} \
+loc:replace{3} \
+box.commit()
+ | ---
+ | ...
+
+test_run:cmd('restart server default')
+ |
+async = box.space.async
+ | ---
+ | ...
+sync = box.space.sync
+ | ---
+ | ...
+loc = box.space.loc
+ | ---
+ | ...
+async:select()
+ | ---
+ | - - [1]
+ |   - [3]
+ |   - [4]
+ |   - [6]
+ |   - [9]
+ | ...
+sync:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
+loc:select()
+ | ---
+ | - - [1]
+ |   - [2]
+ |   - [3]
+ | ...
+async:drop()
+ | ---
+ | ...
+sync:drop()
+ | ---
+ | ...
+loc:drop()
+ | ---
+ | ...
diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
new file mode 100644
index 000000000..f35eb68de
--- /dev/null
+++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
@@ -0,0 +1,84 @@
+test_run = require('test_run').new()
+--
+-- gh-5874: synchronous transactions should be recovered as whole units, not row
+-- by row. So as to be able to roll them back when ROLLBACK is recovered
+-- afterwards.
+--
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
+engine = test_run:get_cfg('engine')
+async = box.schema.create_space('async', {engine = engine})
+_ = async:create_index('pk')
+sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
+_ = sync:create_index('pk')
+
+-- The transaction fails, but is written to the log anyway.
+box.begin() async:insert{1} sync:insert{1} box.commit()
+-- Ok, the previous txn is rolled back.
+_ = async:insert{1}
+box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
+_ = sync:insert{1}
+-- Try multi-statement sync txn to see how it recovers.
+box.begin() sync:insert{2} sync:insert{3} box.commit()
+
+-- See if NOP multi-statement recovery works fine.
+--
+-- Start with NOP.
+do_skip = false
+_ = async:before_replace(function() \
+    if do_skip then \
+        return nil \
+    end \
+end)
+box.begin() \
+do_skip = true \
+async:replace{2} \
+do_skip = false \
+async:replace{3} \
+box.commit()
+
+-- NOP in the middle.
+box.begin() \
+async:replace{4} \
+do_skip = true \
+async:replace{5} \
+do_skip = false \
+async:replace{6} \
+box.commit()
+
+-- All NOP.
+box.begin() \
+do_skip = true \
+async:replace{7} \
+async:replace{8} \
+do_skip = false \
+box.commit()
+
+--
+-- First row might be for a local space and its LSN won't match TSN. Need to be
+-- ok with that.
+--
+loc = box.schema.create_space('loc', {is_local = true, engine = engine})
+_ = loc:create_index('pk')
+box.begin() \
+loc:replace{1} \
+async:replace{9} \
+box.commit()
+
+-- All local.
+box.begin() \
+loc:replace{2} \
+loc:replace{3} \
+box.commit()
+
+test_run:cmd('restart server default')
+async = box.space.async
+sync = box.space.sync
+loc = box.space.loc
+async:select()
+sync:select()
+loc:select()
+async:drop()
+sync:drop()
+loc:drop()
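
P.S. To make the vclock point from the beginning of this letter concrete,
here is a tiny worked example. It is not part of the patch; the space names
are made up, and a plain single instance (instance_id = 1) is assumed. It is
essentially what the new 'loc' test cases cover:

```
-- A local space writes its rows into vclock component 0, a global
-- space into vclock[instance_id]. Hypothetical names, default config.
box.cfg{}
loc = box.schema.create_space('loc_example', {is_local = true})
loc:create_index('pk')
glob = box.schema.create_space('glob_example')
glob:create_index('pk')
-- Suppose vclock = {0: 5, 1: 7} before the commit below. The local
-- row then gets LSN 6 from component 0, the global row gets LSN 8
-- from component 1, and the TSN of the transaction is 8, i.e. the
-- LSN of the first global row. The old recovery check compared the
-- first row's LSN (6) with the TSN (8) and failed on restart with
-- "found a first row in a transaction with LSN/TSN mismatch".
box.begin()
loc:replace{1}
glob:replace{1}
box.commit()
-- With the patch, recovery checks LSN == TSN only on the first
-- global row, so a restart here succeeds.
```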