1use std::cell::RefCell;
14use std::rc::Rc;
15
16use async_trait::async_trait;
17use gwr_engine::engine::Engine;
18use gwr_engine::port::{InPort, PortStateResult};
19use gwr_engine::time::clock::Clock;
20use gwr_engine::traits::{Runnable, SimObject};
21use gwr_engine::types::{SimError, SimResult};
22use gwr_model_builder::{EntityDisplay, EntityGet};
23use gwr_track::entity::Entity;
24use gwr_track::tracker::aka::Aka;
25
26use crate::{port_rx, take_option};
27
28#[derive(EntityGet, EntityDisplay)]
29pub struct Sink<T>
30where
31 T: SimObject,
32{
33 entity: Rc<Entity>,
34 sunk_count: RefCell<usize>,
35 rx: RefCell<Option<InPort<T>>>,
36}
37
38impl<T> Sink<T>
39where
40 T: SimObject,
41{
42 pub fn new_and_register_with_renames(
43 engine: &Engine,
44 clock: &Clock,
45 parent: &Rc<Entity>,
46 name: &str,
47 aka: Option<&Aka>,
48 ) -> Result<Rc<Self>, SimError> {
49 let entity = Rc::new(Entity::new(parent, name));
50 let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
51 let rc_self = Rc::new(Self {
52 entity,
53 sunk_count: RefCell::new(0),
54 rx: RefCell::new(Some(rx)),
55 });
56 engine.register(rc_self.clone());
57 Ok(rc_self)
58 }
59
60 pub fn new_and_register(
61 engine: &Engine,
62 clock: &Clock,
63 parent: &Rc<Entity>,
64 name: &str,
65 ) -> Result<Rc<Self>, SimError> {
66 Self::new_and_register_with_renames(engine, clock, parent, name, None)
67 }
68
69 pub fn port_rx(&self) -> PortStateResult<T> {
70 port_rx!(self.rx, state)
71 }
72
73 #[must_use]
74 pub fn num_sunk(&self) -> usize {
75 *self.sunk_count.borrow()
76 }
77}
78
79#[async_trait(?Send)]
80impl<T> Runnable for Sink<T>
81where
82 T: SimObject,
83{
84 async fn run(&self) -> SimResult {
85 let rx = take_option!(self.rx);
86 loop {
87 let value = rx.get()?.await;
88 self.entity.track_enter(value.id());
89 *self.sunk_count.borrow_mut() += 1;
90 }
91 }
92}