1use std::cell::RefCell;
6use std::fmt;
7use std::pin::Pin;
8use std::rc::Rc;
9use std::task::{Context, Poll, Waker};
10
11use futures::Future;
12use futures::future::FusedFuture;
13use gwr_track::connect;
14use gwr_track::entity::{Entity, GetEntity};
15use gwr_track::tracker::aka::Aka;
16
17use crate::engine::Engine;
18use crate::port::monitor::Monitor;
19use crate::sim_error;
20use crate::time::clock::Clock;
21use crate::traits::SimObject;
22use crate::types::{SimError, SimResult};
23
24pub mod monitor;
25
/// Outcome of [`InPort::state`]: the shared [`PortState`], or a [`SimError`] if the port has
/// already handed its state out.
pub type PortStateResult<T> = Result<Rc<PortState<T>>, SimError>;
/// Outcome of [`InPort::get`]: a [`PortGet`] future, or a [`SimError`] if the port is not
/// connected.
pub type PortGetResult<T> = Result<PortGet<T>, SimError>;
/// Outcome of [`InPort::start_get`]: a [`PortStartGet`] future, or a [`SimError`] if the port
/// is not connected.
pub type PortStartGetResult<T> = Result<PortStartGet<T>, SimError>;
/// Outcome of [`OutPort::put`]: a [`PortPut`] future, or a [`SimError`] if the port is not
/// connected.
pub type PortPutResult<T> = Result<PortPut<T>, SimError>;
/// Outcome of [`OutPort::try_put`]: a [`PortTryPut`] future, or a [`SimError`] if the port is
/// not connected.
pub type PortTryPutResult<T> = Result<PortTryPut<T>, SimError>;
31
/// State shared between an [`InPort`] and the [`OutPort`] connected to it.
///
/// The [`InPort`] creates it and hands out an `Rc` to it through [`InPort::state`]; the port
/// futures in this module communicate through its fields.
pub struct PortState<T>
where
    T: SimObject,
{
    /// Value in flight: stored by [`PortPut`] and taken by [`PortGet`] / [`PortStartGet`].
    value: RefCell<Option<T>>,
    /// Waker of a receiver that polled while no value was available.
    waiting_get: RefCell<Option<Waker>>,
    /// Waker of a sender ([`PortPut`] or [`PortTryPut`]) waiting on the receiving side.
    waiting_put: RefCell<Option<Waker>>,
    /// Entity of the [`InPort`] that created this state.
    pub in_port_entity: Rc<Entity>,
    /// Monitor shown every received value; `None` when no monitoring window size was given.
    monitor: Option<Rc<Monitor>>,
}
42
43impl<T> PortState<T>
44where
45 T: SimObject,
46{
47 fn new(
48 engine: &Engine,
49 clock: &Clock,
50 in_port_entity: Rc<Entity>,
51 window_size_ticks: Option<u64>,
52 ) -> Self {
53 let monitor = window_size_ticks.map(|window_size_ticks| {
54 Monitor::new_and_register(engine, &in_port_entity, clock, window_size_ticks)
55 });
56 Self {
57 value: RefCell::new(None),
58 waiting_get: RefCell::new(None),
59 waiting_put: RefCell::new(None),
60 in_port_entity,
61 monitor,
62 }
63 }
64}
65
/// Receiving end of a port carrying values of type `T`.
pub struct InPort<T>
where
    T: SimObject,
{
    /// Entity identifying this port; also used to display it.
    entity: Rc<Entity>,
    /// State shared with whoever obtained it through [`InPort::state`].
    state: Rc<PortState<T>>,
    /// Set to `true` once [`InPort::state`] has handed the state out.
    connected: RefCell<bool>,
}
74
/// Displays the port exactly as its entity is formatted.
impl<T> fmt::Display for InPort<T>
where
    T: SimObject,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward the caller's formatter untouched to the entity.
        self.entity.fmt(f)
    }
}
83
84impl<T> InPort<T>
85where
86 T: SimObject,
87{
88 #[must_use]
89 pub fn new(engine: &Engine, clock: &Clock, parent: &Rc<Entity>, name: &str) -> Self {
90 Self::new_with_renames(engine, clock, parent, name, None)
91 }
92
93 #[must_use]
94 pub fn new_with_renames(
95 engine: &Engine,
96 clock: &Clock,
97 parent: &Rc<Entity>,
98 name: &str,
99 aka: Option<&Aka>,
100 ) -> Self {
101 let entity = Rc::new(Entity::new_with_renames(parent, name, aka));
102 let monitor_window_size = entity.tracker.monitoring_window_size_for(entity.id);
103 Self {
104 entity: entity.clone(),
105 state: Rc::new(PortState::new(engine, clock, entity, monitor_window_size)),
106 connected: RefCell::new(false),
107 }
108 }
109
110 pub fn state(&self) -> PortStateResult<T> {
111 if *self.connected.borrow() {
112 return sim_error!(format!("{self} already connected"));
113 }
114
115 *self.connected.borrow_mut() = true;
116 Ok(self.state.clone())
117 }
118
119 #[must_use = "Futures do nothing unless you `.await` or otherwise use them"]
120 pub fn get(&self) -> PortGetResult<T> {
121 if !*self.connected.borrow() {
122 return sim_error!(format!("{self} not connected"));
123 }
124
125 Ok(PortGet {
126 state: self.state.clone(),
127 done: false,
128 })
129 }
130
131 #[must_use = "Futures do nothing unless you `.await` or otherwise use them"]
133 pub fn start_get(&self) -> PortStartGetResult<T> {
134 if !*self.connected.borrow() {
135 return sim_error!(format!("{self} not connected"));
136 }
137
138 Ok(PortStartGet {
139 state: self.state.clone(),
140 done: false,
141 })
142 }
143
144 pub fn finish_get(&self) {
146 if let Some(waker) = self.state.waiting_put.borrow_mut().take() {
147 waker.wake();
148 }
149 }
150}
151
/// Sending end of a port carrying values of type `T`.
pub struct OutPort<T>
where
    T: SimObject,
{
    /// Entity identifying this port; also used to display it.
    entity: Rc<Entity>,
    /// State shared with the connected [`InPort`]; `None` until [`OutPort::connect`] succeeds.
    state: Option<Rc<PortState<T>>>,
}
159
/// Exposes the entity that was created for this output port.
impl<T> GetEntity for OutPort<T>
where
    T: SimObject,
{
    fn entity(&self) -> &Rc<Entity> {
        &self.entity
    }
}
168
/// Displays the port exactly as its entity is formatted.
impl<T> fmt::Display for OutPort<T>
where
    T: SimObject,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward the caller's formatter untouched to the entity.
        self.entity.fmt(f)
    }
}
177
178impl<T> OutPort<T>
179where
180 T: SimObject,
181{
182 #[must_use]
183 pub fn new(parent: &Rc<Entity>, name: &str) -> Self {
184 Self::new_with_renames(parent, name, None)
185 }
186
187 #[must_use]
188 pub fn new_with_renames(parent: &Rc<Entity>, name: &str, aka: Option<&Aka>) -> Self {
189 let entity = Rc::new(Entity::new_with_renames(parent, name, aka));
190 Self {
191 entity,
192 state: None,
193 }
194 }
195
196 pub fn connect(&mut self, port_state: PortStateResult<T>) -> SimResult {
197 let port_state = port_state?;
198
199 connect!(self.entity ; port_state.in_port_entity);
200 match self.state {
201 Some(_) => {
202 return sim_error!(format!("{self} already connected"));
203 }
204 None => {
205 self.state = Some(port_state);
206 }
207 }
208 Ok(())
209 }
210
211 #[must_use = "Futures do nothing unless you `.await` or otherwise use them"]
212 pub fn put(&self, value: T) -> PortPutResult<T> {
213 let state = match self.state.as_ref() {
214 Some(s) => s.clone(),
215 None => return sim_error!(format!("{self} not connected")),
216 };
217 Ok(PortPut {
218 state,
219 value: RefCell::new(Some(value)),
220 done: RefCell::new(false),
221 })
222 }
223
224 #[must_use = "Futures do nothing unless you `.await` or otherwise use them"]
225 pub fn try_put(&self) -> PortTryPutResult<T> {
226 let state = match self.state.as_ref() {
227 Some(s) => s.clone(),
228 None => return sim_error!(format!("{self} not connected")),
229 };
230 Ok(PortTryPut { state, done: false })
231 }
232}
233
/// Future returned by [`OutPort::put`].
///
/// Its first poll moves the value into the shared [`PortState`]; a later poll completes it.
pub struct PortPut<T>
where
    T: SimObject,
{
    /// Port state shared with the receiving side.
    state: Rc<PortState<T>>,
    /// Value still to be handed over; `None` once a poll has moved it into `state`.
    value: RefCell<Option<T>>,
    /// Set when the future has returned `Poll::Ready`; reported by `is_terminated`.
    done: RefCell<bool>,
}
242
243impl<T> Future for PortPut<T>
244where
245 T: SimObject,
246{
247 type Output = ();
248
249 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
250 match self.value.take() {
251 Some(value) => {
252 assert!(self.state.value.borrow().is_none());
255
256 *self.state.value.borrow_mut() = Some(value);
257 if let Some(waker) = self.state.waiting_get.borrow_mut().take() {
258 waker.wake();
259 }
260 *self.state.waiting_put.borrow_mut() = Some(cx.waker().clone());
261 Poll::Pending
262 }
263 None => {
264 *self.done.borrow_mut() = true;
266 Poll::Ready(())
267 }
268 }
269 }
270}
271
/// Lets callers check whether the put has already completed.
impl<T> FusedFuture for PortPut<T>
where
    T: SimObject,
{
    /// Returns `true` once `poll` has returned `Poll::Ready`.
    fn is_terminated(&self) -> bool {
        *self.done.borrow()
    }
}
280
/// Future returned by [`OutPort::try_put`].
///
/// It completes once a receiver's waker is registered on the port; it carries no value.
pub struct PortTryPut<T>
where
    T: SimObject,
{
    /// Port state shared with the receiving side.
    state: Rc<PortState<T>>,
    /// Set when the future has returned `Poll::Ready`; reported by `is_terminated`.
    done: bool,
}
288
289impl<T> Future for PortTryPut<T>
290where
291 T: SimObject,
292{
293 type Output = ();
294
295 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
296 if self.state.waiting_get.borrow().is_some() {
297 self.done = true;
298 Poll::Ready(())
299 } else {
300 *self.state.waiting_put.borrow_mut() = Some(cx.waker().clone());
301 Poll::Pending
302 }
303 }
304}
305
/// Lets callers check whether the try-put has already completed.
impl<T> FusedFuture for PortTryPut<T>
where
    T: SimObject,
{
    /// Returns `true` once `poll` has returned `Poll::Ready`.
    fn is_terminated(&self) -> bool {
        self.done
    }
}
314
/// Future returned by [`InPort::get`].
///
/// It resolves to the value stored on the port and wakes the parked sender when it does.
pub struct PortGet<T>
where
    T: SimObject,
{
    /// Port state shared with the sending side.
    state: Rc<PortState<T>>,
    /// Set when the future has returned `Poll::Ready`; reported by `is_terminated`.
    done: bool,
}
322
323impl<T> Future for PortGet<T>
324where
325 T: SimObject,
326{
327 type Output = T;
328
329 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
330 let value = self.state.value.borrow_mut().take();
331 if let Some(value) = value {
332 self.done = true;
333
334 if let Some(monitor) = self.state.monitor.as_ref() {
336 monitor.sample(&value);
337 }
338
339 if let Some(waker) = self.state.waiting_put.borrow_mut().take() {
340 waker.wake();
341 }
342 Poll::Ready(value)
343 } else {
344 if let Some(waker) = self.state.waiting_put.borrow_mut().take() {
345 waker.wake();
346 }
347
348 *self.state.waiting_get.borrow_mut() = Some(cx.waker().clone());
349 Poll::Pending
350 }
351 }
352}
353
/// Lets callers check whether the get has already completed.
impl<T> FusedFuture for PortGet<T>
where
    T: SimObject,
{
    /// Returns `true` once `poll` has returned `Poll::Ready`.
    fn is_terminated(&self) -> bool {
        self.done
    }
}
362
/// Future returned by [`InPort::start_get`].
///
/// It resolves to the value stored on the port without waking the sender that stored it;
/// [`InPort::finish_get`] does that wake-up.
pub struct PortStartGet<T>
where
    T: SimObject,
{
    /// Port state shared with the sending side.
    state: Rc<PortState<T>>,
    /// Set when the future has returned `Poll::Ready`; reported by `is_terminated`.
    done: bool,
}
370
371impl<T> Future for PortStartGet<T>
372where
373 T: SimObject,
374{
375 type Output = T;
376
377 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
378 let value = self.state.value.borrow_mut().take();
379 if let Some(value) = value {
380 self.done = true;
381
382 if let Some(monitor) = self.state.monitor.as_ref() {
384 monitor.sample(&value);
385 }
386
387 Poll::Ready(value)
388 } else {
389 *self.state.waiting_get.borrow_mut() = Some(cx.waker().clone());
390 Poll::Pending
391 }
392 }
393}
394
/// Lets callers check whether the start-get has already completed.
impl<T> FusedFuture for PortStartGet<T>
where
    T: SimObject,
{
    /// Returns `true` once `poll` has returned `Poll::Ready`.
    fn is_terminated(&self) -> bool {
        self.done
    }
}