1use std::cell::RefCell;
14use std::rc::Rc;
15
16use async_trait::async_trait;
17use gwr_engine::engine::Engine;
18use gwr_engine::port::{InPort, PortStateResult};
19use gwr_engine::traits::{Runnable, SimObject};
20use gwr_engine::types::{SimError, SimResult};
21use gwr_model_builder::EntityDisplay;
22use gwr_track::enter;
23use gwr_track::entity::Entity;
24
25use crate::{port_rx, take_option};
26
27#[derive(EntityDisplay)]
28pub struct Sink<T>
29where
30 T: SimObject,
31{
32 pub entity: Rc<Entity>,
33 sunk_count: RefCell<usize>,
34 rx: RefCell<Option<InPort<T>>>,
35}
36
37impl<T> Sink<T>
38where
39 T: SimObject,
40{
41 pub fn new_and_register(
42 engine: &Engine,
43 parent: &Rc<Entity>,
44 name: &str,
45 ) -> Result<Rc<Self>, SimError> {
46 let entity = Rc::new(Entity::new(parent, name));
47 let rx = InPort::new(&entity, "rx");
48 let rc_self = Rc::new(Self {
49 entity,
50 sunk_count: RefCell::new(0),
51 rx: RefCell::new(Some(rx)),
52 });
53 engine.register(rc_self.clone());
54 Ok(rc_self)
55 }
56
57 pub fn port_rx(&self) -> PortStateResult<T> {
58 port_rx!(self.rx, state)
59 }
60
61 #[must_use]
62 pub fn num_sunk(&self) -> usize {
63 *self.sunk_count.borrow()
64 }
65}
66
67#[async_trait(?Send)]
68impl<T> Runnable for Sink<T>
69where
70 T: SimObject,
71{
72 async fn run(&self) -> SimResult {
73 let rx = take_option!(self.rx);
74 loop {
75 let value = rx.get()?.await;
76 enter!(self.entity ; value.id());
77 *self.sunk_count.borrow_mut() += 1;
78 }
79 }
80}