[Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher

Serge Petrenko sergepetrenko at tarantool.org
Wed Dec 23 15:03:59 MSK 2020



21.12.2020 18:31, Serge Petrenko пишет:
>
>
> 18.12.2020 00:43, Vladislav Shpilevoy пишет:
>> Thanks for the patch and for the design!

Thanks for the review!

>>
>> See 3 comments below.
>>
>> On 10.12.2020 21:55, Serge Petrenko via Tarantool-patches wrote:
>>> Add a list of lsn watchers to relay.
>>>
>>> Relay_lsn_watcher is a structure that lives in tx thread and monitors
>>> replica's progress in applying rows coming from a given replica id.
>>>
>>> The watcher fires once relay reports that replica has acked the target lsn
>>> for the given replica id.
>>>
>>> The watcher is owned by some tx fiber, which typically sleeps until the
>>> watcher wakes it up. Besides waking the waiting fiber up, the watcher may
>>> perform some work as dictated by the notify function.
>>>
>>> Prerequisite #5435
>>> ---
>>>   src/box/relay.cc | 73 ++++++++++++++++++++++++++++++++++++++++++++++--
>>>   src/box/relay.h  | 44 +++++++++++++++++++++++++++++
>>>   2 files changed, 114 insertions(+), 3 deletions(-)
>>>
>>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>>> index f342a79dd..4014792a6 100644
>>> --- a/src/box/relay.cc
>>> +++ b/src/box/relay.cc
>>> @@ -394,6 +397,54 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
>>>   	});
>>>   }
>>>   
>>> +void
>>> +relay_set_lsn_watcher(struct relay *relay, struct relay_lsn_watcher *watcher)
>>> +{
>>> +	rlist_add_tail_entry(&relay->tx.lsn_watchers, watcher, in_list);
>>> +}
>>> +
>>> +void
>>> +relay_lsn_watcher_create(struct relay_lsn_watcher *watcher, uint32_t replica_id,
>>> +			 int64_t target_lsn, relay_lsn_watcher_f notify,
>>> +			 relay_lsn_watcher_f destroy, void *data)
>>> +{
>>> +	watcher->replica_id = replica_id;
>>> +	watcher->target_lsn = target_lsn;
>>> +	watcher->notify = notify;
>>> +	watcher->destroy = destroy;
>>> +	watcher->data = data;
>>> +	watcher->waiter = fiber();
>>> +}
>> 1. The notify + destroy + invocation of the watchers looks very similar
>> to what trigger.h API offers. Can you try to implement this as triggers?
>> I mean as a list of struct trigger objects.
>
> Yes, it's possible  to implement this as a trigger. It'll be called on 
> every
> tx_status_update() invocation then.

P.S. I ended up implementing this watcher as a trigger.

>
>>> +
>>> +/**
>>> + * Destroy the watcher.
>>> + * Wake the waiting fiber up, fire the on destroy callback and remove the
>>> + * watcher from the relay's watcher list.
>>> + */
>>> +static void
>>> +relay_lsn_watcher_destroy(struct relay_lsn_watcher *watcher)
>>> +{
>>> +	watcher->destroy(watcher->data);
>>> +	fiber_wakeup(watcher->waiter);
>>> +	rlist_del_entry(watcher, in_list);
>>> +}
>>> +
>>> +/**
>>> + * Notify the watcher that the replica has advanced to the given vclock.
>>> + * In case target_lsn is hit for watcher's replica_id, fire the notify
>>> + * callback and destroy the watcher.
>>> + */
>>> +static void
>>> +relay_lsn_watcher_notify(struct relay_lsn_watcher *watcher,
>>> +			 struct vclock *vclock)
>>> +{
>>> +	int64_t lsn = vclock_get(vclock, watcher->replica_id);
>>> +	if (lsn >= watcher->target_lsn) {
>>> +		watcher->notify(watcher->data);
>>> +		relay_lsn_watcher_destroy(watcher);
>>> +	}
>>> +}
>> 2. This all seems too intricate. Why does the watcher destroys itself?
>> Wouldn't it be easier to let the watcher decide if he wants to die, or
>> change LSN and wait more, or whatever else?
>>
>> I have a feeling that once you will try to switch to struct triggers,
>> it will all come together automatically. Since triggers API is notably
>> hard to misuse.
>
> Sounds good.
>
>>> +
>>>   /**
>>>    * The message which updated tx thread with a new vclock has returned back
>>>    * to the relay.
>>> diff --git a/src/box/relay.h b/src/box/relay.h
>>> index b32e2ea2a..da2e3498a 100644
>>> --- a/src/box/relay.h
>>> +++ b/src/box/relay.h
>>> @@ -134,4 +135,47 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>>>   		struct vclock *replica_vclock, uint32_t replica_version_id,
>>>   		uint32_t replica_id_filter);
>>>   
>>> +/**
>>> + * A callback invoked once lsn watcher sees the replica has reached the target
>>> + * lsn for the given replica id.
>>> + */
>>> +typedef void (*relay_lsn_watcher_f)(void *data);
>>> +
>>> +/**
>>> + * A relay watcher which notifies tx via calling a given function once the
>>> + * replica confirms it has received the rows from replica_id up to target_lsn.
>>> + */
>>> +struct relay_lsn_watcher {
>>> +	/** A replica_id to monitor rows from. */
>>> +	uint32_t replica_id;
>>> +	/** The lsn to wait for. */
>>> +	int64_t target_lsn;
>>> +	/**
>>> +	 * A callback invoked in the tx thread once the watcher sees that
>>> +	 * replica has reached the target lsn.
>>> +	 */
>>> +	relay_lsn_watcher_f notify;
>>> +	/**
>>> +	 * A callback fired once the relay thread exits to notify the waiter
>>> +	 * there's nothing to wait for anymore.
>>> +	 */
>>> +	relay_lsn_watcher_f destroy;
>>> +	/** Data passed to \a notify and \a destroy. */
>> 3. Lets better use @a. To be consistent with all the new code (except
>> cases when a whole file uses tons of \a).
>
> Ok, thanks for pointing this out!
>>> +	void *data;
>>> +	/** A fiber waiting for this watcher to fire. */
>>> +	struct fiber *waiter;
>>> +	/** A link in relay's watcher list. */
>>> +	struct rlist in_list;
>>> +};
>
> -- 
> Serge Petrenko

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list