gwr_components/
sink.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! A data sink.
4//!
5//! A [Sink] is an object that will accept and count all the data that
6//! is received on its input port.
7//!
8//! # Ports
9//!
10//! This component has:
11//!  - One [input port](gwr_engine::port::InPort): `rx`
12
13use std::cell::RefCell;
14use std::rc::Rc;
15
16use async_trait::async_trait;
17use gwr_engine::engine::Engine;
18use gwr_engine::port::{InPort, PortStateResult};
19use gwr_engine::traits::{Runnable, SimObject};
20use gwr_engine::types::{SimError, SimResult};
21use gwr_model_builder::EntityDisplay;
22use gwr_track::enter;
23use gwr_track::entity::Entity;
24
25use crate::{port_rx, take_option};
26
27#[derive(EntityDisplay)]
28pub struct Sink<T>
29where
30    T: SimObject,
31{
32    pub entity: Rc<Entity>,
33    sunk_count: RefCell<usize>,
34    rx: RefCell<Option<InPort<T>>>,
35}
36
37impl<T> Sink<T>
38where
39    T: SimObject,
40{
41    pub fn new_and_register(
42        engine: &Engine,
43        parent: &Rc<Entity>,
44        name: &str,
45    ) -> Result<Rc<Self>, SimError> {
46        let entity = Rc::new(Entity::new(parent, name));
47        let rx = InPort::new(&entity, "rx");
48        let rc_self = Rc::new(Self {
49            entity,
50            sunk_count: RefCell::new(0),
51            rx: RefCell::new(Some(rx)),
52        });
53        engine.register(rc_self.clone());
54        Ok(rc_self)
55    }
56
57    pub fn port_rx(&self) -> PortStateResult<T> {
58        port_rx!(self.rx, state)
59    }
60
61    #[must_use]
62    pub fn num_sunk(&self) -> usize {
63        *self.sunk_count.borrow()
64    }
65}
66
67#[async_trait(?Send)]
68impl<T> Runnable for Sink<T>
69where
70    T: SimObject,
71{
72    async fn run(&self) -> SimResult {
73        let rx = take_option!(self.rx);
74        loop {
75            let value = rx.get()?.await;
76            enter!(self.entity ; value.id());
77            *self.sunk_count.borrow_mut() += 1;
78        }
79    }
80}