[Tarantool-patches] [PATCH v2 5/7] applier: make final join transactional
Serge Petrenko
sergepetrenko at tarantool.org
Tue Mar 30 11:15:43 MSK 2021
30.03.2021 00:51, Vladislav Shpilevoy пишет:
> Good job on the fixes!
>
> See 4 comments below.
Thanks for the review!
>
>> =================================
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index b96eb360b..0f4492fe3 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -497,6 +496,7 @@ struct applier_tx_row {
>> static uint64_t
>> applier_wait_register(struct applier *applier, uint64_t row_count)
>> {
>> +#define ROWS_PER_LOG 100000
> 1. Better avoid in-function macros. This can be done as a 'const uint64_t' or
> as an enum at the beginning of the file.
Ok.
>
>> /*
>> * Tarantool < 1.7.0: there is no "final join" stage.
>> * Proceed to "subscribe" and do not finish bootstrap
>> @@ -505,16 +505,23 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
>> if (applier->version_id < version_id(1, 7, 0))
>> return row_count;
>>
>> + uint64_t next_log_cnt =
>> + row_count + ROWS_PER_LOG - row_count % ROWS_PER_LOG;
>> /*
>> * Receive final data.
>> */
>> while (true) {
>> struct stailq rows;
>> - applier_read_tx(applier, &rows, &row_count);
>> + row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY);
>> + if (row_count >= next_log_cnt) {
>> + say_info("%.1fM rows received", next_log_cnt / 1e6);
>> + next_log_cnt += ROWS_PER_LOG;
> 2. What if row_count > ROWS_PER_LOG? Then it would be printed on the
> next transaction immediately again (although I don't know if it is possible
> to have such a big transaction).
First of all, I don't think anyone will have a 100k-row-long transaction.
Secondly, even if this is the case, yes, the second info message will be
printed almost immediately after the first one. But is it a problem?
Say, we had row_count = 2 599 999, then we receive a transaction worth
100 001 rows. We'll print "2.6M rows received" first, and then
"2.7M rows received" after the next transaction.
An alternative would be:
```
while (row_count >= next_log_cnt) {
say_info(...)
next_log_cnt += ROWS_PER_LOG
}
```
I like this more, actually, so let's change it. Thanks for pointing this out!
>
>> + }
>> struct xrow_header *first_row =
>> &stailq_first_entry(&rows, struct applier_tx_row,
>> next)->row;
>> if (first_row->type == IPROTO_OK) {
>> + /* Current vclock. This is not used now, ignore. */
>> assert(first_row ==
>> &stailq_last_entry(&rows, struct applier_tx_row,
>> next)->row);
>> @@ -1234,6 +1229,15 @@ applier_subscribe(struct applier *applier)
>> trigger_clear(&on_rollback);
>> });
>>
>> + /*
>> + * Tarantool < 1.7.7 does not send periodic heartbeat
>> + * messages so we can't assume that if we haven't heard
>> + * from the master for quite a while the connection is
>> + * broken - the master might just be idle.
>> + */
>> + double timeout = applier->version_id < version_id(1, 7, 7) ?
>> + TIMEOUT_INFINITY : replication_disconnect_timeout();
> 3. What if replication_timeout is changed after first box.cfg{}? It
> seems it won't affect the running appliers now, will it?
Missed that, thanks!
>
>> +
>> /*
>> * Process a stream of rows from the binary log.
>> */
> <...>
>
>> +/** A simpler version of applier_apply_tx() for final join stage. */
>> +static int
>> +apply_final_join_tx(struct stailq *rows)
>> +{
>> + struct xrow_header *first_row =
>> + &stailq_first_entry(rows, struct applier_tx_row, next)->row;
>> + struct xrow_header *last_row =
>> + &stailq_last_entry(rows, struct applier_tx_row, next)->row;
>> + int rc = 0;
>> + /* WAL isn't enabled yet, so follow vclock manually. */
>> + vclock_follow_xrow(&replicaset.vclock, last_row);
>> + if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>> + assert(first_row == last_row);
>> + rc = apply_synchro_row(first_row);
>> + goto end;
>> + }
> 4. You don't really need the 'end' label here:
>
> ====================
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -970,11 +970,9 @@ apply_final_join_tx(struct stailq *rows)
> if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
> assert(first_row == last_row);
> rc = apply_synchro_row(first_row);
> - goto end;
> + } else {
> + rc = apply_plain_tx(rows, false, false);
> }
> -
> - rc = apply_plain_tx(rows, false, false);
> -end:
> fiber_gc();
> return rc;
> }
Good point, thanks!
==========================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 0f4492fe3..f00ffbd34 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -59,6 +59,13 @@
STRS(applier_state, applier_STATE);
+enum {
+ /**
+ * How often to log received row count. Used during join and register.
+ */
+ ROWS_PER_LOG = 100000,
+};
+
static inline void
applier_set_state(struct applier *applier, enum applier_state state)
{
@@ -435,7 +442,7 @@ applier_wait_snapshot(struct applier *applier)
if (iproto_type_is_dml(row.type)) {
if (apply_snapshot_row(&row) != 0)
diag_raise();
- if (++row_count % 100000 == 0)
+ if (++row_count % ROWS_PER_LOG == 0)
say_info("%.1fM rows received", row_count / 1e6);
} else if (row.type == IPROTO_OK) {
if (applier->version_id < version_id(1, 7, 0)) {
@@ -496,7 +503,6 @@ struct applier_tx_row {
static uint64_t
applier_wait_register(struct applier *applier, uint64_t row_count)
{
-#define ROWS_PER_LOG 100000
/*
* Tarantool < 1.7.0: there is no "final join" stage.
* Proceed to "subscribe" and do not finish bootstrap
@@ -513,7 +519,7 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
while (true) {
struct stailq rows;
row_count += applier_read_tx(applier, &rows, TIMEOUT_INFINITY);
- if (row_count >= next_log_cnt) {
+ while (row_count >= next_log_cnt) {
say_info("%.1fM rows received", next_log_cnt / 1e6);
next_log_cnt += ROWS_PER_LOG;
}
@@ -532,7 +538,6 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
}
return row_count;
-#undef ROWS_PER_LOG
}
static void
@@ -970,11 +975,9 @@ apply_final_join_tx(struct stailq *rows)
if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
assert(first_row == last_row);
rc = apply_synchro_row(first_row);
- goto end;
+ } else {
+ rc = apply_plain_tx(rows, false, false);
}
-
- rc = apply_plain_tx(rows, false, false);
-end:
fiber_gc();
return rc;
}
@@ -1229,15 +1232,6 @@ applier_subscribe(struct applier *applier)
trigger_clear(&on_rollback);
});
- /*
- * Tarantool < 1.7.7 does not send periodic heartbeat
- * messages so we can't assume that if we haven't heard
- * from the master for quite a while the connection is
- * broken - the master might just be idle.
- */
- double timeout = applier->version_id < version_id(1, 7, 7) ?
- TIMEOUT_INFINITY : replication_disconnect_timeout();
-
/*
* Process a stream of rows from the binary log.
*/
@@ -1250,6 +1244,16 @@ applier_subscribe(struct applier *applier)
applier_set_state(applier, APPLIER_FOLLOW);
}
+ /*
+ * Tarantool < 1.7.7 does not send periodic heartbeat
+ * messages so we can't assume that if we haven't heard
+ * from the master for quite a while the connection is
+ * broken - the master might just be idle.
+ */
+ double timeout = applier->version_id < version_id(1, 7, 7) ?
+ TIMEOUT_INFINITY :
+ replication_disconnect_timeout();
+
struct stailq rows;
applier_read_tx(applier, &rows, timeout);
--
Serge Petrenko
More information about the Tarantool-patches
mailing list