From mboxrd@z Thu Jan 1 00:00:00 1970
From: Serge Petrenko via Tarantool-patches
Reply-To: Serge Petrenko
To: v.shpilevoy@tarantool.org, gorcunov@gmail.com
Cc: tarantool-patches@dev.tarantool.org
Date: Wed, 24 Mar 2021 15:24:15 +0300
Message-Id: <5cfa8f8e4d733aabd53138dd3ffc6f524ffac743.1616588119.git.sergepetrenko@tarantool.org>
Subject: [Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional
List-Id: Tarantool development patches
X-Mailer: git-send-email 2.24.3 (Apple Git-128)
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit

The applier now assembles rows into transactions not only at the
subscribe stage, but also during final join / register.
This is necessary to correctly handle rolled-back synchronous
transactions in the final join stream.

Part of #5566
---
 src/box/applier.cc | 126 ++++++++++++++++++++++-----------------------
 1 file changed, 61 insertions(+), 65 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index d53f13711..9a8b0f0fc 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -292,34 +292,6 @@ apply_row(struct xrow_header *row)
 	return 0;
 }
 
-static int
-apply_final_join_row(struct xrow_header *row)
-{
-	/*
-	 * Confirms are ignored during join. All the data master
-	 * sends us is valid.
-	 */
-	if (iproto_type_is_synchro_request(row->type))
-		return 0;
-	struct txn *txn = txn_begin();
-	if (txn == NULL)
-		return -1;
-	/*
-	 * Do not wait for confirmation while processing final
-	 * join rows. See apply_snapshot_row().
-	 */
-	txn_set_flags(txn, TXN_FORCE_ASYNC);
-	if (apply_row(row) != 0) {
-		txn_rollback(txn);
-		fiber_gc();
-		return -1;
-	}
-	if (txn_commit(txn) != 0)
-		return -1;
-	fiber_gc();
-	return 0;
-}
-
 /**
  * Connect to a remote host and authenticate the client.
  */
@@ -505,13 +477,26 @@ applier_fetch_snapshot(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
+static void
+applier_read_tx(struct applier *applier, struct stailq *rows,
+		uint64_t *row_count);
+
+static int
+apply_final_join_tx(struct stailq *rows);
+
+/**
+ * A helper struct to link xrow objects in a list.
+ */
+struct applier_tx_row {
+	/* Next transaction row. */
+	struct stailq_entry next;
+	/* xrow_header struct for the current transaction row. */
+	struct xrow_header row;
+};
+
 static uint64_t
 applier_wait_register(struct applier *applier, uint64_t row_count)
 {
-	struct ev_io *coio = &applier->io;
-	struct ibuf *ibuf = &applier->ibuf;
-	struct xrow_header row;
-
 	/*
 	 * Tarantool < 1.7.0: there is no "final join" stage.
 	 * Proceed to "subscribe" and do not finish bootstrap
@@ -524,27 +509,19 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 	 * Receive final data.
 	 */
 	while (true) {
-		coio_read_xrow(coio, ibuf, &row);
-		applier->last_row_time = ev_monotonic_now(loop());
-		if (iproto_type_is_dml(row.type)) {
-			vclock_follow_xrow(&replicaset.vclock, &row);
-			if (apply_final_join_row(&row) != 0)
-				diag_raise();
-			if (++row_count % 100000 == 0)
-				say_info("%.1fM rows received", row_count / 1e6);
-		} else if (row.type == IPROTO_OK) {
-			/*
-			 * Current vclock. This is not used now,
-			 * ignore.
-			 */
-			++row_count;
-			break; /* end of stream */
-		} else if (iproto_type_is_error(row.type)) {
-			xrow_decode_error_xc(&row); /* rethrow error */
-		} else {
-			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
-				  (uint32_t) row.type);
+		struct stailq rows;
+		applier_read_tx(applier, &rows, &row_count);
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row;
+		if (first_row->type == IPROTO_OK) {
+			assert(first_row ==
+			       &stailq_last_entry(&rows, struct applier_tx_row,
+						  next)->row);
+			break;
 		}
+		if (apply_final_join_tx(&rows) != 0)
+			diag_raise();
 	}
 
 	return row_count;
@@ -616,16 +593,6 @@ applier_join(struct applier *applier)
 	applier_set_state(applier, APPLIER_READY);
 }
 
-/**
- * A helper struct to link xrow objects in a list.
- */
-struct applier_tx_row {
-	/* Next transaction row. */
-	struct stailq_entry next;
-	/* xrow_header struct for the current transaction row. */
-	struct xrow_header row;
-};
-
 static struct applier_tx_row *
 applier_read_tx_row(struct applier *applier)
 {
@@ -646,8 +613,11 @@ applier_read_tx_row(struct applier *applier)
 	 * messages so we can't assume that if we haven't heard
 	 * from the master for quite a while the connection is
 	 * broken - the master might just be idle.
+	 * Also there are no timeouts during final join and register.
 	 */
-	if (applier->version_id < version_id(1, 7, 7))
+	if (applier->version_id < version_id(1, 7, 7) ||
+	    applier->state == APPLIER_FINAL_JOIN ||
+	    applier->state == APPLIER_REGISTER)
 		coio_read_xrow(coio, ibuf, row);
 	else
 		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
@@ -723,7 +693,8 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
  * network input space is reused for the next xrow.
  */
 static void
-applier_read_tx(struct applier *applier, struct stailq *rows)
+applier_read_tx(struct applier *applier, struct stailq *rows,
+		uint64_t *row_count)
 {
 	int64_t tsn = 0;
 
@@ -731,6 +702,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 	do {
 		struct applier_tx_row *tx_row = applier_read_tx_row(applier);
 		tsn = set_next_tx_row(rows, tx_row, tsn);
+
+		if (row_count != NULL && ++*row_count % 100000 == 0)
+			say_info("%.1fM rows received", *row_count / 1e6);
 	} while (tsn != 0);
 }
 
@@ -988,6 +962,28 @@ fail:
 	return -1;
 }
 
+/** A simpler version of applier_apply_tx() for final join stage. */
+static int
+apply_final_join_tx(struct stailq *rows)
+{
+	struct xrow_header *first_row =
+		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
+	struct xrow_header *last_row =
+		&stailq_last_entry(rows, struct applier_tx_row, next)->row;
+	int rc = 0;
+	/* WAL isn't enabled yet, so follow vclock manually. */
+	vclock_follow_xrow(&replicaset.vclock, last_row);
+	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
+		assert(first_row == last_row);
+		rc = apply_synchro_row(first_row);
+		goto end;
+	}
+
+	rc = apply_plain_tx(rows, false, false);
+end:
+	fiber_gc();
+	return rc;
+}
 
 /**
  * Apply all rows in the rows queue as a single transaction.
@@ -1254,7 +1250,7 @@ applier_subscribe(struct applier *applier)
 		}
 
 		struct stailq rows;
-		applier_read_tx(applier, &rows);
+		applier_read_tx(applier, &rows, NULL);
 
 		/*
 		 * In case of an heartbeat message wake a writer up
-- 
2.24.3 (Apple Git-128)
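
Illustration only, not part of the patch: a minimal standalone sketch of
the row-grouping idea the commit message describes, where rows are
collected until the transaction ends and an optional counter is bumped
per row. It assumes each row carries a transaction id and a commit flag
on its last row. The names demo_row and demo_read_tx are hypothetical
and are not Tarantool APIs. The real applier works on struct xrow_header
rows linked through struct stailq.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for a replicated row. */
struct demo_row {
	int64_t tsn;	/* Transaction id shared by all rows of one tx. */
	bool is_commit;	/* Set on the last row of the transaction. */
};

/*
 * Consume one transaction starting at stream[pos]: take rows until
 * one carries the commit flag or the stream ends. Returns the number
 * of rows consumed. If row_count is not NULL, it is bumped once per
 * row, similar to the optional counter the patch adds to
 * applier_read_tx().
 */
static size_t
demo_read_tx(const struct demo_row *stream, size_t len, size_t pos,
	     uint64_t *row_count)
{
	size_t n = 0;
	while (pos + n < len) {
		const struct demo_row *row = &stream[pos + n];
		/* Rows of different transactions must not be mixed. */
		assert(row->tsn == stream[pos].tsn);
		n++;
		if (row_count != NULL)
			++*row_count;
		if (row->is_commit)
			break;
	}
	return n;
}
```

A caller would invoke demo_read_tx() in a loop, advancing pos by the
returned row count, and apply each group as one unit. Passing NULL for
row_count mirrors the subscribe path in the patch, which does not report
progress.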