gwr_engine/events/
all_of.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! An event that is triggered when all events in the provided set are
4//! triggered.
5
6use std::cell::RefCell;
7use std::future::Future;
8use std::pin::Pin;
9use std::rc::Rc;
10use std::task::{Context, Poll};
11
12use futures::StreamExt;
13use futures::future::{FusedFuture, LocalBoxFuture};
14use futures::stream::FuturesUnordered;
15
16use crate::traits::{BoxFuture, Event};
17use crate::types::Eventable;
18
19pub struct AllOfState<T> {
20    all_of: RefCell<Vec<Eventable<T>>>,
21}
22
23impl<T> AllOfState<T> {
24    #[must_use]
25    pub fn new(all_of: Vec<Eventable<T>>) -> Self {
26        Self {
27            all_of: RefCell::new(all_of),
28        }
29    }
30}
31
32pub struct AllOf<T> {
33    state: Rc<AllOfState<T>>,
34}
35
36impl<T> AllOf<T> {
37    #[must_use]
38    pub fn new(all_of: Vec<Eventable<T>>) -> Self {
39        Self {
40            state: Rc::new(AllOfState::new(all_of)),
41        }
42    }
43}
44
45impl<T> Clone for AllOf<T> {
46    fn clone(&self) -> Self {
47        let all_of = self.state.all_of.borrow();
48        let cloned = all_of.to_vec();
49        Self {
50            state: Rc::new(AllOfState::new(cloned)),
51        }
52    }
53}
54
55impl<T> Event<T> for AllOf<T>
56where
57    T: Default + 'static,
58{
59    fn listen(&self) -> BoxFuture<'static, T> {
60        Box::pin(AllOfFuture {
61            state: self.state.clone(),
62            future: None,
63            done: false,
64        })
65    }
66
67    /// Allow cloning of Boxed elements of vector
68    fn clone_dyn(&self) -> Box<dyn Event<T>> {
69        Box::new(self.clone())
70    }
71}
72
73pub struct AllOfFuture<T>
74where
75    T: Default,
76{
77    state: Rc<AllOfState<T>>,
78    future: Option<LocalBoxFuture<'static, ()>>,
79    done: bool,
80}
81
82impl<T> FusedFuture for AllOfFuture<T>
83where
84    T: Default + 'static,
85{
86    fn is_terminated(&self) -> bool {
87        self.done
88    }
89}
90
91impl<T> Future for AllOfFuture<T>
92where
93    T: Default + 'static,
94{
95    type Output = T;
96
97    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
98        if self.future.is_none() {
99            let events = self.state.all_of.borrow_mut().drain(..).collect();
100            let future = all_of(events);
101            let pinned_future = Box::pin(future);
102            self.future = Some(pinned_future);
103        }
104
105        let future = self.future.as_mut().unwrap();
106        match future.as_mut().poll(cx) {
107            Poll::Ready(_) => {
108                self.done = true;
109                Poll::Ready(T::default())
110            }
111            Poll::Pending => Poll::Pending,
112        }
113    }
114}
115
116async fn all_of<T: Default>(events: Vec<Box<dyn Event<T>>>) {
117    let mut futures = FuturesUnordered::new();
118    for e in &events {
119        futures.push(e.listen());
120    }
121
122    while (futures.next().await).is_some() {
123        // Keep waiting till all are done
124    }
125}