[Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Mar 30 00:51:03 MSK 2021


Good job on the fixes!

See 4 comments below.

> =================================
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index b96eb360b..0f4492fe3 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -497,6 +496,7 @@ struct applier_tx_row {
>  static uint64_t
>  applier_wait_register(struct applier *applier, uint64_t row_count)
>  {
> +#define ROWS_PER_LOG 100000

1. It is better to avoid an in-function macro. This can be done as a
'const uint64_t' or as an enum at the beginning of the file.

>      /*
>       * Tarantool < 1.7.0: there is no "final join" stage.
>       * Proceed to "subscribe" and do not finish bootstrap
> @@ -505,16 +505,23 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>      if (applier->version_id < version_id(1, 7, 0))
>          return row_count;
> 
> +    uint64_t next_log_cnt =
> +        row_count + ROWS_PER_LOG - row_count % ROWS_PER_LOG;
>      /*
>       * Receive final data.
>       */
>      while (true) {
>          struct stailq rows;
> -        applier_read_tx(applier, &rows, &row_count);
> +        row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY);
> +        if (row_count >= next_log_cnt) {
> +            say_info("%.1fM rows received", next_log_cnt / 1e6);
> +            next_log_cnt += ROWS_PER_LOG;

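2. What if a single transaction adds more than ROWS_PER_LOG rows? Then
row_count would still be >= next_log_cnt after the increment, so the message
would be printed again immediately on the next transaction (although I don't
know if it is possible to have such a big transaction).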

> +        }
>          struct xrow_header *first_row =
>              &stailq_first_entry(&rows, struct applier_tx_row,
>                          next)->row;
>          if (first_row->type == IPROTO_OK) {
> +            /* Current vclock. This is not used now, ignore. */
>              assert(first_row ==
>                     &stailq_last_entry(&rows, struct applier_tx_row,
>                            next)->row);
> @@ -1234,6 +1229,15 @@ applier_subscribe(struct applier *applier)
>          trigger_clear(&on_rollback);
>      });
> 
> +    /*
> +     * Tarantool < 1.7.7 does not send periodic heartbeat
> +     * messages so we can't assume that if we haven't heard
> +     * from the master for quite a while the connection is
> +     * broken - the master might just be idle.
> +     */
> +    double timeout = applier->version_id < version_id(1, 7, 7) ?
> +             TIMEOUT_INFINITY : replication_disconnect_timeout();

3. What if replication_timeout is changed after the first box.cfg{}? It
seems the new value won't affect the already running appliers now, will it?

> +
>      /*
>       * Process a stream of rows from the binary log.
>       */
<...>

> +/** A simpler version of applier_apply_tx() for final join stage. */
> +static int
> +apply_final_join_tx(struct stailq *rows)
> +{
> +	struct xrow_header *first_row =
> +		&stailq_first_entry(rows, struct applier_tx_row, next)->row;
> +	struct xrow_header *last_row =
> +		&stailq_last_entry(rows, struct applier_tx_row, next)->row;
> +	int rc = 0;
> +	/* WAL isn't enabled yet, so follow vclock manually. */
> +	vclock_follow_xrow(&replicaset.vclock, last_row);
> +	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
> +		assert(first_row == last_row);
> +		rc = apply_synchro_row(first_row);
> +		goto end;
> +	}

4. You don't really need the 'end' label here:

====================
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -970,11 +970,9 @@ apply_final_join_tx(struct stailq *rows)
 	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
 		assert(first_row == last_row);
 		rc = apply_synchro_row(first_row);
-		goto end;
+	} else {
+		rc = apply_plain_tx(rows, false, false);
 	}
-
-	rc = apply_plain_tx(rows, false, false);
-end:
 	fiber_gc();
 	return rc;
 }


More information about the Tarantool-patches mailing list