gwr_engine/events/
repeated.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! An event that can be triggered multiple times. The event allows
4//! the notifier to pass a custom result to its listeners on each
5//! notification, using the `notify_result()` method. Alternatively,
//! the last set result will be provided to the listeners. If no
//! result has been set, the initial value supplied when the event
//! was constructed will be used.
9
10use std::cell::{Cell, RefCell};
11use std::pin::Pin;
12use std::rc::Rc;
13use std::task::{Context, Poll};
14
15use futures::Future;
16use futures::future::FusedFuture;
17
18use super::waiting::Waiting;
19use crate::traits::{BoxFuture, Event};
20
21pub struct RepeatedState<T>
22where
23    T: Copy,
24{
25    waiting: Waiting,
26    generation: Cell<u64>,
27    result: RefCell<T>,
28}
29
30impl<T> RepeatedState<T>
31where
32    T: Copy,
33{
34    pub fn new(value: T) -> Self {
35        Self {
36            waiting: Waiting::new(),
37            generation: Cell::new(0),
38            result: RefCell::new(value),
39        }
40    }
41}
42
43impl Default for RepeatedState<()> {
44    fn default() -> Self {
45        Self::new(())
46    }
47}
48
49#[derive(Clone)]
50pub struct Repeated<T>
51where
52    T: Copy,
53{
54    state: Rc<RepeatedState<T>>,
55}
56
57pub struct RepeatedFuture<T>
58where
59    T: Copy,
60{
61    state: Rc<RepeatedState<T>>,
62    done: bool,
63    listener_id: Option<u64>,
64    observed_generation: u64,
65}
66
67impl<T> FusedFuture for RepeatedFuture<T>
68where
69    T: Copy,
70{
71    fn is_terminated(&self) -> bool {
72        self.done
73    }
74}
75
76impl<T> Repeated<T>
77where
78    T: Copy,
79{
80    pub fn with_value(value: T) -> Self {
81        Self {
82            state: Rc::new(RepeatedState::new(value)),
83        }
84    }
85
86    pub fn notify(&self) {
87        self.state.generation.set(self.state.generation.get() + 1);
88        self.state.waiting.wake_all();
89    }
90
91    pub fn notify_result(&self, result: T) {
92        *self.state.result.borrow_mut() = result;
93        self.state.generation.set(self.state.generation.get() + 1);
94        self.state.waiting.wake_all();
95    }
96}
97
98impl<T> Repeated<T>
99where
100    T: Copy,
101{
102    pub fn new(value: T) -> Self {
103        Self {
104            state: Rc::new(RepeatedState::new(value)),
105        }
106    }
107}
108
109impl Default for Repeated<()> {
110    fn default() -> Self {
111        Self::new(())
112    }
113}
114
115impl<T> Event<T> for Repeated<T>
116where
117    T: Copy + 'static,
118{
119    fn listen(&self) -> BoxFuture<'static, T> {
120        Box::pin(RepeatedFuture {
121            state: self.state.clone(),
122            done: false,
123            listener_id: None,
124            observed_generation: self.state.generation.get(),
125        })
126    }
127
128    /// Allow cloning of Boxed elements of vector
129    fn clone_dyn(&self) -> Box<dyn Event<T>> {
130        Box::new(self.clone())
131    }
132}
133
134impl<T> Future for RepeatedFuture<T>
135where
136    T: Copy,
137{
138    type Output = T;
139
140    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
141        if self.state.generation.get() > self.observed_generation {
142            self.done = true;
143            self.listener_id = None;
144            Poll::Ready(*self.state.result.borrow())
145        } else {
146            if let Some(listener_id) = self.listener_id.take() {
147                self.state.waiting.remove_listener(listener_id);
148            }
149            self.listener_id = Some(self.state.waiting.register_listener(cx.waker().clone()));
150            Poll::Pending
151        }
152    }
153}
154
155impl<T> Drop for RepeatedFuture<T>
156where
157    T: Copy,
158{
159    fn drop(&mut self) {
160        if !self.done
161            && let Some(listener_id) = self.listener_id.take()
162        {
163            self.state.waiting.remove_listener(listener_id);
164        }
165    }
166}