gwr_components/
sink.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! A data sink.
4//!
5//! A [Sink] is an object that will accept and count all the data that
6//! is received on its input port.
7//!
8//! # Ports
9//!
10//! This component has:
11//!  - One [input port](gwr_engine::port::InPort): `rx`
12
13use std::cell::RefCell;
14use std::rc::Rc;
15
16use async_trait::async_trait;
17use gwr_engine::engine::Engine;
18use gwr_engine::port::{InPort, PortStateResult};
19use gwr_engine::time::clock::Clock;
20use gwr_engine::traits::{Runnable, SimObject};
21use gwr_engine::types::{SimError, SimResult};
22use gwr_model_builder::{EntityDisplay, EntityGet};
23use gwr_track::entity::Entity;
24use gwr_track::tracker::aka::Aka;
25
26use crate::{port_rx, take_option};
27
/// A terminal component that receives objects on its `rx` port and counts
/// them.
///
/// Instances are created through [`Sink::new_and_register`] or
/// [`Sink::new_and_register_with_renames`], which hand back an `Rc<Sink<T>>`
/// that has already been registered with the engine.
#[derive(EntityGet, EntityDisplay)]
pub struct Sink<T>
where
    T: SimObject,
{
    /// Entity for this sink, created under the parent supplied at
    /// construction; used for tracking received objects.
    entity: Rc<Entity>,
    /// Running total of objects received; starts at zero and is incremented
    /// once per received object.
    sunk_count: RefCell<usize>,
    /// The input port. Populated at construction; `run` obtains it through
    /// `take_option!` (presumably leaving `None` behind — confirm against the
    /// macro definition).
    rx: RefCell<Option<InPort<T>>>,
}
37
38impl<T> Sink<T>
39where
40    T: SimObject,
41{
42    pub fn new_and_register_with_renames(
43        engine: &Engine,
44        clock: &Clock,
45        parent: &Rc<Entity>,
46        name: &str,
47        aka: Option<&Aka>,
48    ) -> Result<Rc<Self>, SimError> {
49        let entity = Rc::new(Entity::new(parent, name));
50        let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
51        let rc_self = Rc::new(Self {
52            entity,
53            sunk_count: RefCell::new(0),
54            rx: RefCell::new(Some(rx)),
55        });
56        engine.register(rc_self.clone());
57        Ok(rc_self)
58    }
59
60    pub fn new_and_register(
61        engine: &Engine,
62        clock: &Clock,
63        parent: &Rc<Entity>,
64        name: &str,
65    ) -> Result<Rc<Self>, SimError> {
66        Self::new_and_register_with_renames(engine, clock, parent, name, None)
67    }
68
69    pub fn port_rx(&self) -> PortStateResult<T> {
70        port_rx!(self.rx, state)
71    }
72
73    #[must_use]
74    pub fn num_sunk(&self) -> usize {
75        *self.sunk_count.borrow()
76    }
77}
78
79#[async_trait(?Send)]
80impl<T> Runnable for Sink<T>
81where
82    T: SimObject,
83{
84    async fn run(&self) -> SimResult {
85        let rx = take_option!(self.rx);
86        loop {
87            let value = rx.get()?.await;
88            self.entity.track_enter(value.id());
89            *self.sunk_count.borrow_mut() += 1;
90        }
91    }
92}