[Tarantool-patches] [PATCH v2 4/6] box: rework clear_synchro_queue to commit everything

Serge Petrenko sergepetrenko at tarantool.org
Thu Dec 24 19:12:21 MSK 2020



23.12.2020 20:28, Vladislav Shpilevoy пишет:
> Thanks for the patch!
>
> See 9 comments below, my diff in the end of the email, and in
> the branch as a separate commit.

Thanks for your review & the patch!

I've applied your diff with a couple of changes.
Please find my answers and diff below.

>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 2d403fc9a..38bf4034e 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -1002,6 +1002,36 @@ box_set_replication_anon(void)
>>   
>>   }
>>   
>> +struct ack_trigger_data {
>> +	bool fired;
> 1. This member can be deleted, if you would call
> trigger_clear(trigger) instead of setting the flag.

True.

>> +	int64_t *target_lsn;
>> +	uint32_t *replica_id;
>> +	int *quorum;
>> +	int *ack_count;
> 2. Most of these members can be stored by value, see my diff.
> Even if the idea in my diff will look bad, still some minor
> details could be taken.

I store quorum by reference so that the trigger gets the new value
once the quorum is changed. This would work when the quorum is
increased, at least.
Speaking of other values, I thought it might be useful to change them
on the fly one day, even though this isn't used now.

Anyway, let's just retry when quorum is changed.

>> +	struct fiber *waiter;
>> +};
>> +
>> +struct ack_trigger {
>> +	struct ack_trigger_data data;
>> +	struct trigger trigger;
> 3. You can merge data right into the trigger object.
>
>> +};
>> +
>> +static int ack_trigger_f(struct trigger *trigger, void *event)
> 4. Normally we put return type on a separate line.

Sure. My bad.

>> +{
>> +	struct relay *relay = (struct relay *)event;
>> +	struct ack_trigger_data *data = (struct ack_trigger_data *)trigger->data;
>> +	if (data->fired)
>> +		return 0;
>> +	if (*data->target_lsn <= vclock_get(relay_vclock(relay),
>> +					    *data->replica_id)) {
>> +		++*data->ack_count;
>> +		data->fired = true;
>> +		if (*data->ack_count >= *data->quorum)
>> +			fiber_wakeup(data->waiter);
>> +	}
>> +	return 0;
>> +}
>> @@ -1030,37 +1064,104 @@ box_clear_synchro_queue(bool try_wait)
>>   				break;
>>   			fiber_sleep(0.001);
>>   		}
>> +		/*
>> +		 * Our mission was to clear the limbo from former leader's
>> +		 * transactions. Exit in case someone did that for us.
>> +		 */
>> +		if (txn_limbo_is_empty(&txn_limbo) ||
>> +		    former_leader_id != txn_limbo.owner_id) {
>> +			in_clear_synchro_queue = false;
>> +			return 0;
>> +		}
>>   	}
>>   
>> -	if (!txn_limbo_is_empty(&txn_limbo)) {
>> -		int64_t lsns[VCLOCK_MAX];
>> -		int len = 0;
>> -		const struct vclock  *vclock;
>> -		replicaset_foreach(replica) {
>> -			if (replica->relay != NULL &&
>> -			    relay_get_state(replica->relay) != RELAY_OFF &&
>> -			    !replica->anon) {
>> -				assert(!tt_uuid_is_equal(&INSTANCE_UUID,
>> -							 &replica->uuid));
>> -				vclock = relay_vclock(replica->relay);
>> -				int64_t lsn = vclock_get(vclock,
>> -							 former_leader_id);
>> -				lsns[len++] = lsn;
>> -			}
>> -		}
>> -		lsns[len++] = vclock_get(box_vclock, former_leader_id);
>> -		assert(len < VCLOCK_MAX);
>> +	/*
>> +	 * clear_synchro_queue() is a no-op on the limbo owner, so all the rows
>> +	 * in the limbo must've come through the applier meaning they already
>> +	 * have an lsn assigned, even if their WAL write hasn't finished yet.
>> +	 */
>> +	int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
>> +	assert(wait_lsn > 0);
>> +
>> +	struct ack_trigger triggers[VCLOCK_MAX];
>> +
>> +	/* Take this node into account immediately. */
>> +	int ack_count = vclock_get(box_vclock, former_leader_id) >= wait_lsn;
>> +	int trigger_count = 0;
>> +
>> +	replicaset_foreach(replica) {
>> +		if (relay_get_state(replica->relay) != RELAY_FOLLOW ||
>> +		    replica->anon)
>> +			continue;
>> +
>> +		assert(replica->id != REPLICA_ID_NIL);
>> +		assert(!tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid));
>>   
>> -		int64_t confirm_lsn = 0;
>> -		if (len >= replication_synchro_quorum) {
>> -			qsort(lsns, len, sizeof(int64_t), cmp_i64);
>> -			confirm_lsn = lsns[len - replication_synchro_quorum];
>> +		if (vclock_get(relay_vclock(replica->relay),
>> +			       former_leader_id) >= wait_lsn) {
>> +			ack_count++;
>> +			continue;
>>   		}
>> +		int i = trigger_count++;
>> +		triggers[i].data = {
>> +			.fired = false,
>> +			.target_lsn = &wait_lsn,
>> +			.replica_id = &former_leader_id,
>> +			.quorum = &replication_synchro_quorum,
>> +			.ack_count = &ack_count,
>> +			.waiter = fiber(),
>> +		};
>> +		trigger_create(&triggers[i].trigger, ack_trigger_f,
>> +			       &triggers[i].data, NULL);
>> +		relay_on_status_update(replica->relay, &triggers[i].trigger);
>> +	}
>> +
>> +	assert(trigger_count <= VCLOCK_MAX);
>> +
>> +	if (ack_count + trigger_count < replication_synchro_quorum) {
>> +		/* Don't even bother waiting when not enough replicas. */
>> +		say_warn("clear_synchro_queue cannot gather quorum. "
>> +			 "There're only %d replicas (including this one), while"
>> +			 "quorum should be %d.", ack_count + trigger_count,
>> +			 replication_synchro_quorum);
>> +		for (int i = 0; i < trigger_count; i++)
>> +			trigger_clear(&triggers[i].trigger);
>> +		goto end;
> 5. Better wait anyway. Because more replicas may appear soon and confirm
> everything. During the timeout wait. Although it is probably not possible
> if we bind triggers to individual relays. I tried to fix it with a global
> trigger in my diff.

Ok, your variant looks good.

>> +	}
>> +
>> +	if (trigger_count > 0) {
>> +		/* Allow to interrupt the function when it takes too long. */
>> +		bool cancellable = fiber_set_cancellable(true);
>> +		fiber_sleep(replication_synchro_timeout);
>> +		fiber_set_cancellable(cancellable);
> 6. I think a sleep is enough. If a user made the fiber non-cancellable, better
> leave it as is. We only change it to 'false', and only when do some operation
> when a wakeup is not legal, such as WAL write.

Okay.

>> +	}
>> +
>> +	for (int i = 0; i < trigger_count; i++)
>> +		trigger_clear(&triggers[i].trigger);
>> +
>> +	/*
>> +	 * No point to proceed after cancellation even if got the quorum.
>> +	 * Emptying the limbo involves a pair of blocking WAL writes,
>> +	 * making the fiber sleep even longer, which isn't appropriate
>> +	 * when it's cancelled.
>> +	 */
>> +	if (fiber_is_cancelled()) {
>> +		say_info("clear_synchro_queue interrupted by the fiber "
>> +			 "cancellation.");
>> +		goto end;
>> +	}
>>   
>> -		txn_limbo_force_empty(&txn_limbo, confirm_lsn);
>> -		assert(txn_limbo_is_empty(&txn_limbo));
>> +	if (ack_count < replication_synchro_quorum) {
>> +		say_warn("clear_synchro_queue timed out after %.2f "
>> +		         "seconds. Collected %d acks, quorum is %d. ",
>> +			 replication_synchro_timeout, ack_count,
>> +			 replication_synchro_quorum);
>> +		goto end;
> 7. Why don't you return an error? The queue couldn't be cleared, so it
> looks like an error, no?
>
> I added diag_set() in my diff, but didn't change it to return -1 so far.

Ok, let's return an error then.

>>   	}
>>   
>> +	txn_limbo_force_empty(&txn_limbo, wait_lsn);
>> +	assert(txn_limbo_is_empty(&txn_limbo));
>> +end:
>>   	in_clear_synchro_queue = false;
>>   	return 0;
>>   }
>> diff --git a/test/replication/gh-5435-clear-synchro-queue-commit-all.result b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
>> new file mode 100644
>> index 000000000..e806d9d53
>> --- /dev/null
>> +++ b/test/replication/gh-5435-clear-synchro-queue-commit-all.result
> 8. Perhaps better rename to gh-5435-qsync-.... . Because now it is very
> useful that I can run all qsync tests with a small command
> `python test-run.py qsync`.

No problem. Done.

My diff's below:


===============================================
diff --git a/src/box/box.cc b/src/box/box.cc
index 90c07c342..22e3057f8 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1035,9 +1035,10 @@ box_quorum_on_ack_f(struct trigger *trigger, void 
*event)

      vclock_follow(&t->vclock, ack->source, new_lsn);
      ++t->ack_count;
-    if (t->ack_count >= t->quorum)
+    if (t->ack_count >= t->quorum) {
          fiber_wakeup(t->waiter);
-    trigger_clear(trigger);
+        trigger_clear(trigger);
+    }
      return 0;
  }

@@ -1153,16 +1154,25 @@ box_clear_synchro_queue(bool try_wait)
      int64_t wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
      assert(wait_lsn > 0);

-    if (box_wait_quorum(former_leader_id, wait_lsn,
-                replication_synchro_quorum,
-                replication_synchro_timeout) == 0) {
-        txn_limbo_force_empty(&txn_limbo, wait_lsn);
-        assert(txn_limbo_is_empty(&txn_limbo));
-    } else {
-        diag_log();
+    int quorum = replication_synchro_quorum;
+    int rc = box_wait_quorum(former_leader_id, wait_lsn, quorum,
+                 replication_synchro_timeout);
+    if (rc == 0) {
+        if (quorum < replication_synchro_quorum) {
+            diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+                 "quorum was increased while waiting");
+            rc = -1;
+        } else if (wait_lsn < 
txn_limbo_last_synchro_entry(&txn_limbo)->lsn) {
+            diag_set(ClientError, ER_QUORUM_WAIT, quorum,
+                 "new synchronous transactions appeared");
+            rc = -1;
+        } else {
+            txn_limbo_force_empty(&txn_limbo, wait_lsn);
+            assert(txn_limbo_is_empty(&txn_limbo));
+        }
      }
      in_clear_synchro_queue = false;
-    return 0;
+    return rc;
  }

  void
diff --git a/src/box/raft.c b/src/box/raft.c
index 1942df952..634740570 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -94,8 +94,17 @@ box_raft_update_synchro_queue(struct raft *raft)
       * simply log a warning.
       */
      if (raft->state == RAFT_STATE_LEADER) {
-        if (box_clear_synchro_queue(false) != 0)
-            diag_log();
+        int rc = 0;
+        uint32_t errcode = 0;
+        do {
+            rc = box_clear_synchro_queue(false);
+            if (rc) {
+                struct error *err = diag_last_error(diag_get());
+                errcode = box_error_code(err);
+                diag_log();
+            }
+        } while (rc != 0 && errcode == ER_QUORUM_WAIT &&
+               !fiber_is_cancelled());
      }
  }

diff --git 
a/test/replication/gh-5435-clear-synchro-queue-commit-all.result 
b/test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.result
similarity index 100%
rename from test/replication/gh-5435-clear-synchro-queue-commit-all.result
rename to 
test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.result
diff --git 
a/test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua 
b/test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.test.lua
similarity index 100%
rename from test/replication/gh-5435-clear-synchro-queue-commit-all.test.lua
rename to 
test/replication/gh-5435-qsync-clear-synchro-queue-commit-all.test.lua

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list