gwr_engine/events/
any_of.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! An event that is triggered when one event in the provided set is triggered.
4//!
5//! The [AnyOf] will also return the data associated with the event that fired.
6//!
7//! # Example
8//!
9//! Here is a basic example of creating a custom enum and using it to handle
10//! different events in different ways.
11//!
12//! ```rust
13//! # use gwr_engine::engine::Engine;
14//! # use gwr_engine::events::once::Once;
15//! # use gwr_engine::events::any_of::AnyOf;
16//! # use gwr_engine::traits::Event;
17//! #
18//! // Create the `enum` that defines all values returned by the event. It must
19//! // implement `Clone` and `Copy`
20//! #[derive(Clone, Copy)]
21//! enum EventResult {
22//!     TimedOut,
23//!     AllOk,
24//!     // ... other results
25//! }
26//! #
27//! # let mut engine = Engine::default();
28//! #
29//! // Create the events
30//! let timeout = Once::new(EventResult::TimedOut);
31//! let ok = Once::new(EventResult::AllOk);
32//! let anyof = AnyOf::new(vec![Box::new(timeout.clone()), Box::new(ok.clone())]);
33//!
34//! // Spawn a task that will trigger the timeout
35//! # let clock = engine.default_clock();
36//! engine.spawn(async move {
37//!     clock.wait_ticks(10000).await;
38//!     timeout.notify();
39//!     Ok(())
40//! });
41//!
42//! // Spawn a task that will say all is ok
43//! # let clock = engine.default_clock();
44//! engine.spawn(async move {
45//!     clock.wait_ticks(1000).await;
46//!     ok.notify();
47//!     Ok(())
48//! });
49//!
50//! // Handle the events
51//! engine.spawn(async move {
52//!     match anyof.listen().await {
53//!         EventResult::TimedOut => panic!("Timed out"),
54//!         EventResult::AllOk => println!("All Ok"),
55//!         // ...
56//!     }
57//!     Ok(())
58//! });
59//!
60//! # engine.run().unwrap();
61//! ```
62
63use std::cell::RefCell;
64use std::future::Future;
65use std::pin::Pin;
66use std::rc::Rc;
67use std::task::{Context, Poll};
68
69use futures::StreamExt;
70use futures::future::{FusedFuture, LocalBoxFuture};
71use futures::stream::FuturesUnordered;
72
73use crate::traits::{BoxFuture, Event};
74use crate::types::Eventable;
75
76pub struct AnyOfState<T> {
77    any_of: RefCell<Vec<Eventable<T>>>,
78}
79
80impl<T> AnyOfState<T> {
81    #[must_use]
82    pub fn new(any_of: Vec<Eventable<T>>) -> Self {
83        Self {
84            any_of: RefCell::new(any_of),
85        }
86    }
87}
88
89pub struct AnyOf<T> {
90    state: Rc<AnyOfState<T>>,
91}
92
93impl<T> AnyOf<T> {
94    #[must_use]
95    pub fn new(any_of: Vec<Eventable<T>>) -> Self {
96        Self {
97            state: Rc::new(AnyOfState::new(any_of)),
98        }
99    }
100}
101
102impl<T> Clone for AnyOf<T> {
103    fn clone(&self) -> Self {
104        let any_of = self.state.any_of.borrow();
105        let cloned = any_of.to_vec();
106        Self {
107            state: Rc::new(AnyOfState::new(cloned)),
108        }
109    }
110}
111
112impl<T> Event<T> for AnyOf<T>
113where
114    T: 'static,
115{
116    fn listen(&self) -> BoxFuture<'static, T> {
117        Box::pin(AnyOfFuture {
118            state: self.state.clone(),
119            future: None,
120            done: false,
121        })
122    }
123
124    /// Allow cloning of Boxed elements of vector
125    fn clone_dyn(&self) -> Box<dyn Event<T>> {
126        Box::new(self.clone())
127    }
128}
129
130pub struct AnyOfFuture<T> {
131    state: Rc<AnyOfState<T>>,
132    future: Option<LocalBoxFuture<'static, T>>,
133    done: bool,
134}
135
136impl<T> FusedFuture for AnyOfFuture<T>
137where
138    T: 'static,
139{
140    fn is_terminated(&self) -> bool {
141        self.done
142    }
143}
144
145impl<T> Future for AnyOfFuture<T>
146where
147    T: 'static,
148{
149    type Output = T;
150
151    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
152        if self.future.is_none() {
153            let events = self.state.any_of.borrow_mut().drain(..).collect();
154            let future = any_of(events);
155            let pinned_future = Box::pin(future);
156            self.future = Some(pinned_future);
157        }
158
159        let future = self.future.as_mut().unwrap();
160        match future.as_mut().poll(cx) {
161            Poll::Ready(value) => {
162                self.done = true;
163                Poll::Ready(value)
164            }
165            Poll::Pending => Poll::Pending,
166        }
167    }
168}
169
170async fn any_of<T>(events: Vec<Box<dyn Event<T>>>) -> T {
171    let mut futures = FuturesUnordered::new();
172    for e in &events {
173        futures.push(e.listen());
174    }
175
176    futures.next().await.unwrap()
177}