gwr_engine/events/
once.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! An event that can only be triggered once
4
5use std::cell::RefCell;
6use std::pin::Pin;
7use std::rc::Rc;
8use std::task::{Context, Poll};
9
10use futures::Future;
11use futures::future::FusedFuture;
12
13use super::waiting::Waiting;
14use crate::sim_error;
15use crate::traits::{BoxFuture, Event};
16use crate::types::SimResult;
17
18pub struct OnceState<T>
19where
20    T: Copy,
21{
22    waiting: Waiting,
23    triggered: RefCell<bool>,
24    result: T,
25}
26
27impl<T> OnceState<T>
28where
29    T: Copy,
30{
31    pub fn new(value: T) -> Self {
32        Self {
33            waiting: Waiting::new(),
34            triggered: RefCell::new(false),
35            result: value,
36        }
37    }
38}
39
40impl Default for OnceState<()> {
41    fn default() -> Self {
42        Self::new(())
43    }
44}
45
46#[derive(Clone)]
47pub struct Once<T>
48where
49    T: Copy,
50{
51    state: Rc<OnceState<T>>,
52}
53
54pub struct OnceFuture<T>
55where
56    T: Copy,
57{
58    state: Rc<OnceState<T>>,
59    done: bool,
60    listener_id: Option<u64>,
61}
62
63impl<T> FusedFuture for OnceFuture<T>
64where
65    T: Copy,
66{
67    fn is_terminated(&self) -> bool {
68        self.done
69    }
70}
71
72impl<T> Once<T>
73where
74    T: Copy,
75{
76    pub fn with_value(value: T) -> Self {
77        Self {
78            state: Rc::new(OnceState::new(value)),
79        }
80    }
81
82    pub fn notify(&self) -> SimResult {
83        if *self.state.triggered.borrow() {
84            return sim_error!("once event already triggered");
85        }
86        *self.state.triggered.borrow_mut() = true;
87        self.state.waiting.wake_all();
88        Ok(())
89    }
90}
91
92impl<T> Once<T>
93where
94    T: Copy + 'static,
95{
96    pub fn new(value: T) -> Self {
97        Self {
98            state: Rc::new(OnceState::new(value)),
99        }
100    }
101}
102
103impl Default for Once<()> {
104    fn default() -> Self {
105        Self::new(())
106    }
107}
108
109impl<T> Event<T> for Once<T>
110where
111    T: Copy + 'static,
112{
113    fn listen(&self) -> BoxFuture<'static, T> {
114        Box::pin(OnceFuture {
115            state: self.state.clone(),
116            done: false,
117            listener_id: None,
118        })
119    }
120
121    /// Allow cloning of Boxed elements of vector
122    fn clone_dyn(&self) -> Box<dyn Event<T>> {
123        Box::new(self.clone())
124    }
125}
126
127impl<T> Future for OnceFuture<T>
128where
129    T: Copy,
130{
131    type Output = T;
132
133    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
134        if *self.state.triggered.borrow() {
135            self.done = true;
136            self.listener_id = None;
137            Poll::Ready(self.state.result)
138        } else {
139            if let Some(listener_id) = self.listener_id.take() {
140                self.state.waiting.remove_listener(listener_id);
141            }
142            self.listener_id = Some(self.state.waiting.register_listener(cx.waker().clone()));
143            Poll::Pending
144        }
145    }
146}
147
148impl<T> Drop for OnceFuture<T>
149where
150    T: Copy,
151{
152    fn drop(&mut self) {
153        if !self.done
154            && let Some(listener_id) = self.listener_id.take()
155        {
156            self.state.waiting.remove_listener(listener_id);
157        }
158    }
159}