gwr_engine/events/
all_of.rs1use std::cell::RefCell;
7use std::future::Future;
8use std::pin::Pin;
9use std::rc::Rc;
10use std::task::{Context, Poll};
11
12use futures::StreamExt;
13use futures::future::{FusedFuture, LocalBoxFuture};
14use futures::stream::FuturesUnordered;
15
16use crate::traits::{BoxFuture, Event};
17use crate::types::Eventable;
18
19pub struct AllOfState<T> {
20 all_of: RefCell<Vec<Eventable<T>>>,
21}
22
23impl<T> AllOfState<T> {
24 #[must_use]
25 pub fn new(all_of: Vec<Eventable<T>>) -> Self {
26 Self {
27 all_of: RefCell::new(all_of),
28 }
29 }
30}
31
32pub struct AllOf<T> {
33 state: Rc<AllOfState<T>>,
34}
35
36impl<T> AllOf<T> {
37 #[must_use]
38 pub fn new(all_of: Vec<Eventable<T>>) -> Self {
39 Self {
40 state: Rc::new(AllOfState::new(all_of)),
41 }
42 }
43}
44
45impl<T> Clone for AllOf<T> {
46 fn clone(&self) -> Self {
47 let all_of = self.state.all_of.borrow();
48 let cloned = all_of.to_vec();
49 Self {
50 state: Rc::new(AllOfState::new(cloned)),
51 }
52 }
53}
54
55impl<T> Event<T> for AllOf<T>
56where
57 T: Default + 'static,
58{
59 fn listen(&self) -> BoxFuture<'static, T> {
60 Box::pin(AllOfFuture {
61 state: self.state.clone(),
62 future: None,
63 done: false,
64 })
65 }
66
67 fn clone_dyn(&self) -> Box<dyn Event<T>> {
69 Box::new(self.clone())
70 }
71}
72
73pub struct AllOfFuture<T>
74where
75 T: Default,
76{
77 state: Rc<AllOfState<T>>,
78 future: Option<LocalBoxFuture<'static, ()>>,
79 done: bool,
80}
81
82impl<T> FusedFuture for AllOfFuture<T>
83where
84 T: Default + 'static,
85{
86 fn is_terminated(&self) -> bool {
87 self.done
88 }
89}
90
91impl<T> Future for AllOfFuture<T>
92where
93 T: Default + 'static,
94{
95 type Output = T;
96
97 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
98 if self.future.is_none() {
99 let events = self.state.all_of.borrow_mut().drain(..).collect();
100 let future = all_of(events);
101 let pinned_future = Box::pin(future);
102 self.future = Some(pinned_future);
103 }
104
105 let future = self.future.as_mut().unwrap();
106 match future.as_mut().poll(cx) {
107 Poll::Ready(_) => {
108 self.done = true;
109 Poll::Ready(T::default())
110 }
111 Poll::Pending => Poll::Pending,
112 }
113 }
114}
115
116async fn all_of<T: Default>(events: Vec<Box<dyn Event<T>>>) {
117 let mut futures = FuturesUnordered::new();
118 for e in &events {
119 futures.push(e.listen());
120 }
121
122 while (futures.next().await).is_some() {
123 }
125}