[Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional

Serge Petrenko sergepetrenko at tarantool.org
Tue Mar 30 11:15:43 MSK 2021



30.03.2021 00:51, Vladislav Shpilevoy wrote:
> Good job on the fixes!
>
> See 4 comments below.

Thanks for the review!

>
>> =================================
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index b96eb360b..0f4492fe3 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -497,6 +496,7 @@ struct applier_tx_row {
>>   static uint64_t
>>   applier_wait_register(struct applier *applier, uint64_t row_count)
>>   {
>> +#define ROWS_PER_LOG 100000
> 1. Better avoid an in-function macro. This can be done as a 'const uint64_t'
> or as an enum at the beginning of the file.

Ok.

>
>>       /*
>>        * Tarantool < 1.7.0: there is no "final join" stage.
>>        * Proceed to "subscribe" and do not finish bootstrap
>> @@ -505,16 +505,23 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>>       if (applier->version_id < version_id(1, 7, 0))
>>           return row_count;
>>
>> +    uint64_t next_log_cnt =
>> +        row_count + ROWS_PER_LOG - row_count % ROWS_PER_LOG;
>>       /*
>>        * Receive final data.
>>        */
>>       while (true) {
>>           struct stailq rows;
>> -        applier_read_tx(applier, &rows, &row_count);
>> +        row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY);
>> +        if (row_count >= next_log_cnt) {
>> +            say_info("%.1fM rows received", next_log_cnt / 1e6);
>> +            next_log_cnt += ROWS_PER_LOG;
> 2. What if a single transaction brings more than ROWS_PER_LOG rows? Then the
> message would be printed again immediately on the next transaction (although
> I don't know if it is possible to have such a big transaction).

First of all, I don't think anyone will have a 100k-row-long transaction.
Secondly, even if this is the case, yes, the second info message will be
printed almost immediately after the first one. But is it a problem?
Say we had row_count = 2 599 999, and then we received a transaction of
100 001 rows. We would print "2.6M rows received" first, and then
"2.7M rows received" right after the next transaction.

An alternative would be:
```
while (row_count >= next_log_cnt) {
     say_info(...)
     next_log_cnt += ROWS_PER_LOG
}
```
I like this more, actually, so let's change it. Thanks for pointing this out!

>
>> +        }
>>           struct xrow_header *first_row =
>>               &stailq_first_entry(&rows, struct applier_tx_row,
>>                           next)->row;
>>           if (first_row->type == IPROTO_OK) {
>> +            /* Current vclock. This is not used now, ignore. */
>>               assert(first_row ==
>>                      &stailq_last_entry(&rows, struct applier_tx_row,
>>                             next)->row);
>> @@ -1234,6 +1229,15 @@ applier_subscribe(struct applier *applier)
>>           trigger_clear(&on_rollback);
>>       });
>>
>> +    /*
>> +     * Tarantool < 1.7.7 does not send periodic heartbeat
>> +     * messages so we can't assume that if we haven't heard
>> +     * from the master for quite a while the connection is
>> +     * broken - the master might just be idle.
>> +     */
>> +    double timeout = applier->version_id < version_id(1, 7, 7) ?
>> +             TIMEOUT_INFINITY : replication_disconnect_timeout();
> 3. What if replication_timeout is changed after the first box.cfg{}? It
> seems it won't affect the running appliers now, will it?

Missed that, thanks!

>
>> +
>>       /*
>>        * Process a stream of rows from the binary log.
>>        */
> <...>
>
>> +/** A simpler version of applier_apply_tx() for final join stage. */
>> +static int
>> +apply_final_join_tx(struct stailq *rows)
>> +{
>> +	struct xrow_header *first_row =
>> +		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
>> +	struct xrow_header *last_row =
>> +		&stailq_last_entry(rows, struct applier_tx_row, next)->row;
>> +	int rc = 0;
>> +	/* WAL isn't enabled yet, so follow vclock manually. */
>> +	vclock_follow_xrow(&replicaset.vclock, last_row);
>> +	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>> +		assert(first_row == last_row);
>> +		rc = apply_synchro_row(first_row);
>> +		goto end;
>> +	}
> 4. You don't really need the 'end' label here:
>
> ====================
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -970,11 +970,9 @@ apply_final_join_tx(struct stailq *rows)
>   	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>   		assert(first_row == last_row);
>   		rc = apply_synchro_row(first_row);
> -		goto end;
> +	} else {
> +		rc = apply_plain_tx(rows, false, false);
>   	}
> -
> -	rc = apply_plain_tx(rows, false, false);
> -end:
>   	fiber_gc();
>   	return rc;
>   }
Good point, thanks!

==========================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 0f4492fe3..f00ffbd34 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -59,6 +59,13 @@

  STRS(applier_state, applier_STATE);

+enum {
+    /**
+     * How often to log received row count. Used during join and register.
+     */
+    ROWS_PER_LOG = 100000,
+};
+
  static inline void
  applier_set_state(struct applier *applier, enum applier_state state)
  {
@@ -435,7 +442,7 @@ applier_wait_snapshot(struct applier *applier)
          if (iproto_type_is_dml(row.type)) {
              if (apply_snapshot_row(&row) != 0)
                  diag_raise();
-            if (++row_count % 100000 == 0)
+            if (++row_count % ROWS_PER_LOG == 0)
                  say_info("%.1fM rows received", row_count / 1e6);
          } else if (row.type == IPROTO_OK) {
              if (applier->version_id < version_id(1, 7, 0)) {
@@ -496,7 +503,6 @@ struct applier_tx_row {
  static uint64_t
  applier_wait_register(struct applier *applier, uint64_t row_count)
  {
-#define ROWS_PER_LOG 100000
      /*
       * Tarantool < 1.7.0: there is no "final join" stage.
       * Proceed to "subscribe" and do not finish bootstrap
@@ -513,7 +519,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
      while (true) {
          struct stailq rows;
          row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY);
-        if (row_count >= next_log_cnt) {
+        while (row_count >= next_log_cnt) {
              say_info("%.1fM rows received", next_log_cnt / 1e6);
              next_log_cnt += ROWS_PER_LOG;
          }
@@ -532,7 +538,6 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
      }

      return row_count;
-#undef ROWS_PER_LOG
  }

  static void
@@ -970,11 +975,9 @@ apply_final_join_tx(struct stailq *rows)
      if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
          assert(first_row == last_row);
          rc = apply_synchro_row(first_row);
-        goto end;
+    } else {
+        rc = apply_plain_tx(rows, false, false);
      }
-
-    rc = apply_plain_tx(rows, false, false);
-end:
      fiber_gc();
      return rc;
  }
@@ -1229,15 +1232,6 @@ applier_subscribe(struct applier *applier)
          trigger_clear(&on_rollback);
      });

-    /*
-     * Tarantool < 1.7.7 does not send periodic heartbeat
-     * messages so we can't assume that if we haven't heard
-     * from the master for quite a while the connection is
-     * broken - the master might just be idle.
-     */
-    double timeout = applier->version_id < version_id(1, 7, 7) ?
-             TIMEOUT_INFINITY : replication_disconnect_timeout();
-
      /*
       * Process a stream of rows from the binary log.
       */
@@ -1250,6 +1244,16 @@ applier_subscribe(struct applier *applier)
              applier_set_state(applier, APPLIER_FOLLOW);
          }

+        /*
+         * Tarantool < 1.7.7 does not send periodic heartbeat
+         * messages so we can't assume that if we haven't heard
+         * from the master for quite a while the connection is
+         * broken - the master might just be idle.
+         */
+        double timeout = applier->version_id < version_id(1, 7, 7) ?
+                 TIMEOUT_INFINITY :
+                 replication_disconnect_timeout();
+
          struct stailq rows;
          applier_read_tx(applier, &rows, timeout);


-- 
Serge Petrenko



More information about the Tarantool-patches mailing list