[Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional

Serge Petrenko sergepetrenko at tarantool.org
Sat Mar 27 22:05:03 MSK 2021



26.03.2021 23:49, Vladislav Shpilevoy пишет:
> I appreciate the work you did here!
Thanks!

>
> See 3 comments below.
>
> On 24.03.2021 13:24, Serge Petrenko wrote:
>> Now applier assembles rows into transactions not only on subscribe
>> stage, but also during final join / register.
>>
>> This was necessary for correct handling of rolled back synchronous
>> transactions in final join stream.
>>
>> Part of #5566
>> ---
>>   src/box/applier.cc | 126 ++++++++++++++++++++++-----------------------
>>   1 file changed, 61 insertions(+), 65 deletions(-)
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index d53f13711..9a8b0f0fc 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -524,27 +509,19 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>>   	 * Receive final data.
>>   	 */
>>   	while (true) {
>> -		coio_read_xrow(coio, ibuf, &row);
>> -		applier->last_row_time = ev_monotonic_now(loop());
>> -		if (iproto_type_is_dml(row.type)) {
>> -			vclock_follow_xrow(&replicaset.vclock, &row);
>> -			if (apply_final_join_row(&row) != 0)
>> -				diag_raise();
>> -			if (++row_count % 100000 == 0)
>> -				say_info("%.1fM rows received", row_count / 1e6);
>> -		} else if (row.type == IPROTO_OK) {
>> -			/*
>> -			 * Current vclock. This is not used now,
>> -			 * ignore.
>> -			 */
> 1. The comment was helpful, let's keep it.

Ok, sure.

>
>> -			++row_count;
>> -			break; /* end of stream */
>> -		} else if (iproto_type_is_error(row.type)) {
>> -			xrow_decode_error_xc(&row);  /* rethrow error */
>> -		} else {
>> -			tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE,
>> -				  (uint32_t) row.type);
>> +		struct stailq rows;
>> +		applier_read_tx(applier, &rows, &row_count);
>> +		struct xrow_header *first_row =
>> +			&stailq_first_entry(&rows, struct applier_tx_row,
>> +					    next)->row;
>> +		if (first_row->type == IPROTO_OK) {
>> +			assert(first_row ==
>> +			       &stailq_last_entry(&rows, struct applier_tx_row,
>> +						  next)->row);
>> +			break;
>>   		}
>> +		if (apply_final_join_tx(&rows) != 0)
>> +			diag_raise();
>>   	}
>>   
>>   	return row_count;
>> @@ -646,8 +613,11 @@ applier_read_tx_row(struct applier *applier)
>>   	 * messages so we can't assume that if we haven't heard
>>   	 * from the master for quite a while the connection is
>>   	 * broken - the master might just be idle.
>> +	 * Also there are no timeouts during final join and register.
>>   	 */
>> -	if (applier->version_id < version_id(1, 7, 7))
>> +	if (applier->version_id < version_id(1, 7, 7) ||
>> +	    applier->state == APPLIER_FINAL_JOIN ||
>> +	    applier->state == APPLIER_REGISTER)
> 2. Maybe it would be better to pass the timeout from the upper level and
> always use coio_read_xrow_timeout_xc(). For the mentioned conditions
> it would be infinity. Anyway the non-timed version in the end uses
> TIMEOUT_INFINITY too (coio_read_ahead). That way there would be fewer
> tricky condition checks in the generic code.

Thanks for the suggestion! Applied.

>
>>   		coio_read_xrow(coio, ibuf, row);
>>   	else
>>   		coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
>> @@ -731,6 +702,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
>>   	do {
>>   		struct applier_tx_row *tx_row = applier_read_tx_row(applier);
>>   		tsn = set_next_tx_row(rows, tx_row, tsn);
>> +
>> +		if (row_count != NULL && ++*row_count % 100000 == 0)
>> +			say_info("%.1fM rows received", *row_count / 1e6);
> 3. Hm. That adds branching and a heavy '%' operation. Maybe you could make it
> return number of rows and in the caller code do this check + log. So it
> would affect only the joins.

Good idea, thanks! Please see an incremental diff below.

>
>> @@ -1254,7 +1250,7 @@ applier_subscribe(struct applier *applier)
>>   		}
>>   
>>   		struct stailq rows;
>> -		applier_read_tx(applier, &rows);
>> +		applier_read_tx(applier, &rows, NULL);
>>   
>>   		/*
>>   		 * In case of an heartbeat message wake a writer up
>>

=================================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index b96eb360b..0f4492fe3 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -477,9 +477,8 @@ applier_fetch_snapshot(struct applier *applier)
      applier_set_state(applier, APPLIER_READY);
  }

-static void
-applier_read_tx(struct applier *applier, struct stailq *rows,
-        uint64_t *row_count);
+static uint64_t
+applier_read_tx(struct applier *applier, struct stailq *rows, double timeout);

  static int
  apply_final_join_tx(struct stailq *rows);
@@ -497,6 +496,7 @@ struct applier_tx_row {
  static uint64_t
  applier_wait_register(struct applier *applier, uint64_t row_count)
  {
+#define ROWS_PER_LOG 100000
      /*
       * Tarantool < 1.7.0: there is no "final join" stage.
       * Proceed to "subscribe" and do not finish bootstrap
@@ -505,16 +505,23 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
      if (applier->version_id < version_id(1, 7, 0))
          return row_count;

+    uint64_t next_log_cnt =
+        row_count + ROWS_PER_LOG - row_count % ROWS_PER_LOG;
      /*
       * Receive final data.
       */
      while (true) {
          struct stailq rows;
-        applier_read_tx(applier, &rows, &row_count);
+        row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY);
+        if (row_count >= next_log_cnt) {
+            say_info("%.1fM rows received", next_log_cnt / 1e6);
+            next_log_cnt += ROWS_PER_LOG;
+        }
          struct xrow_header *first_row =
              &stailq_first_entry(&rows, struct applier_tx_row,
                          next)->row;
          if (first_row->type == IPROTO_OK) {
+            /* Current vclock. This is not used now, ignore. */
              assert(first_row ==
                     &stailq_last_entry(&rows, struct applier_tx_row,
                            next)->row);
@@ -525,6 +532,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
      }

      return row_count;
+#undef ROWS_PER_LOG
  }

  static void
@@ -594,7 +602,7 @@ applier_join(struct applier *applier)
  }

  static struct applier_tx_row *
-applier_read_tx_row(struct applier *applier)
+applier_read_tx_row(struct applier *applier, double timeout)
  {
      struct ev_io *coio = &applier->io;
      struct ibuf *ibuf = &applier->ibuf;
@@ -607,20 +615,7 @@ applier_read_tx_row(struct applier *applier)

      struct xrow_header *row = &tx_row->row;

-    double timeout = replication_disconnect_timeout();
-    /*
-     * Tarantool < 1.7.7 does not send periodic heartbeat
-     * messages so we can't assume that if we haven't heard
-     * from the master for quite a while the connection is
-     * broken - the master might just be idle.
-     * Also there are no timeouts during final join and register.
-     */
-    if (applier->version_id < version_id(1, 7, 7) ||
-        applier->state == APPLIER_FINAL_JOIN ||
-        applier->state == APPLIER_REGISTER)
-        coio_read_xrow(coio, ibuf, row);
-    else
-        coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);
+    coio_read_xrow_timeout_xc(coio, ibuf, row, timeout);

      applier->lag = ev_now(loop()) - row->tm;
      applier->last_row_time = ev_monotonic_now(loop());
@@ -692,20 +687,20 @@ set_next_tx_row(struct stailq *rows, struct applier_tx_row *tx_row, int64_t tsn)
   * rpos is adjusted as xrow is decoded and the corresponding
   * network input space is reused for the next xrow.
   */
-static void
-applier_read_tx(struct applier *applier, struct stailq *rows,
-        uint64_t *row_count)
+static uint64_t
+applier_read_tx(struct applier *applier, struct stailq *rows, double timeout)
  {
      int64_t tsn = 0;
+    uint64_t row_count = 0;

      stailq_create(rows);
      do {
-        struct applier_tx_row *tx_row = applier_read_tx_row(applier);
+        struct applier_tx_row *tx_row = applier_read_tx_row(applier,
+                                    timeout);
          tsn = set_next_tx_row(rows, tx_row, tsn);
-
-        if (row_count != NULL && ++*row_count % 100000 == 0)
-            say_info("%.1fM rows received", *row_count / 1e6);
+        ++row_count;
      } while (tsn != 0);
+    return row_count;
  }

  static void
@@ -1234,6 +1229,15 @@ applier_subscribe(struct applier *applier)
          trigger_clear(&on_rollback);
      });

+    /*
+     * Tarantool < 1.7.7 does not send periodic heartbeat
+     * messages so we can't assume that if we haven't heard
+     * from the master for quite a while the connection is
+     * broken - the master might just be idle.
+     */
+    double timeout = applier->version_id < version_id(1, 7, 7) ?
+             TIMEOUT_INFINITY : replication_disconnect_timeout();
+
      /*
       * Process a stream of rows from the binary log.
       */
@@ -1247,7 +1251,7 @@ applier_subscribe(struct applier *applier)
          }

          struct stailq rows;
-        applier_read_tx(applier, &rows, NULL);
+        applier_read_tx(applier, &rows, timeout);

          /*
           * In case of an heartbeat message wake a writer up

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list