[Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher
Serge Petrenko
sergepetrenko at tarantool.org
Wed Dec 23 15:03:59 MSK 2020
21.12.2020 18:31, Serge Petrenko wrote:
>
>
> 18.12.2020 00:43, Vladislav Shpilevoy wrote:
>> Thanks for the patch and for the design!
Thanks for the review!
>>
>> See 3 comments below.
>>
>> On 10.12.2020 21:55, Serge Petrenko via Tarantool-patches wrote:
>>> Add a list of lsn watchers to relay.
>>>
>>> Relay_lsn_watcher is a structure that lives in tx thread and monitors
>>> replica's progress in applying rows coming from a given replica id.
>>>
>>> The watcher fires once relay reports that replica has acked the target lsn
>>> for the given replica id.
>>>
>>> The watcher is owned by some tx fiber, which typically sleeps until the
>>> watcher wakes it up. Besides waking the waiting fiber up, the watcher may
>>> perform some work as dictated by the notify function.
>>>
>>> Prerequisite #5435
>>> ---
>>> src/box/relay.cc | 73 ++++++++++++++++++++++++++++++++++++++++++++++--
>>> src/box/relay.h | 44 +++++++++++++++++++++++++++++
>>> 2 files changed, 114 insertions(+), 3 deletions(-)
>>>
>>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>>> index f342a79dd..4014792a6 100644
>>> --- a/src/box/relay.cc
>>> +++ b/src/box/relay.cc
>>> @@ -394,6 +397,54 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
>>> });
>>> }
>>>
>>> +void
>>> +relay_set_lsn_watcher(struct relay *relay, struct relay_lsn_watcher *watcher)
>>> +{
>>> + rlist_add_tail_entry(&relay->tx.lsn_watchers, watcher, in_list);
>>> +}
>>> +
>>> +void
>>> +relay_lsn_watcher_create(struct relay_lsn_watcher *watcher, uint32_t replica_id,
>>> + int64_t target_lsn, relay_lsn_watcher_f notify,
>>> + relay_lsn_watcher_f destroy, void *data)
>>> +{
>>> + watcher->replica_id = replica_id;
>>> + watcher->target_lsn = target_lsn;
>>> + watcher->notify = notify;
>>> + watcher->destroy = destroy;
>>> + watcher->data = data;
>>> + watcher->waiter = fiber();
>>> +}
>> 1. The notify + destroy + invocation of the watchers looks very similar
>> to what trigger.h API offers. Can you try to implement this as triggers?
>> I mean as a list of struct trigger objects.
>
> Yes, it's possible to implement this as a trigger. It'll be called on every
> tx_status_update() invocation then.
P.S. I ended up implementing this watcher as a trigger.
>
>>> +
>>> +/**
>>> + * Destroy the watcher.
>>> + * Wake the waiting fiber up, fire the on destroy callback and remove the
>>> + * watcher from the relay's watcher list.
>>> + */
>>> +static void
>>> +relay_lsn_watcher_destroy(struct relay_lsn_watcher *watcher)
>>> +{
>>> + watcher->destroy(watcher->data);
>>> + fiber_wakeup(watcher->waiter);
>>> + rlist_del_entry(watcher, in_list);
>>> +}
>>> +
>>> +/**
>>> + * Notify the watcher that the replica has advanced to the given vclock.
>>> + * In case target_lsn is hit for watcher's replica_id, fire the notify
>>> + * callback and destroy the watcher.
>>> + */
>>> +static void
>>> +relay_lsn_watcher_notify(struct relay_lsn_watcher *watcher,
>>> + struct vclock *vclock)
>>> +{
>>> + int64_t lsn = vclock_get(vclock, watcher->replica_id);
>>> + if (lsn >= watcher->target_lsn) {
>>> + watcher->notify(watcher->data);
>>> + relay_lsn_watcher_destroy(watcher);
>>> + }
>>> +}
>> 2. This all seems too intricate. Why does the watcher destroy itself?
>> Wouldn't it be easier to let the watcher decide whether it wants to die,
>> change the LSN and wait more, or do something else?
>>
>> I have a feeling that once you try to switch to struct trigger, it will
>> all come together automatically, since the triggers API is notably
>> hard to misuse.
>
> Sounds good.
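
For illustration, comments 1 and 2 together could look roughly like the sketch below: the watcher is a trigger that runs on every acked-vclock update, and its owner decides whether to remove it or re-arm it. This is only a self-contained approximation. Every name ending in _sketch is hypothetical, the list is a plain singly linked list, and none of it is the real trigger.h, vclock.h or fiber API (a real callback would presumably wake the waiting fiber instead of setting a flag).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum { VCLOCK_MAX_SKETCH = 32 };

/* Simplified stand-in for struct vclock: one LSN per replica id. */
struct vclock_sketch {
	int64_t lsn[VCLOCK_MAX_SKETCH];
};

struct trigger_sketch;
typedef void (*trigger_sketch_f)(struct trigger_sketch *trigger, void *event);

/* Hypothetical trigger: a callback, its data and a list link. */
struct trigger_sketch {
	trigger_sketch_f run;
	void *data;
	struct trigger_sketch *next;
};

/* Append a trigger to the tail of the list. */
static void
trigger_sketch_add(struct trigger_sketch **list, struct trigger_sketch *t)
{
	while (*list != NULL)
		list = &(*list)->next;
	t->next = NULL;
	*list = t;
}

/* Unlink a trigger from the list; a no-op if it is not there. */
static void
trigger_sketch_clear(struct trigger_sketch **list, struct trigger_sketch *t)
{
	for (; *list != NULL; list = &(*list)->next) {
		if (*list == t) {
			*list = t->next;
			t->next = NULL;
			return;
		}
	}
}

/* Run all triggers, e.g. on every status update coming from relay. */
static void
trigger_sketch_run(struct trigger_sketch *list, void *event)
{
	while (list != NULL) {
		struct trigger_sketch *next = list->next;
		list->run(list, event);
		list = next;
	}
}

/* The watcher: a trigger plus the condition it is waiting for. */
struct lsn_watcher_sketch {
	struct trigger_sketch base;
	uint32_t replica_id;
	int64_t target_lsn;
	bool is_reached;
};

/*
 * Trigger callback: only records that the target is reached.
 * It does not destroy the watcher; the owner decides what to do next.
 */
static void
lsn_watcher_sketch_on_ack(struct trigger_sketch *trigger, void *event)
{
	struct lsn_watcher_sketch *w = trigger->data;
	const struct vclock_sketch *acked = event;
	assert(w->replica_id < VCLOCK_MAX_SKETCH);
	if (acked->lsn[w->replica_id] >= w->target_lsn)
		w->is_reached = true;
}

static void
lsn_watcher_sketch_create(struct lsn_watcher_sketch *w, uint32_t replica_id,
			  int64_t target_lsn)
{
	w->base.run = lsn_watcher_sketch_on_ack;
	w->base.data = w;
	w->base.next = NULL;
	w->replica_id = replica_id;
	w->target_lsn = target_lsn;
	w->is_reached = false;
}
```

The owner would add &w.base to the relay's list with trigger_sketch_add(), wait until is_reached is set, and then either call trigger_sketch_clear() or bump target_lsn and keep waiting.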
>
>>> +
>>> /**
>>> * The message which updated tx thread with a new vclock has returned back
>>> * to the relay.
>>> diff --git a/src/box/relay.h b/src/box/relay.h
>>> index b32e2ea2a..da2e3498a 100644
>>> --- a/src/box/relay.h
>>> +++ b/src/box/relay.h
>>> @@ -134,4 +135,47 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>>> struct vclock *replica_vclock, uint32_t replica_version_id,
>>> uint32_t replica_id_filter);
>>>
>>> +/**
>>> + * A callback invoked once lsn watcher sees the replica has reached the target
>>> + * lsn for the given replica id.
>>> + */
>>> +typedef void (*relay_lsn_watcher_f)(void *data);
>>> +
>>> +/**
>>> + * A relay watcher which notifies tx via calling a given function once the
>>> + * replica confirms it has received the rows from replica_id up to target_lsn.
>>> + */
>>> +struct relay_lsn_watcher {
>>> + /** A replica_id to monitor rows from. */
>>> + uint32_t replica_id;
>>> + /** The lsn to wait for. */
>>> + int64_t target_lsn;
>>> + /**
>>> + * A callback invoked in the tx thread once the watcher sees that
>>> + * replica has reached the target lsn.
>>> + */
>>> + relay_lsn_watcher_f notify;
>>> + /**
>>> + * A callback fired once the relay thread exits to notify the waiter
>>> + * there's nothing to wait for anymore.
>>> + */
>>> + relay_lsn_watcher_f destroy;
>>> + /** Data passed to \a notify and \a destroy. */
>> 3. Let's use @a instead, to be consistent with all the new code (except
>> cases when a whole file uses tons of \a).
>
> Ok, thanks for pointing this out!
>>> + void *data;
>>> + /** A fiber waiting for this watcher to fire. */
>>> + struct fiber *waiter;
>>> + /** A link in relay's watcher list. */
>>> + struct rlist in_list;
>>> +};
>
> --
> Serge Petrenko
--
Serge Petrenko