gwr_engine/port/
mod.rs

1// Copyright (c) 2024 Graphcore Ltd. All rights reserved.
2
3//! Port
4
5use std::cell::RefCell;
6use std::fmt;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::task::{Context, Poll, Waker};
10
11use futures::Future;
12use futures::future::FusedFuture;
13use gwr_track::connect;
14use gwr_track::entity::{Entity, GetEntity};
15use gwr_track::tracker::aka::Aka;
16
17use crate::engine::Engine;
18use crate::port::monitor::Monitor;
19use crate::sim_error;
20use crate::time::clock::Clock;
21use crate::traits::SimObject;
22use crate::types::{SimError, SimResult};
23
24pub mod monitor;
25
26pub type PortStateResult<T> = Result<Rc<PortState<T>>, SimError>;
27pub type PortGetResult<T> = Result<PortGet<T>, SimError>;
28pub type PortStartGetResult<T> = Result<PortStartGet<T>, SimError>;
29pub type PortPutResult<T> = Result<PortPut<T>, SimError>;
30pub type PortTryPutResult<T> = Result<PortTryPut<T>, SimError>;
31
32pub struct PortState<T>
33where
34    T: SimObject,
35{
36    value: RefCell<Option<T>>,
37    waiting_get: RefCell<Option<Waker>>,
38    waiting_put: RefCell<Option<Waker>>,
39    pub in_port_entity: Rc<Entity>,
40    monitor: Option<Rc<Monitor>>,
41}
42
43impl<T> PortState<T>
44where
45    T: SimObject,
46{
47    fn new(
48        engine: &Engine,
49        clock: &Clock,
50        in_port_entity: Rc<Entity>,
51        window_size_ticks: Option<u64>,
52    ) -> Self {
53        let monitor = window_size_ticks.map(|window_size_ticks| {
54            Monitor::new_and_register(engine, &in_port_entity, clock, window_size_ticks)
55        });
56        Self {
57            value: RefCell::new(None),
58            waiting_get: RefCell::new(None),
59            waiting_put: RefCell::new(None),
60            in_port_entity,
61            monitor,
62        }
63    }
64}
65
66pub struct InPort<T>
67where
68    T: SimObject,
69{
70    entity: Rc<Entity>,
71    state: Rc<PortState<T>>,
72    connected: RefCell<bool>,
73}
74
75impl<T> fmt::Display for InPort<T>
76where
77    T: SimObject,
78{
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        self.entity.fmt(f)
81    }
82}
83
84impl<T> InPort<T>
85where
86    T: SimObject,
87{
88    #[must_use]
89    pub fn new(engine: &Engine, clock: &Clock, parent: &Rc<Entity>, name: &str) -> Self {
90        Self::new_with_renames(engine, clock, parent, name, None)
91    }
92
93    #[must_use]
94    pub fn new_with_renames(
95        engine: &Engine,
96        clock: &Clock,
97        parent: &Rc<Entity>,
98        name: &str,
99        aka: Option<&Aka>,
100    ) -> Self {
101        let entity = Rc::new(Entity::new_with_renames(parent, name, aka));
102        let monitor_window_size = entity.tracker.monitoring_window_size_for(entity.id);
103        Self {
104            entity: entity.clone(),
105            state: Rc::new(PortState::new(engine, clock, entity, monitor_window_size)),
106            connected: RefCell::new(false),
107        }
108    }
109
110    pub fn state(&self) -> PortStateResult<T> {
111        if *self.connected.borrow() {
112            return sim_error!(format!("{self} already connected"));
113        }
114
115        *self.connected.borrow_mut() = true;
116        Ok(self.state.clone())
117    }
118
119    #[must_use = "Futures do nothing unless you `.await` or otherwise use them"]
120    pub fn get(&self) -> PortGetResult<T> {
121        if !*self.connected.borrow() {
122            return sim_error!(format!("{self} not connected"));
123        }
124
125        Ok(PortGet {
126            state: self.state.clone(),
127            done: false,
128        })
129    }
130
131    /// Must be matched with a `finish_get` to allow the OutPort to continue.
132    #[must_use = "Futures do nothing unless you `.await` or otherwise use them"]
133    pub fn start_get(&self) -> PortStartGetResult<T> {
134        if !*self.connected.borrow() {
135            return sim_error!(format!("{self} not connected"));
136        }
137
138        Ok(PortStartGet {
139            state: self.state.clone(),
140            done: false,
141        })
142    }
143
144    /// Must be matched with a `start_get ` to consume the value.
145    pub fn finish_get(&self) {
146        if let Some(waker) = self.state.waiting_put.borrow_mut().take() {
147            waker.wake();
148        }
149    }
150}
151
152pub struct OutPort<T>
153where
154    T: SimObject,
155{
156    entity: Rc<Entity>,
157    state: Option<Rc<PortState<T>>>,
158}
159
160impl<T> GetEntity for OutPort<T>
161where
162    T: SimObject,
163{
164    fn entity(&self) -> &Rc<Entity> {
165        &self.entity
166    }
167}
168
169impl<T> fmt::Display for OutPort<T>
170where
171    T: SimObject,
172{
173    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
174        self.entity.fmt(f)
175    }
176}
177
178impl<T> OutPort<T>
179where
180    T: SimObject,
181{
182    #[must_use]
183    pub fn new(parent: &Rc<Entity>, name: &str) -> Self {
184        Self::new_with_renames(parent, name, None)
185    }
186
187    #[must_use]
188    pub fn new_with_renames(parent: &Rc<Entity>, name: &str, aka: Option<&Aka>) -> Self {
189        let entity = Rc::new(Entity::new_with_renames(parent, name, aka));
190        Self {
191            entity,
192            state: None,
193        }
194    }
195
196    pub fn connect(&mut self, port_state: PortStateResult<T>) -> SimResult {
197        let port_state = port_state?;
198
199        connect!(self.entity ; port_state.in_port_entity);
200        match self.state {
201            Some(_) => {
202                return sim_error!(format!("{self} already connected"));
203            }
204            None => {
205                self.state = Some(port_state);
206            }
207        }
208        Ok(())
209    }
210
211    #[must_use = "Futures do nothing unless you `.await` or otherwise use them"]
212    pub fn put(&self, value: T) -> PortPutResult<T> {
213        let state = match self.state.as_ref() {
214            Some(s) => s.clone(),
215            None => return sim_error!(format!("{self} not connected")),
216        };
217        Ok(PortPut {
218            state,
219            value: RefCell::new(Some(value)),
220            done: RefCell::new(false),
221        })
222    }
223
224    #[must_use = "Futures do nothing unless you `.await` or otherwise use them"]
225    pub fn try_put(&self) -> PortTryPutResult<T> {
226        let state = match self.state.as_ref() {
227            Some(s) => s.clone(),
228            None => return sim_error!(format!("{self} not connected")),
229        };
230        Ok(PortTryPut { state, done: false })
231    }
232}
233
234pub struct PortPut<T>
235where
236    T: SimObject,
237{
238    state: Rc<PortState<T>>,
239    value: RefCell<Option<T>>,
240    done: RefCell<bool>,
241}
242
243impl<T> Future for PortPut<T>
244where
245    T: SimObject,
246{
247    type Output = ();
248
249    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
250        match self.value.take() {
251            Some(value) => {
252                // The state is designed to be shared between one put/get pair so it should
253                // not be possible for the value in the state to be set at this point.
254                assert!(self.state.value.borrow().is_none());
255
256                *self.state.value.borrow_mut() = Some(value);
257                if let Some(waker) = self.state.waiting_get.borrow_mut().take() {
258                    waker.wake();
259                }
260                *self.state.waiting_put.borrow_mut() = Some(cx.waker().clone());
261                Poll::Pending
262            }
263            None => {
264                // Value already sent, woken because it has been consumed
265                *self.done.borrow_mut() = true;
266                Poll::Ready(())
267            }
268        }
269    }
270}
271
272impl<T> FusedFuture for PortPut<T>
273where
274    T: SimObject,
275{
276    fn is_terminated(&self) -> bool {
277        *self.done.borrow()
278    }
279}
280
281pub struct PortTryPut<T>
282where
283    T: SimObject,
284{
285    state: Rc<PortState<T>>,
286    done: bool,
287}
288
289impl<T> Future for PortTryPut<T>
290where
291    T: SimObject,
292{
293    type Output = ();
294
295    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
296        if self.state.waiting_get.borrow().is_some() {
297            self.done = true;
298            Poll::Ready(())
299        } else {
300            *self.state.waiting_put.borrow_mut() = Some(cx.waker().clone());
301            Poll::Pending
302        }
303    }
304}
305
306impl<T> FusedFuture for PortTryPut<T>
307where
308    T: SimObject,
309{
310    fn is_terminated(&self) -> bool {
311        self.done
312    }
313}
314
315pub struct PortGet<T>
316where
317    T: SimObject,
318{
319    state: Rc<PortState<T>>,
320    done: bool,
321}
322
323impl<T> Future for PortGet<T>
324where
325    T: SimObject,
326{
327    type Output = T;
328
329    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
330        let value = self.state.value.borrow_mut().take();
331        if let Some(value) = value {
332            self.done = true;
333
334            // Track the object through the port monitor if there is one
335            if let Some(monitor) = self.state.monitor.as_ref() {
336                monitor.sample(&value);
337            }
338
339            if let Some(waker) = self.state.waiting_put.borrow_mut().take() {
340                waker.wake();
341            }
342            Poll::Ready(value)
343        } else {
344            if let Some(waker) = self.state.waiting_put.borrow_mut().take() {
345                waker.wake();
346            }
347
348            *self.state.waiting_get.borrow_mut() = Some(cx.waker().clone());
349            Poll::Pending
350        }
351    }
352}
353
354impl<T> FusedFuture for PortGet<T>
355where
356    T: SimObject,
357{
358    fn is_terminated(&self) -> bool {
359        self.done
360    }
361}
362
363pub struct PortStartGet<T>
364where
365    T: SimObject,
366{
367    state: Rc<PortState<T>>,
368    done: bool,
369}
370
371impl<T> Future for PortStartGet<T>
372where
373    T: SimObject,
374{
375    type Output = T;
376
377    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
378        let value = self.state.value.borrow_mut().take();
379        if let Some(value) = value {
380            self.done = true;
381
382            // Track the object through the port monitor if there is one
383            if let Some(monitor) = self.state.monitor.as_ref() {
384                monitor.sample(&value);
385            }
386
387            Poll::Ready(value)
388        } else {
389            *self.state.waiting_get.borrow_mut() = Some(cx.waker().clone());
390            Poll::Pending
391        }
392    }
393}
394
395impl<T> FusedFuture for PortStartGet<T>
396where
397    T: SimObject,
398{
399    fn is_terminated(&self) -> bool {
400        self.done
401    }
402}