1use std::cell::RefCell;
14use std::rc::Rc;
15
16use async_trait::async_trait;
17use gwr_engine::engine::Engine;
18use gwr_engine::port::{InPort, PortStateResult};
19use gwr_engine::time::clock::Clock;
20use gwr_engine::traits::{Runnable, SimObject};
21use gwr_engine::types::{SimError, SimResult};
22use gwr_model_builder::{EntityDisplay, EntityGet};
23use gwr_track::enter;
24use gwr_track::entity::Entity;
25use gwr_track::tracker::aka::Aka;
26
27use crate::{port_rx, take_option};
28
29#[derive(EntityGet, EntityDisplay)]
30pub struct Sink<T>
31where
32 T: SimObject,
33{
34 entity: Rc<Entity>,
35 sunk_count: RefCell<usize>,
36 rx: RefCell<Option<InPort<T>>>,
37}
38
39impl<T> Sink<T>
40where
41 T: SimObject,
42{
43 pub fn new_and_register_with_renames(
44 engine: &Engine,
45 clock: &Clock,
46 parent: &Rc<Entity>,
47 name: &str,
48 aka: Option<&Aka>,
49 ) -> Result<Rc<Self>, SimError> {
50 let entity = Rc::new(Entity::new(parent, name));
51 let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
52 let rc_self = Rc::new(Self {
53 entity,
54 sunk_count: RefCell::new(0),
55 rx: RefCell::new(Some(rx)),
56 });
57 engine.register(rc_self.clone());
58 Ok(rc_self)
59 }
60
61 pub fn new_and_register(
62 engine: &Engine,
63 clock: &Clock,
64 parent: &Rc<Entity>,
65 name: &str,
66 ) -> Result<Rc<Self>, SimError> {
67 Self::new_and_register_with_renames(engine, clock, parent, name, None)
68 }
69
70 pub fn port_rx(&self) -> PortStateResult<T> {
71 port_rx!(self.rx, state)
72 }
73
74 #[must_use]
75 pub fn num_sunk(&self) -> usize {
76 *self.sunk_count.borrow()
77 }
78}
79
80#[async_trait(?Send)]
81impl<T> Runnable for Sink<T>
82where
83 T: SimObject,
84{
85 async fn run(&self) -> SimResult {
86 let rx = take_option!(self.rx);
87 loop {
88 let value = rx.get()?.await;
89 enter!(self.entity ; value.id());
90 *self.sunk_count.borrow_mut() += 1;
91 }
92 }
93}