gwr_engine/
port.rs

1// Copyright (c) 2024 Graphcore Ltd. All rights reserved.
2
3//! Port
4
5use std::cell::RefCell;
6use std::fmt;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::task::{Context, Poll, Waker};
10
11use futures::Future;
12use futures::future::FusedFuture;
13use gwr_track::connect;
14use gwr_track::entity::Entity;
15
16use crate::sim_error;
17use crate::traits::SimObject;
18use crate::types::{SimError, SimResult};
19
20pub type PortStateResult<T> = Result<Rc<PortState<T>>, SimError>;
21pub type PortGetResult<T> = Result<PortGet<T>, SimError>;
22pub type PortStartGetResult<T> = Result<PortStartGet<T>, SimError>;
23pub type PortPutResult<T> = Result<PortPut<T>, SimError>;
24pub type PortTryPutResult<T> = Result<PortTryPut<T>, SimError>;
25
26pub struct PortState<T>
27where
28    T: SimObject,
29{
30    value: RefCell<Option<T>>,
31    waiting_get: RefCell<Option<Waker>>,
32    waiting_put: RefCell<Option<Waker>>,
33    pub in_port_entity: Rc<Entity>,
34}
35
36impl<T> PortState<T>
37where
38    T: SimObject,
39{
40    fn new(in_port_entity: Rc<Entity>) -> Self {
41        Self {
42            value: RefCell::new(None),
43            waiting_get: RefCell::new(None),
44            waiting_put: RefCell::new(None),
45            in_port_entity,
46        }
47    }
48}
49
50pub struct InPort<T>
51where
52    T: SimObject,
53{
54    pub entity: Rc<Entity>,
55    state: Rc<PortState<T>>,
56    connected: RefCell<bool>,
57}
58
59impl<T> fmt::Display for InPort<T>
60where
61    T: SimObject,
62{
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        self.entity.fmt(f)
65    }
66}
67
68impl<T> InPort<T>
69where
70    T: SimObject,
71{
72    #[must_use]
73    pub fn new(parent: &Rc<Entity>, name: &str) -> Self {
74        let entity = Rc::new(Entity::new(parent, name));
75        Self {
76            entity: entity.clone(),
77            state: Rc::new(PortState::new(entity)),
78            connected: RefCell::new(false),
79        }
80    }
81
82    pub fn state(&self) -> PortStateResult<T> {
83        if *self.connected.borrow() {
84            return sim_error!(format!("{self} already connected"));
85        }
86
87        *self.connected.borrow_mut() = true;
88        Ok(self.state.clone())
89    }
90
91    #[must_use = "Futures do nothing unless you `.await` or otherwise use them"]
92    pub fn get(&self) -> PortGetResult<T> {
93        if !*self.connected.borrow() {
94            return sim_error!(format!("{self} not connected"));
95        }
96
97        Ok(PortGet {
98            state: self.state.clone(),
99            done: false,
100        })
101    }
102
103    /// Must be matched with a `finish_get` to allow the OutPort to continue.
104    #[must_use = "Futures do nothing unless you `.await` or otherwise use them"]
105    pub fn start_get(&self) -> PortStartGetResult<T> {
106        if !*self.connected.borrow() {
107            return sim_error!(format!("{self} not connected"));
108        }
109
110        Ok(PortStartGet {
111            state: self.state.clone(),
112            done: false,
113        })
114    }
115
116    /// Must be matched with a `start_get ` to consume the value.
117    pub fn finish_get(&self) {
118        if let Some(waker) = self.state.waiting_put.borrow_mut().take() {
119            waker.wake();
120        }
121    }
122}
123
124pub struct OutPort<T>
125where
126    T: SimObject,
127{
128    pub entity: Rc<Entity>,
129    state: Option<Rc<PortState<T>>>,
130}
131
132impl<T> fmt::Display for OutPort<T>
133where
134    T: SimObject,
135{
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        self.entity.fmt(f)
138    }
139}
140
141impl<T> OutPort<T>
142where
143    T: SimObject,
144{
145    #[must_use]
146    pub fn new(parent: &Rc<Entity>, name: &str) -> Self {
147        let entity = Rc::new(Entity::new(parent, name));
148        Self {
149            entity,
150            state: None,
151        }
152    }
153
154    pub fn connect(&mut self, port_state: PortStateResult<T>) -> SimResult {
155        let port_state = port_state?;
156
157        connect!(self.entity ; port_state.in_port_entity);
158        match self.state {
159            Some(_) => {
160                return sim_error!(format!("{self} already connected"));
161            }
162            None => {
163                self.state = Some(port_state);
164            }
165        }
166        Ok(())
167    }
168
169    #[must_use = "Futures do nothing unless you `.await` or otherwise use them"]
170    pub fn put(&self, value: T) -> PortPutResult<T> {
171        let state = match self.state.as_ref() {
172            Some(s) => s.clone(),
173            None => return sim_error!(format!("{self} not connected")),
174        };
175        Ok(PortPut {
176            state,
177            value: RefCell::new(Some(value)),
178            done: RefCell::new(false),
179        })
180    }
181
182    #[must_use = "Futures do nothing unless you `.await` or otherwise use them"]
183    pub fn try_put(&self) -> PortTryPutResult<T> {
184        let state = match self.state.as_ref() {
185            Some(s) => s.clone(),
186            None => return sim_error!(format!("{self} not connected")),
187        };
188        Ok(PortTryPut { state, done: false })
189    }
190}
191
192pub struct PortPut<T>
193where
194    T: SimObject,
195{
196    state: Rc<PortState<T>>,
197    value: RefCell<Option<T>>,
198    done: RefCell<bool>,
199}
200
201impl<T> Future for PortPut<T>
202where
203    T: SimObject,
204{
205    type Output = ();
206
207    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
208        match self.value.take() {
209            Some(value) => {
210                // The state is designed to be shared between one put/get pair so it should
211                // not be possible for the value in the state to be set at this point.
212                assert!(self.state.value.borrow().is_none());
213
214                *self.state.value.borrow_mut() = Some(value);
215                if let Some(waker) = self.state.waiting_get.borrow_mut().take() {
216                    waker.wake();
217                }
218                *self.state.waiting_put.borrow_mut() = Some(cx.waker().clone());
219                Poll::Pending
220            }
221            None => {
222                // Value already sent, woken because it has been consumed
223                *self.done.borrow_mut() = true;
224                Poll::Ready(())
225            }
226        }
227    }
228}
229
230impl<T> FusedFuture for PortPut<T>
231where
232    T: SimObject,
233{
234    fn is_terminated(&self) -> bool {
235        *self.done.borrow()
236    }
237}
238
239pub struct PortTryPut<T>
240where
241    T: SimObject,
242{
243    state: Rc<PortState<T>>,
244    done: bool,
245}
246
247impl<T> Future for PortTryPut<T>
248where
249    T: SimObject,
250{
251    type Output = ();
252
253    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
254        if self.state.waiting_get.borrow().is_some() {
255            self.done = true;
256            Poll::Ready(())
257        } else {
258            *self.state.waiting_put.borrow_mut() = Some(cx.waker().clone());
259            Poll::Pending
260        }
261    }
262}
263
264impl<T> FusedFuture for PortTryPut<T>
265where
266    T: SimObject,
267{
268    fn is_terminated(&self) -> bool {
269        self.done
270    }
271}
272
273pub struct PortGet<T>
274where
275    T: SimObject,
276{
277    state: Rc<PortState<T>>,
278    done: bool,
279}
280
281impl<T> Future for PortGet<T>
282where
283    T: SimObject,
284{
285    type Output = T;
286
287    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
288        let value = self.state.value.borrow_mut().take();
289        if let Some(value) = value {
290            self.done = true;
291
292            if let Some(waker) = self.state.waiting_put.borrow_mut().take() {
293                waker.wake();
294            }
295            Poll::Ready(value)
296        } else {
297            if let Some(waker) = self.state.waiting_put.borrow_mut().take() {
298                waker.wake();
299            }
300
301            *self.state.waiting_get.borrow_mut() = Some(cx.waker().clone());
302            Poll::Pending
303        }
304    }
305}
306
307impl<T> FusedFuture for PortGet<T>
308where
309    T: SimObject,
310{
311    fn is_terminated(&self) -> bool {
312        self.done
313    }
314}
315
316pub struct PortStartGet<T>
317where
318    T: SimObject,
319{
320    state: Rc<PortState<T>>,
321    done: bool,
322}
323
324impl<T> Future for PortStartGet<T>
325where
326    T: SimObject,
327{
328    type Output = T;
329
330    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
331        let value = self.state.value.borrow_mut().take();
332        if let Some(value) = value {
333            self.done = true;
334            Poll::Ready(value)
335        } else {
336            *self.state.waiting_get.borrow_mut() = Some(cx.waker().clone());
337            Poll::Pending
338        }
339    }
340}
341
342impl<T> FusedFuture for PortStartGet<T>
343where
344    T: SimObject,
345{
346    fn is_terminated(&self) -> bool {
347        self.done
348    }
349}