From mboxrd@z Thu Jan 1 00:00:00 1970
To: Vladislav Shpilevoy, tarantool-patches@dev.tarantool.org, gorcunov@gmail.com,
 korablev@tarantool.org
References: <9c9b3077-0579-d57b-c829-242d6fdd2a3c@tarantool.org>
 <0bce3f47-8305-b7ee-df91-28620c887433@tarantool.org>
Message-ID: <54b8df5a-2dc3-96f8-6519-85897c4b0eba@tarantool.org>
Date: Mon, 5 Apr 2021 11:36:39 +0300
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:78.0) Gecko/20100101 Thunderbird/78.9.0
MIME-Version: 1.0
In-Reply-To: <0bce3f47-8305-b7ee-df91-28620c887433@tarantool.org>
Content-Type: text/plain; charset=utf-8; format=flowed
Content-Transfer-Encoding: 8bit
Subject: Re: [Tarantool-patches] [PATCH 2/3] recovery: make it transactional
List-Id: Tarantool development patches
From: Serge Petrenko via Tarantool-patches
Reply-To: Serge Petrenko

On 03.04.2021 16:18, Vladislav Shpilevoy wrote:
> Hi! Thanks for the review!
>
>> I ran the tests locally and the replication suite fails occasionally with
>> ```
>> [009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:539 E> error at request: {type: 'INSERT', replica_id: 0, lsn: 7, space_id: 517, index_id: 0, tuple: [21]}
>> [009] 2021-04-02 14:28:32.925 [30257] main/103/master box.cc:481 E> XlogError: found a first row in a transaction with LSN/TSN mismatch
>> [009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
>> [009] 2021-04-02 14:28:32.925 [30257] main/103/master F> can't initialize storage: found a first row in a transaction with LSN/TSN mismatch
>> ```
> Thanks for noticing! There was a bug with transactions starting
> from a local row: its LSN is taken from vclock[0], while for global
> rows it is taken from vclock[instance_id > 0].

Oh, I see.

> I fixed it by checking LSN == TSN only for the first global row.
> If all rows are local, I check that the LSN matches the first row's LSN.
>
> Here is the diff.

Thanks! LGTM.

> ====================
> diff --git a/src/box/box.cc b/src/box/box.cc
> index f70a2bd0e..67b44c053 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -338,12 +338,22 @@ struct wal_stream {
>  	struct xstream base;
>  	/** Current transaction ID. 0 when no transaction. */
>  	int64_t tsn;
> +	/**
> +	 * LSN of the first row saved to check TSN and LSN match in case all
> +	 * rows of the tx appeared to be local.
> +	 */
> +	int64_t first_row_lsn;
>  	/**
>  	 * Flag whether there is a pending yield to do when the current
>  	 * transaction is finished. It can't always be done right away because
>  	 * would abort the current transaction if it is memtx.
>  	 */
>  	bool has_yield;
> +	/**
> +	 * True if any row in the transaction was global. Saved to check if TSN
> +	 * matches LSN of a first global row.
> +	 */
> +	bool has_global_row;
>  	/** How many rows have been recovered so far. */
>  	size_t rows;
>  };
> @@ -484,12 +494,9 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>  			diag_set(XlogError, "found a row without TSN");
>  			goto end_diag_request;
>  		}
> -		if (row->tsn != row->lsn) {
> -			diag_set(XlogError, "found a first row in a "
> -				 "transaction with LSN/TSN mismatch");
> -			goto end_diag_request;
> -		}
>  		stream->tsn = row->tsn;
> +		stream->first_row_lsn = row->lsn;
> +		stream->has_global_row = false;
>  		/*
>  		 * Rows are not stacked into a list like during replication,
>  		 * because recovery does not yield while reading the rows. All
>  		 * the yields are controlled by the stream, and therefore no
>  		 * need to wait for all the rows to start a transaction. Can
>  		 * start now, apply the rows, and make a yield after commit if
>  		 * necessary. Helps to avoid a lot of copying.
>  		 */
> @@ -510,6 +517,15 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>  	} else {
>  		txn = in_txn();
>  	}
> +	/* Ensure TSN is equal to LSN of the first global row. */
> +	if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
> +		if (row->tsn != row->lsn) {
> +			diag_set(XlogError, "found a first global row in a "
> +				 "transaction with LSN/TSN mismatch");
> +			goto end_diag_request;
> +		}
> +		stream->has_global_row = true;
> +	}
>  	assert(wal_stream_has_tx(stream));
>  	/* Nops might appear at least after before_replace skipping rows. */
>  	if (request.type != IPROTO_NOP) {
> @@ -526,8 +542,26 @@ wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>  	assert(txn != NULL);
>  	if (!row->is_commit)
>  		return 0;
> -
> +	/*
> +	 * For fully local transactions the TSN check won't work like for global
> +	 * transactions, because it is not known if there are global rows until
> +	 * commit arrives.
> +	 */
> +	if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
> +		diag_set(XlogError, "fully local transaction's TSN does not "
> +			 "match LSN of the first row");
> +		return -1;
> +	}
>  	stream->tsn = 0;
> +	/*
> +	 * During local recovery the commit procedure should be async, otherwise
> +	 * the only fiber processing recovery will get stuck on the first
> +	 * synchronous tx it meets until confirm timeout is reached and the tx
> +	 * is rolled back, yielding an error.
> +	 * Moreover, txn_commit_try_async() doesn't hurt at all during local
> +	 * recovery, since journal_write is faked at this stage and returns
> +	 * immediately.
> +	 */
>  	if (txn_commit_try_async(txn) != 0) {
>  		/* Commit fail automatically leads to rollback. */
>  		assert(in_txn() == NULL);
> @@ -555,20 +589,15 @@ end_diag_request:
>  static void
>  wal_stream_try_yield(struct wal_stream *stream)
>  {
> -	bool needs_yield = (stream->rows % WAL_ROWS_PER_YIELD == 0);
> -	if (wal_stream_has_tx(stream)) {
> -		/*
> -		 * Save the yield. Otherwise it would happen only on rows which
> -		 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
> -		 * transaction, which is probably a very rare coincidence.
> -		 */
> -		stream->has_yield = true;
> -		return;
> -	}
> -	if (stream->has_yield)
> -		stream->has_yield = false;
> -	else if (!needs_yield)
> +	/*
> +	 * Save the yield. Otherwise it would happen only on rows which
> +	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
> +	 * transaction, which is probably a very rare coincidence.
> +	 */
> +	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
> +	if (wal_stream_has_tx(stream) || !stream->has_yield)
>  		return;
> +	stream->has_yield = false;
>  	fiber_sleep(0);
>  }
>  
> @@ -605,7 +634,9 @@ wal_stream_create(struct wal_stream *ctx)
>  {
>  	xstream_create(&ctx->base, wal_stream_apply_row);
>  	ctx->tsn = 0;
> +	ctx->first_row_lsn = 0;
>  	ctx->has_yield = false;
> +	ctx->has_global_row = false;
>  	ctx->rows = 0;
>  }
>  
> diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
> index a64eadd9c..73f903ca7 100644
> --- a/test/replication/gh-5874-qsync-txn-recovery.result
> +++ b/test/replication/gh-5874-qsync-txn-recovery.result
> @@ -95,6 +95,31 @@ box.commit()
>   | ---
>   | ...
>  
> +--
> +-- First row might be for a local space and its LSN won't match TSN. Need to be
> +-- ok with that.
> +--
> +loc = box.schema.create_space('loc', {is_local = true, engine = engine})
> + | ---
> + | ...
> +_ = loc:create_index('pk')
> + | ---
> + | ...
> +box.begin() \
> +loc:replace{1} \
> +async:replace{9} \
> +box.commit()
> + | ---
> + | ...
> +
> +-- All local.
> +box.begin() \
> +loc:replace{2} \
> +loc:replace{3} \
> +box.commit()
> + | ---
> + | ...
> +
>  test_run:cmd('restart server default')
>   |
>  async = box.space.async
> @@ -103,12 +128,16 @@ async = box.space.async
>  sync = box.space.sync
>   | ---
>   | ...
> +loc = box.space.loc
> + | ---
> + | ...
>  async:select()
>   | ---
>   | - - [1]
>   |   - [3]
>   |   - [4]
>   |   - [6]
> + |   - [9]
>   | ...
>  sync:select()
>   | ---
> @@ -116,9 +145,18 @@ sync:select()
>   |   - [2]
>   |   - [3]
>   | ...
> +loc:select()
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + | ...
>  async:drop()
>   | ---
>   | ...
>  sync:drop()
>   | ---
>   | ...
> +loc:drop()
> + | ---
> + | ...
> diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
> index efcf727cc..f35eb68de 100644
> --- a/test/replication/gh-5874-qsync-txn-recovery.test.lua
> +++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
> @@ -55,10 +55,30 @@ async:replace{8}
>  do_skip = false \
>  box.commit()
>  
> +--
> +-- First row might be for a local space and its LSN won't match TSN. Need to be
> +-- ok with that.
> +--
> +loc = box.schema.create_space('loc', {is_local = true, engine = engine})
> +_ = loc:create_index('pk')
> +box.begin() \
> +loc:replace{1} \
> +async:replace{9} \
> +box.commit()
> +
> +-- All local.
> +box.begin() \
> +loc:replace{2} \
> +loc:replace{3} \
> +box.commit()
> +
>  test_run:cmd('restart server default')
>  async = box.space.async
>  sync = box.space.sync
> +loc = box.space.loc
>  async:select()
>  sync:select()
> +loc:select()
>  async:drop()
>  sync:drop()
> +loc:drop()
>
> ====================
>
>> Consider this diff. It looks simpler IMO, but feel free to ignore.

> Applied your diff, see above.
>
> The full patch:
>
> ====================
> diff --git a/changelogs/unreleased/qsync-multi-statement-recovery b/changelogs/unreleased/qsync-multi-statement-recovery
> new file mode 100644
> index 000000000..c902cbe24
> --- /dev/null
> +++ b/changelogs/unreleased/qsync-multi-statement-recovery
> @@ -0,0 +1,5 @@
> +## bugfix/replication
> +
> +* Fix recovery of a rolled back multi-statement synchronous transaction which
> +  could lead to the transaction being applied partially, and to recovery errors.
> +  It happened in case the transaction worked with non-sync spaces (gh-5874).
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 4da274976..67b44c053 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -334,7 +334,26 @@ box_set_orphan(bool orphan)
>  }
>  
>  struct wal_stream {
> +	/** Base class. */
>  	struct xstream base;
> +	/** Current transaction ID. 0 when no transaction. */
> +	int64_t tsn;
> +	/**
> +	 * LSN of the first row saved to check TSN and LSN match in case all
> +	 * rows of the tx appeared to be local.
> +	 */
> +	int64_t first_row_lsn;
> +	/**
> +	 * Flag whether there is a pending yield to do when the current
> +	 * transaction is finished. It can't always be done right away because
> +	 * would abort the current transaction if it is memtx.
> +	 */
> +	bool has_yield;
> +	/**
> +	 * True if any row in the transaction was global. Saved to check if TSN
> +	 * matches LSN of a first global row.
> +	 */
> +	bool has_global_row;
>  	/** How many rows have been recovered so far. */
>  	size_t rows;
>  };
> @@ -379,47 +398,245 @@ recovery_journal_create(struct vclock *v)
>  	journal_set(&journal.base);
>  }
>  
> +/**
> + * Drop the stream to the initial state. It is supposed to be done when an error
> + * happens. Because in case of force recovery the stream will continue getting
> + * tuples. For that it must stay in a valid state and must handle them somehow.
> + *
> + * Now the stream simply drops the current transaction like it never happened,
> + * even if its commit-row wasn't met yet. Should be good enough for
> + * force-recovery when the consistency is already out of the game.
> + */
>  static void
> -apply_wal_row(struct xstream *stream, struct xrow_header *row)
> +wal_stream_abort(struct wal_stream *stream)
> +{
> +	struct txn *tx = in_txn();
> +	if (tx != NULL)
> +		txn_rollback(tx);
> +	stream->tsn = 0;
> +	fiber_gc();
> +}
> +
> +/**
> + * The wrapper exists only for the debug purposes, to ensure tsn being non-0 is
> + * in sync with the fiber's txn being non-NULL. It has nothing to do with the
> + * journal content, and therefore can use assertions instead of rigorous error
> + * checking even in release.
> + */
> +static bool
> +wal_stream_has_tx(const struct wal_stream *stream)
> +{
> +	bool has = stream->tsn != 0;
> +	assert(has == (in_txn() != NULL));
> +	return has;
> +}
> +
> +static int
> +wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row)
> +{
> +	assert(iproto_type_is_synchro_request(row->type));
> +	if (wal_stream_has_tx(stream)) {
> +		diag_set(XlogError, "found synchro request in a transaction");
> +		return -1;
> +	}
> +	struct synchro_request syn_req;
> +	if (xrow_decode_synchro(row, &syn_req) != 0) {
> +		say_error("couldn't decode a synchro request");
> +		return -1;
> +	}
> +	txn_limbo_process(&txn_limbo, &syn_req);
> +	return 0;
> +}
> +
> +static int
> +wal_stream_apply_raft_row(struct wal_stream *stream, struct xrow_header *row)
> +{
> +	assert(iproto_type_is_raft_request(row->type));
> +	if (wal_stream_has_tx(stream)) {
> +		diag_set(XlogError, "found raft request in a transaction");
> +		return -1;
> +	}
> +	struct raft_request raft_req;
> +	/* Vclock is never persisted in WAL by Raft. */
> +	if (xrow_decode_raft(row, &raft_req, NULL) != 0) {
> +		say_error("couldn't decode a raft request");
> +		return -1;
> +	}
> +	box_raft_recover(&raft_req);
> +	return 0;
> +}
> +
> +/**
> + * Rows of the same transaction are wrapped into begin/commit. Mostly for the
> + * sake of synchronous replication, when the log can contain rolled back
> + * transactions, which must be entirely reverted during recovery when ROLLBACK
> + * records are met. Row-by-row recovery wouldn't work for multi-statement
> + * synchronous transactions.
> + */
> +static int
> +wal_stream_apply_dml_row(struct wal_stream *stream, struct xrow_header *row)
>  {
>  	struct request request;
> -	if (iproto_type_is_synchro_request(row->type)) {
> -		struct synchro_request syn_req;
> -		if (xrow_decode_synchro(row, &syn_req) != 0)
> -			diag_raise();
> -		txn_limbo_process(&txn_limbo, &syn_req);
> -		return;
> +	uint64_t req_type = dml_request_key_map(row->type);
> +	if (xrow_decode_dml(row, &request, req_type) != 0) {
> +		say_error("couldn't decode a DML request");
> +		return -1;
>  	}
> -	if (iproto_type_is_raft_request(row->type)) {
> -		struct raft_request raft_req;
> -		/* Vclock is never persisted in WAL by Raft. */
> -		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
> -			diag_raise();
> -		box_raft_recover(&raft_req);
> -		return;
> +	/*
> +	 * Note that all the information which came from the log is validated
> +	 * and the errors are handled. Not asserted or paniced. That is for the
> +	 * sake of force recovery, which must be able to recover just everything
> +	 * what possible instead of terminating the instance.
> +	 */
> +	struct txn *txn;
> +	if (stream->tsn == 0) {
> +		if (row->tsn == 0) {
> +			diag_set(XlogError, "found a row without TSN");
> +			goto end_diag_request;
> +		}
> +		stream->tsn = row->tsn;
> +		stream->first_row_lsn = row->lsn;
> +		stream->has_global_row = false;
> +		/*
> +		 * Rows are not stacked into a list like during replication,
> +		 * because recovery does not yield while reading the rows. All
> +		 * the yields are controlled by the stream, and therefore no
> +		 * need to wait for all the rows to start a transaction. Can
> +		 * start now, apply the rows, and make a yield after commit if
> +		 * necessary. Helps to avoid a lot of copying.
> +		 */
> +		txn = txn_begin();
> +		if (txn == NULL) {
> +			say_error("couldn't begin a recovery transaction");
> +			return -1;
> +		}
> +	} else if (row->tsn != stream->tsn) {
> +		diag_set(XlogError, "found a next transaction with the "
> +			 "previous one not yet committed");
> +		goto end_diag_request;
> +	} else {
> +		txn = in_txn();
> +	}
> +	/* Ensure TSN is equal to LSN of the first global row. */
> +	if (!stream->has_global_row && row->group_id != GROUP_LOCAL) {
> +		if (row->tsn != row->lsn) {
> +			diag_set(XlogError, "found a first global row in a "
> +				 "transaction with LSN/TSN mismatch");
> +			goto end_diag_request;
> +		}
> +		stream->has_global_row = true;
>  	}
> -	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
> +	assert(wal_stream_has_tx(stream));
> +	/* Nops might appear at least after before_replace skipping rows. */
>  	if (request.type != IPROTO_NOP) {
> -		struct space *space = space_cache_find_xc(request.space_id);
> +		struct space *space = space_cache_find(request.space_id);
> +		if (space == NULL) {
> +			say_error("couldn't find space by ID");
> +			goto end_diag_request;
> +		}
>  		if (box_process_rw(&request, space, NULL) != 0) {
> -			say_error("error applying row: %s", request_str(&request));
> -			diag_raise();
> +			say_error("couldn't apply the request");
> +			goto end_diag_request;
>  		}
>  	}
> -	struct wal_stream *xstream =
> -		container_of(stream, struct wal_stream, base);
> -	/**
> -	 * Yield once in a while, but not too often,
> -	 * mostly to allow signal handling to take place.
> +	assert(txn != NULL);
> +	if (!row->is_commit)
> +		return 0;
> +	/*
> +	 * For fully local transactions the TSN check won't work like for global
> +	 * transactions, because it is not known if there are global rows until
> +	 * commit arrives.
>  	 */
> -	if (++xstream->rows % WAL_ROWS_PER_YIELD == 0)
> -		fiber_sleep(0);
> +	if (!stream->has_global_row && stream->tsn != stream->first_row_lsn) {
> +		diag_set(XlogError, "fully local transaction's TSN does not "
> +			 "match LSN of the first row");
> +		return -1;
> +	}
> +	stream->tsn = 0;
> +	/*
> +	 * During local recovery the commit procedure should be async, otherwise
> +	 * the only fiber processing recovery will get stuck on the first
> +	 * synchronous tx it meets until confirm timeout is reached and the tx
> +	 * is rolled back, yielding an error.
> +	 * Moreover, txn_commit_try_async() doesn't hurt at all during local
> +	 * recovery, since journal_write is faked at this stage and returns
> +	 * immediately.
> +	 */
> +	if (txn_commit_try_async(txn) != 0) {
> +		/* Commit fail automatically leads to rollback. */
> +		assert(in_txn() == NULL);
> +		say_error("couldn't commit a recovery transaction");
> +		return -1;
> +	}
> +	assert(in_txn() == NULL);
> +	fiber_gc();
> +	return 0;
> +
> +end_diag_request:
> +	/*
> +	 * The label must be used only for the errors related directly to the
> +	 * request. Errors like txn_begin() fail has nothing to do with it, and
> +	 * therefore don't log the request as the fault reason.
> +	 */
> +	say_error("error at request: %s", request_str(&request));
> +	return -1;
> +}
> +
> +/**
> + * Yield once in a while, but not too often, mostly to allow signal handling to
> + * take place.
> + */
> +static void
> +wal_stream_try_yield(struct wal_stream *stream)
> +{
> +	/*
> +	 * Save the yield. Otherwise it would happen only on rows which
> +	 * are a multiple of WAL_ROWS_PER_YIELD and are last in their
> +	 * transaction, which is probably a very rare coincidence.
> +	 */
> +	stream->has_yield |= (stream->rows % WAL_ROWS_PER_YIELD == 0);
> +	if (wal_stream_has_tx(stream) || !stream->has_yield)
> +		return;
> +	stream->has_yield = false;
> +	fiber_sleep(0);
> +}
> +
> +static void
> +wal_stream_apply_row(struct xstream *base, struct xrow_header *row)
> +{
> +	struct wal_stream *stream =
> +		container_of(base, struct wal_stream, base);
> +	/*
> +	 * Account all rows, even non-DML, and even leading to an error. Because
> +	 * still need to yield sometimes.
> +	 */
> +	++stream->rows;
> +	if (iproto_type_is_synchro_request(row->type)) {
> +		if (wal_stream_apply_synchro_row(stream, row) != 0)
> +			goto end_error;
> +	} else if (iproto_type_is_raft_request(row->type)) {
> +		if (wal_stream_apply_raft_row(stream, row) != 0)
> +			goto end_error;
> +	} else if (wal_stream_apply_dml_row(stream, row) != 0) {
> +		goto end_error;
> +	}
> +	wal_stream_try_yield(stream);
> +	return;
> +
> +end_error:
> +	wal_stream_abort(stream);
> +	wal_stream_try_yield(stream);
> +	diag_raise();
>  }
>  
>  static void
>  wal_stream_create(struct wal_stream *ctx)
>  {
> -	xstream_create(&ctx->base, apply_wal_row);
> +	xstream_create(&ctx->base, wal_stream_apply_row);
> +	ctx->tsn = 0;
> +	ctx->first_row_lsn = 0;
> +	ctx->has_yield = false;
> +	ctx->has_global_row = false;
>  	ctx->rows = 0;
>  }
>  
> @@ -2797,9 +3014,13 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  
>  	struct wal_stream wal_stream;
>  	wal_stream_create(&wal_stream);
> +	auto stream_guard = make_scoped_guard([&]{
> +		wal_stream_abort(&wal_stream);
> +	});
>  
>  	struct recovery *recovery;
> -	recovery = recovery_new(wal_dir(), cfg_geti("force_recovery"),
> +	bool is_force_recovery = cfg_geti("force_recovery");
> +	recovery = recovery_new(wal_dir(), is_force_recovery,
>  				checkpoint_vclock);
>  
>  	/*
> @@ -2861,6 +3082,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  
>  	engine_begin_final_recovery_xc();
>  	recover_remaining_wals(recovery, &wal_stream.base, NULL, false);
> +	if (wal_stream_has_tx(&wal_stream)) {
> +		wal_stream_abort(&wal_stream);
> +		diag_set(XlogError, "found a not finished transaction "
> +			 "in the log");
> +		if (!is_force_recovery)
> +			diag_raise();
> +		diag_log();
> +	}
>  	engine_end_recovery_xc();
>  	/*
>  	 * Leave hot standby mode, if any, only after
> @@ -2880,6 +3109,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  	}
>  	recovery_stop_local(recovery);
>  	recover_remaining_wals(recovery, &wal_stream.base, NULL, true);
> +	if (wal_stream_has_tx(&wal_stream)) {
> +		wal_stream_abort(&wal_stream);
> +		diag_set(XlogError, "found a not finished transaction "
> +			 "in the log in hot standby mode");
> +		if (!is_force_recovery)
> +			diag_raise();
> +		diag_log();
> +	}
>  	/*
>  	 * Advance replica set vclock to reflect records
>  	 * applied in hot standby mode.
> @@ -2888,6 +3125,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>  		box_listen();
>  		box_sync_replication(false);
>  	}
> +	stream_guard.is_active = false;
>  	recovery_finalize(recovery);
>  	is_local_recovery = false;
>  
> diff --git a/test/replication/gh-5874-qsync-txn-recovery.result b/test/replication/gh-5874-qsync-txn-recovery.result
> new file mode 100644
> index 000000000..73f903ca7
> --- /dev/null
> +++ b/test/replication/gh-5874-qsync-txn-recovery.result
> @@ -0,0 +1,162 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +--
> +-- gh-5874: synchronous transactions should be recovered as whole units, not row
> +-- by row. So as to be able to roll them back when ROLLBACK is recovered
> +-- afterwards.
> +--
> +old_synchro_quorum = box.cfg.replication_synchro_quorum
> + | ---
> + | ...
> +old_synchro_timeout = box.cfg.replication_synchro_timeout
> + | ---
> + | ...
> +box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
> + | ---
> + | ...
> +engine = test_run:get_cfg('engine')
> + | ---
> + | ...
> +async = box.schema.create_space('async', {engine = engine})
> + | ---
> + | ...
> +_ = async:create_index('pk')
> + | ---
> + | ...
> +sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
> + | ---
> + | ...
> +_ = sync:create_index('pk')
> + | ---
> + | ...
> +
> +-- The transaction fails, but is written to the log anyway.
> +box.begin() async:insert{1} sync:insert{1} box.commit()
> + | ---
> + | - error: Quorum collection for a synchronous transaction is timed out
> + | ...
> +-- Ok, the previous txn is rolled back.
> +_ = async:insert{1}
> + | ---
> + | ...
> +box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
> + | ---
> + | ...
> +_ = sync:insert{1}
> + | ---
> + | ...
> +-- Try multi-statement sync txn to see how it recovers.
> +box.begin() sync:insert{2} sync:insert{3} box.commit()
> + | ---
> + | ...
> +
> +-- See if NOP multi-statement recovery works fine.
> +--
> +-- Start with NOP.
> +do_skip = false
> + | ---
> + | ...
> +_ = async:before_replace(function() \
> +    if do_skip then \
> +        return nil \
> +    end \
> +end)
> + | ---
> + | ...
> +box.begin() \
> +do_skip = true \
> +async:replace{2} \
> +do_skip = false \
> +async:replace{3} \
> +box.commit()
> + | ---
> + | ...
> +
> +-- NOP in the middle.
> +box.begin() \
> +async:replace{4} \
> +do_skip = true \
> +async:replace{5} \
> +do_skip = false \
> +async:replace{6} \
> +box.commit()
> + | ---
> + | ...
> +
> +-- All NOP.
> +box.begin() \
> +do_skip = true \
> +async:replace{7} \
> +async:replace{8} \
> +do_skip = false \
> +box.commit()
> + | ---
> + | ...
> +
> +--
> +-- First row might be for a local space and its LSN won't match TSN. Need to be
> +-- ok with that.
> +--
> +loc = box.schema.create_space('loc', {is_local = true, engine = engine})
> + | ---
> + | ...
> +_ = loc:create_index('pk')
> + | ---
> + | ...
> +box.begin() \
> +loc:replace{1} \
> +async:replace{9} \
> +box.commit()
> + | ---
> + | ...
> +
> +-- All local.
> +box.begin() \
> +loc:replace{2} \
> +loc:replace{3} \
> +box.commit()
> + | ---
> + | ...
> +
> +test_run:cmd('restart server default')
> + |
> +async = box.space.async
> + | ---
> + | ...
> +sync = box.space.sync
> + | ---
> + | ...
> +loc = box.space.loc
> + | ---
> + | ...
> +async:select()
> + | ---
> + | - - [1]
> + |   - [3]
> + |   - [4]
> + |   - [6]
> + |   - [9]
> + | ...
> +sync:select()
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + | ...
> +loc:select()
> + | ---
> + | - - [1]
> + |   - [2]
> + |   - [3]
> + | ...
> +async:drop()
> + | ---
> + | ...
> +sync:drop()
> + | ---
> + | ...
> +loc:drop()
> + | ---
> + | ...
> diff --git a/test/replication/gh-5874-qsync-txn-recovery.test.lua b/test/replication/gh-5874-qsync-txn-recovery.test.lua
> new file mode 100644
> index 000000000..f35eb68de
> --- /dev/null
> +++ b/test/replication/gh-5874-qsync-txn-recovery.test.lua
> @@ -0,0 +1,84 @@
> +test_run = require('test_run').new()
> +--
> +-- gh-5874: synchronous transactions should be recovered as whole units, not row
> +-- by row. So as to be able to roll them back when ROLLBACK is recovered
> +-- afterwards.
> +--
> +old_synchro_quorum = box.cfg.replication_synchro_quorum
> +old_synchro_timeout = box.cfg.replication_synchro_timeout
> +box.cfg{replication_synchro_quorum = 2, replication_synchro_timeout = 0.001}
> +engine = test_run:get_cfg('engine')
> +async = box.schema.create_space('async', {engine = engine})
> +_ = async:create_index('pk')
> +sync = box.schema.create_space('sync', {is_sync = true, engine = engine})
> +_ = sync:create_index('pk')
> +
> +-- The transaction fails, but is written to the log anyway.
> +box.begin() async:insert{1} sync:insert{1} box.commit()
> +-- Ok, the previous txn is rolled back.
> +_ = async:insert{1}
> +box.cfg{replication_synchro_quorum = 1, replication_synchro_timeout = 1000}
> +_ = sync:insert{1}
> +-- Try multi-statement sync txn to see how it recovers.
> +box.begin() sync:insert{2} sync:insert{3} box.commit()
> +
> +-- See if NOP multi-statement recovery works fine.
> +--
> +-- Start with NOP.
> +do_skip = false
> +_ = async:before_replace(function() \
> +    if do_skip then \
> +        return nil \
> +    end \
> +end)
> +box.begin() \
> +do_skip = true \
> +async:replace{2} \
> +do_skip = false \
> +async:replace{3} \
> +box.commit()
> +
> +-- NOP in the middle.
> +box.begin() \
> +async:replace{4} \
> +do_skip = true \
> +async:replace{5} \
> +do_skip = false \
> +async:replace{6} \
> +box.commit()
> +
> +-- All NOP.
> +box.begin() \
> +do_skip = true \
> +async:replace{7} \
> +async:replace{8} \
> +do_skip = false \
> +box.commit()
> +
> +--
> +-- First row might be for a local space and its LSN won't match TSN. Need to be
> +-- ok with that.
> +--
> +loc = box.schema.create_space('loc', {is_local = true, engine = engine})
> +_ = loc:create_index('pk')
> +box.begin() \
> +loc:replace{1} \
> +async:replace{9} \
> +box.commit()
> +
> +-- All local.
> +box.begin() \
> +loc:replace{2} \
> +loc:replace{3} \
> +box.commit()
> +
> +test_run:cmd('restart server default')
> +async = box.space.async
> +sync = box.space.sync
> +loc = box.space.loc
> +async:select()
> +sync:select()
> +loc:select()
> +async:drop()
> +sync:drop()
> +loc:drop()

-- 
Serge Petrenko