[Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Tue Mar 30 00:51:03 MSK 2021
Good job on the fixes!
See 4 comments below.
> =================================
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index b96eb360b..0f4492fe3 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -497,6 +496,7 @@ struct applier_tx_row {
> static uint64_t
> applier_wait_register(struct applier *applier, uint64_t row_count)
> {
> +#define ROWS_PER_LOG 100000
1. Better to avoid an in-function macro. This can be done as a 'const uint64_t'
or as an enum at the beginning of the file.
> /*
> * Tarantool < 1.7.0: there is no "final join" stage.
> * Proceed to "subscribe" and do not finish bootstrap
> @@ -505,16 +505,23 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
> if (applier->version_id < version_id(1, 7, 0))
> return row_count;
>
> + uint64_t next_log_cnt =
> + row_count + ROWS_PER_LOG - row_count % ROWS_PER_LOG;
> /*
> * Receive final data.
> */
> while (true) {
> struct stailq rows;
> - applier_read_tx(applier, &rows, &row_count);
> + row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY);
> + if (row_count >= next_log_cnt) {
> + say_info("%.1fM rows received", next_log_cnt / 1e6);
> + next_log_cnt += ROWS_PER_LOG;
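2. What if a single transaction brings more than ROWS_PER_LOG rows? Then even
after 'next_log_cnt += ROWS_PER_LOG' the counter would still be behind
row_count. The message would then be printed again immediately on the next
transaction (although I don't know if it is possible to have such a big
transaction).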
> + }
> struct xrow_header *first_row =
> &stailq_first_entry(&rows, struct applier_tx_row,
> next)->row;
> if (first_row->type == IPROTO_OK) {
> + /* Current vclock. This is not used now, ignore. */
> assert(first_row ==
> &stailq_last_entry(&rows, struct applier_tx_row,
> next)->row);
> @@ -1234,6 +1229,15 @@ applier_subscribe(struct applier *applier)
> trigger_clear(&on_rollback);
> });
>
> + /*
> + * Tarantool < 1.7.7 does not send periodic heartbeat
> + * messages so we can't assume that if we haven't heard
> + * from the master for quite a while the connection is
> + * broken - the master might just be idle.
> + */
> + double timeout = applier->version_id < version_id(1, 7, 7) ?
> + TIMEOUT_INFINITY : replication_disconnect_timeout();
3. What if replication_timeout is changed after the first box.cfg{}? The
timeout is now computed only once, before the loop, so it seems the new value
won't affect already running appliers, will it?
> +
> /*
> * Process a stream of rows from the binary log.
> */
<...>
> +/** A simpler version of applier_apply_tx() for final join stage. */
> +static int
> +apply_final_join_tx(struct stailq *rows)
> +{
> + struct xrow_header *first_row =
> + &stailq_first_entry(rows, struct applier_tx_row, next)->row;
> + struct xrow_header *last_row =
> + &stailq_last_entry(rows, struct applier_tx_row, next)->row;
> + int rc = 0;
> + /* WAL isn't enabled yet, so follow vclock manually. */
> + vclock_follow_xrow(&replicaset.vclock, last_row);
> + if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
> + assert(first_row == last_row);
> + rc = apply_synchro_row(first_row);
> + goto end;
> + }
4. You don't really need the 'end' label here:
====================
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -970,11 +970,9 @@ apply_final_join_tx(struct stailq *rows)
if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
assert(first_row == last_row);
rc = apply_synchro_row(first_row);
- goto end;
+ } else {
+ rc = apply_plain_tx(rows, false, false);
}
-
- rc = apply_plain_tx(rows, false, false);
-end:
fiber_gc();
return rc;
}
More information about the Tarantool-patches mailing list