From mboxrd@z Thu Jan 1 00:00:00 1970
To: Vladislav Shpilevoy, gorcunov@gmail.com
Cc: tarantool-patches@dev.tarantool.org
References: <5cfa8f8e4d733aabd53138dd3ffc6f524ffac743.1616588119.git.sergepetrenko@tarantool.org>
Message-ID: <63555ea6-6f0a-964e-65e2-6213eecb2ea1@tarantool.org>
Date: Sat, 27 Mar 2021 22:05:03 +0300
Subject: Re: [Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional
Precedence: list
List-Id: Tarantool development patches
From: Serge Petrenko via Tarantool-patches
Reply-To: Serge Petrenko
Errors-To: tarantool-patches-bounces@dev.tarantool.org
Sender: "Tarantool-patches"

26.03.2021 23:49, Vladislav Shpilevoy wrote:
> I appreciate the work you did here! Thanks!
>
> See 3 comments below.
>
> On 24.03.2021 13:24, Serge Petrenko wrote:
>> Now applier assembles rows into transactions not only on subscribe
>> stage, but also during final join / register.
>>
>> This was necessary for correct handling of rolled back synchronous
>> transactions in final join stream.
>>
>> Part of #5566
>> ---
>> src/box/applier.cc | 126 ++++++++++++++++++++++-----------------------
>> 1 file changed, 61 insertions(+), 65 deletions(-)
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index d53f13711..9a8b0f0fc 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -524,27 +509,19 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>>  	 * Receive final data.
>>  	 */
>>  	while (true) {
>> -		coio_read_xrow(coio, ibuf, &row);
>> -		applier->last_row_time = ev_monotonic_now(loop());
>> -		if (iproto_type_is_dml(row.type)) {
>> -			vclock_follow_xrow(&replicaset.vclock, &row);
>> -			if (apply_final_join_row(&row) != 0)
>> -				diag_raise();
>> -			if (++row_count % 100000 == 0)
>> -				say_info("%.1fM rows received", row_count / 1e6);
>> -		} else if (row.type == IPROTO_OK) {
>> -			/*
>> -			 * Current vclock. This is not used now,
>> -			 * ignore.
>> -			 */
> 1. The comment was helpful, lets keep it.

Ok, sure.
>
>> -			++row_count;
>> -			break; /* end of stream */
>> -		} else if (iproto_type_is_error(row.type)) {
>> -			xrow_decode_error_xc(&row); /* rethrow error */
>> -		} else {
>> -			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
>> -				  (uint32_t) row.type);
>> +		struct stailq rows;
>> +		applier_read_tx(applier, &rows, &row_count);
>> +		struct xrow_header *first_row =
>> +			&stailq_first_entry(&rows, struct applier_tx_row,
>> +					    next)->row;
>> +		if (first_row->type == IPROTO_OK) {
>> +			assert(first_row ==
>> +			       &stailq_last_entry(&rows, struct applier_tx_row,
>> +						  next)->row);
>> +			break;
>>  		}
>> +		if (apply_final_join_tx(&rows) != 0)
>> +			diag_raise();
>>  	}
>>
>>  	return row_count;
>> @@ -646,8 +613,11 @@ applier_read_tx_row(struct applier *applier)
>>  	 * messages so we can't assume that if we haven't heard
>>  	 * from the master for quite a while the connection is
>>  	 * broken - the master might just be idle.
>> +	 * Also there are no timeouts during final join and register.
>>  	 */
>> -	if (applier->version_id < version_id(1, 7, 7))
>> +	if (applier->version_id < version_id(1, 7, 7) ||
>> +	    applier->state == APPLIER_FINAL_JOIN ||
>> +	    applier->state == APPLIER_REGISTER)
> 2. Maybe it would be better to pass the timeout from the upper level and
> always use coio_read_xrow_timeout_xc(). For the mentioned conditions
> it would be infinity. Anyway the non-timed version in the end uses
> TIMEOUT_INFINITY too (coio_read_ahead). That way it would be less
> tricky conditions checks in the generic code.

Thanks for the suggestion! Applied.

>
>>  		coio_read_xrow(coio, ibuf, row);
>>  	else
>>  		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
>> @@ -731,6 +702,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
>>  	do {
>>  		struct applier_tx_row *tx_row = applier_read_tx_row(applier);
>>  		tsn = set_next_tx_row(rows, tx_row, tsn);
>> +
>> +		if (row_count != NULL && ++*row_count % 100000 == 0)
>> +			say_info("%.1fM rows received", *row_count / 1e6);
> 3. Hm. That adds branching and heavy '%' operation. Maybe you could make it
> return number of rows and in the caller code do this check + log. So it
> would affect only the joins.

Good idea, thanks! Please see an incremental diff below.

>
>> @@ -1254,7 +1250,7 @@ applier_subscribe(struct applier *applier)
>>  		}
>>
>>  		struct stailq rows;
>> -		applier_read_tx(applier, &rows);
>> +		applier_read_tx(applier, &rows, NULL);
>>
>>  		/*
>>  		 * In case of an heartbeat message wake a writer up
>>

=================================

diff --git a/src/box/applier.cc b/src/box/applier.cc
index b96eb360b..0f4492fe3 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -477,9 +477,8 @@ applier_fetch_snapshot(struct applier *applier)
     applier_set_state(applier, APPLIER_READY);
 }

-static void
-applier_read_tx(struct applier *applier, struct stailq *rows,
-        uint64_t *row_count);
+static uint64_t
+applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);

 static int
 apply_final_join_tx(struct stailq *rows);
@@ -497,6 +496,7 @@ struct applier_tx_row {
 static uint64_t
 applier_wait_register(struct applier *applier, uint64_t row_count)
 {
+#define ROWS_PER_LOG 100000
     /*
      * Tarantool < 1.7.0: there is no "final join" stage.
      * Proceed to "subscribe" and do not finish bootstrap
@@ -505,16 +505,23 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
     if (applier->version_id < version_id(1, 7, 0))
         return row_count;

+    uint64_t next_log_cnt =
+        row_count + ROWS_PER_LOG - row_count % ROWS_PER_LOG;
     /*
      * Receive final data.
      */
     while (true) {
         struct stailq rows;
-        applier_read_tx(applier, &rows, &row_count);
+        row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY);
+        if (row_count >= next_log_cnt) {
+            say_info("%.1fM rows received", next_log_cnt / 1e6);
+            next_log_cnt += ROWS_PER_LOG;
+        }
         struct xrow_header *first_row =
             &stailq_first_entry(&rows, struct applier_tx_row,
                         next)->row;
         if (first_row->type == IPROTO_OK) {
+            /* Current vclock. This is not used now, ignore. */
             assert(first_row ==
                    &stailq_last_entry(&rows, struct applier_tx_row,
                           next)->row);
@@ -525,6 +532,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
     }

     return row_count;
+#undef ROWS_PER_LOG
 }

 static void
@@ -594,7 +602,7 @@ applier_join(struct applier *applier)
 }

 static struct applier_tx_row *
-applier_read_tx_row(struct applier *applier)
+applier_read_tx_row(struct applier *applier, double timeout)
 {
     struct ev_io *coio = &applier->io;
     struct ibuf *ibuf = &applier->ibuf;
@@ -607,20 +615,7 @@ applier_read_tx_row(struct applier *applier)

     struct xrow_header *row = &tx_row->row;

-    double timeout = replication_disconnect_timeout();
-    /*
-     * Tarantool < 1.7.7 does not send periodic heartbeat
-     * messages so we can't assume that if we haven't heard
-     * from the master for quite a while the connection is
-     * broken - the master might just be idle.
-     * Also there are no timeouts during final join and register.
-     */
-    if (applier->version_id < version_id(1, 7, 7) ||
-        applier->state == APPLIER_FINAL_JOIN ||
-        applier->state == APPLIER_REGISTER)
-        coio_read_xrow(coio, ibuf, row);
-    else
-        coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+    coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);

     applier->lag = ev_now(loop()) - row->tm;
     applier->last_row_time = ev_monotonic_now(loop());
@@ -692,20 +687,20 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
  * rpos is adjusted as xrow is decoded and the corresponding
  * network input space is reused for the next xrow.
  */
-static void
-applier_read_tx(struct applier *applier, struct stailq *rows,
-        uint64_t *row_count)
+static uint64_t
+applier_read_tx(struct applier *applier, struct stailq *rows, double timeout)
 {
     int64_t tsn = 0;
+    uint64_t row_count = 0;

     stailq_create(rows);
     do {
-        struct applier_tx_row *tx_row = applier_read_tx_row(applier);
+        struct applier_tx_row *tx_row = applier_read_tx_row(applier,
+                                    timeout);
         tsn = set_next_tx_row(rows, tx_row, tsn);
-
-        if (row_count != NULL && ++*row_count % 100000 == 0)
-            say_info("%.1fM rows received", *row_count / 1e6);
+        ++row_count;
     } while (tsn != 0);
+    return row_count;
 }

 static void
@@ -1234,6 +1229,15 @@ applier_subscribe(struct applier *applier)
         trigger_clear(&on_rollback);
     });

+    /*
+     * Tarantool < 1.7.7 does not send periodic heartbeat
+     * messages so we can't assume that if we haven't heard
+     * from the master for quite a while the connection is
+     * broken - the master might just be idle.
+     */
+    double timeout = applier->version_id < version_id(1, 7, 7) ?
+             TIMEOUT_INFINITY : replication_disconnect_timeout();
+
     /*
      * Process a stream of rows from the binary log.
      */
@@ -1247,7 +1251,7 @@ applier_subscribe(struct applier *applier)
         }

         struct stailq rows;
-        applier_read_tx(applier, &rows, NULL);
+        applier_read_tx(applier, &rows, timeout);

         /*
          * In case of an heartbeat message wake a writer up

-- 
Serge Petrenko
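
For readers following the incremental diff, the row-count logging it introduces can be looked at in isolation. The block below is a minimal standalone sketch, not the applier code: `next_log_threshold()` and `account_tx()` are hypothetical helper names, `printf` stands in for `say_info`, and the number of rows per transaction is passed in directly, on the assumption that the reader returns it as the reworked `applier_read_tx()` does.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define ROWS_PER_LOG 100000

/*
 * Hypothetical helper: the first multiple of ROWS_PER_LOG that is
 * strictly greater than row_count (same expression as in the diff).
 */
static uint64_t
next_log_threshold(uint64_t row_count)
{
	return row_count + ROWS_PER_LOG - row_count % ROWS_PER_LOG;
}

/*
 * Hypothetical helper: account for one transaction of tx_rows rows.
 * Prints a progress line and returns 1 when the running total reaches
 * the current threshold, otherwise returns 0. printf is a stand-in
 * for say_info here.
 */
static int
account_tx(uint64_t *row_count, uint64_t *next_log_cnt, uint64_t tx_rows)
{
	assert(row_count != NULL && next_log_cnt != NULL);
	*row_count += tx_rows;
	if (*row_count < *next_log_cnt)
		return 0;
	printf("%.1fM rows received\n", *next_log_cnt / 1e6);
	*next_log_cnt += ROWS_PER_LOG;
	return 1;
}
```

The per-row `%` disappears: the modulo is computed once when the threshold is initialised, and each transaction then costs one addition and one comparison. Since the check is an `if` rather than a loop, a transaction that crosses several thresholds at once produces a single message, and the remaining ones are emitted on the following transactions.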