From: Serge Petrenko <sergepetrenko@tarantool.org> To: v.shpilevoy@tarantool.org, cyrillos@gmail.com Cc: tarantool-patches@dev.tarantool.org Subject: [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher Date: Thu, 10 Dec 2020 23:55:13 +0300 [thread overview] Message-ID: <449ae5e1cd4b4e22f55ff53a4404717c0229638a.1607633488.git.sergepetrenko@tarantool.org> (raw) In-Reply-To: <cover.1607633488.git.sergepetrenko@tarantool.org> Add a list of lsn watchers to relay. Relay_lsn_watcher is a structure that lives in tx thread and monitors replica's progress in applying rows coming from a given replica id. The watcher fires once relay reports that replica has acked the target lsn for the given replica id. The watcher is owned by some tx fiber, which typically sleeps until the watcher wakes it up. Besides waking the waiting fiber up, the watcher may perform some work as dictated by the notify function. Prerequisite #5435 --- src/box/relay.cc | 73 ++++++++++++++++++++++++++++++++++++++++++++++-- src/box/relay.h | 44 +++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 3 deletions(-) diff --git a/src/box/relay.cc b/src/box/relay.cc index f342a79dd..4014792a6 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -152,6 +152,8 @@ struct relay { * anonymous replica, for example. */ bool is_raft_enabled; + /** A list of lsn watchers registered for this relay. */ + struct rlist lsn_watchers; } tx; }; @@ -200,6 +202,7 @@ relay_new(struct replica *replica) fiber_cond_create(&relay->reader_cond); diag_create(&relay->diag); stailq_create(&relay->pending_gc); + rlist_create(&relay->tx.lsn_watchers); relay->state = RELAY_OFF; return relay; } @@ -394,6 +397,54 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, }); } +void +relay_set_lsn_watcher(struct relay *relay, struct relay_lsn_watcher *watcher) +{ + rlist_add_tail_entry(&relay->tx.lsn_watchers, watcher, in_list); +} + +void +relay_lsn_watcher_create(struct relay_lsn_watcher *watcher, uint32_t replica_id, + int64_t target_lsn, relay_lsn_watcher_f notify, + relay_lsn_watcher_f destroy, void *data) +{ + watcher->replica_id = replica_id; + watcher->target_lsn = target_lsn; + watcher->notify = notify; + watcher->destroy = destroy; + watcher->data = data; + watcher->waiter = fiber(); +} + +/** + * Destroy the watcher. + * Wake the waiting fiber up, fire the on destroy callback and remove the + * watcher from the relay's watcher list. + */ +static void +relay_lsn_watcher_destroy(struct relay_lsn_watcher *watcher) +{ + watcher->destroy(watcher->data); + fiber_wakeup(watcher->waiter); + rlist_del_entry(watcher, in_list); +} + +/** + * Notify the watcher that the replica has advanced to the given vclock. + * In case target_lsn is hit for watcher's replica_id, fire the notify + * callback and destroy the watcher. + */ +static void +relay_lsn_watcher_notify(struct relay_lsn_watcher *watcher, + struct vclock *vclock) +{ + int64_t lsn = vclock_get(vclock, watcher->replica_id); + if (lsn >= watcher->target_lsn) { + watcher->notify(watcher->data); + relay_lsn_watcher_destroy(watcher); + } +} + /** * The message which updated tx thread with a new vclock has returned back * to the relay. @@ -411,7 +462,8 @@ static void tx_status_update(struct cmsg *msg) { struct relay_status_msg *status = (struct relay_status_msg *)msg; - vclock_copy(&status->relay->tx.vclock, &status->vclock); + struct relay *relay = status->relay; + vclock_copy(&relay->tx.vclock, &status->vclock); /* * Let pending synchronous transactions know, which of * them were successfully sent to the replica. Acks are @@ -420,14 +472,18 @@ tx_status_update(struct cmsg *msg) * for master's CONFIRM message instead. */ if (txn_limbo.owner_id == instance_id) { - txn_limbo_ack(&txn_limbo, status->relay->replica->id, + txn_limbo_ack(&txn_limbo, relay->replica->id, vclock_get(&status->vclock, instance_id)); } + struct relay_lsn_watcher *watcher, *tmp; + rlist_foreach_entry_safe(watcher, &relay->tx.lsn_watchers, in_list, tmp) { + relay_lsn_watcher_notify(watcher, &status->vclock); + } static const struct cmsg_hop route[] = { {relay_status_update, NULL} }; cmsg_init(msg, route); - cpipe_push(&status->relay->relay_pipe, msg); + cpipe_push(&relay->relay_pipe, msg); } /** @@ -604,6 +660,17 @@ tx_notify_is_relay_running(struct cmsg *base) /* Never subscribe anonymous replicas to raft updates. */ if (!msg->relay->replica->anon) msg->relay->tx.is_raft_enabled = msg->is_running; + /* + * Notify everyone waiting for a specific row to be replicated through + * this relay there's nothing to wait for anymore. + */ + if (!msg->is_running) { + struct relay_lsn_watcher *watcher, *tmp; + rlist_foreach_entry_safe(watcher, &msg->relay->tx.lsn_watchers, + in_list, tmp) { + relay_lsn_watcher_destroy(watcher); + } + } } /** Relay thread part of the relay is running notification, second hop. */ diff --git a/src/box/relay.h b/src/box/relay.h index b32e2ea2a..da2e3498a 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -32,6 +32,7 @@ */ #include <stdint.h> +#include "small/rlist.h" #if defined(__cplusplus) extern "C" { @@ -134,4 +135,47 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, struct vclock *replica_vclock, uint32_t replica_version_id, uint32_t replica_id_filter); +/** + * A callback invoked once lsn watcher sees the replica has reached the target + * lsn for the given replica id. + */ +typedef void (*relay_lsn_watcher_f)(void *data); + +/** + * A relay watcher which notifies tx via calling a given function once the + * replica confirms it has received the rows from replica_id up to target_lsn. + */ +struct relay_lsn_watcher { + /** A replica_id to monitor rows from. */ + uint32_t replica_id; + /** The lsn to wait for. */ + int64_t target_lsn; + /** + * A callback invoked in the tx thread once the watcher sees that + * replica has reached the target lsn. + */ + relay_lsn_watcher_f notify; + /** + * A callback fired once the relay thread exits to notify the waiter + * there's nothing to wait for anymore. + */ + relay_lsn_watcher_f destroy; + /** Data passed to \a notify and \a destroy. */ + void *data; + /** A fiber waiting for this watcher to fire. */ + struct fiber *waiter; + /** A link in relay's watcher list. */ + struct rlist in_list; +}; + +/** Initialize a pre-allocated lsn watcher. */ +void +relay_lsn_watcher_create(struct relay_lsn_watcher *watcher, uint32_t replica_id, + int64_t target_lsn, relay_lsn_watcher_f notify, + relay_lsn_watcher_f destroy, void *data); + +/** Attach the given lsn watcher to the relay. */ +void +relay_set_lsn_watcher(struct relay *relay, struct relay_lsn_watcher *watcher); + #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */ -- 2.24.3 (Apple Git-128)
next prev parent reply other threads:[~2020-12-10 20:55 UTC|newest] Thread overview: 17+ messages / expand[flat|nested] mbox.gz Atom feed top 2020-12-10 20:55 [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue commit everything Serge Petrenko 2020-12-10 20:55 ` [Tarantool-patches] [PATCH 1/4] box: add a single execution guard to clear_synchro_queue Serge Petrenko 2020-12-17 21:43 ` Vladislav Shpilevoy 2020-12-21 10:18 ` Serge Petrenko 2020-12-21 17:11 ` Vladislav Shpilevoy 2020-12-23 12:01 ` Serge Petrenko 2020-12-10 20:55 ` [Tarantool-patches] [PATCH 2/4] relay: rename is_raft_enabled message to relay_is_running Serge Petrenko 2020-12-17 21:43 ` Vladislav Shpilevoy 2020-12-23 12:01 ` Serge Petrenko 2020-12-10 20:55 ` Serge Petrenko [this message] 2020-12-17 21:43 ` [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher Vladislav Shpilevoy [not found] ` <4b7f4fc1-6d48-4332-c432-1eeb0b28c016@tarantool.org> 2020-12-23 12:03 ` Serge Petrenko 2020-12-10 20:55 ` [Tarantool-patches] [PATCH 4/4] box: rework clear_synchro_queue to commit everything Serge Petrenko 2020-12-17 21:43 ` Vladislav Shpilevoy 2020-12-23 12:04 ` Serge Petrenko 2020-12-11 7:15 ` [Tarantool-patches] [PATCH 0/4] make clear_synchro_queue " Serge Petrenko 2020-12-11 9:19 ` Serge Petrenko
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=449ae5e1cd4b4e22f55ff53a4404717c0229638a.1607633488.git.sergepetrenko@tarantool.org \ --to=sergepetrenko@tarantool.org \ --cc=cyrillos@gmail.com \ --cc=tarantool-patches@dev.tarantool.org \ --cc=v.shpilevoy@tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH 3/4] relay: introduce relay_lsn_watcher' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox