gwr_components/
sink.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! A data sink.
4//!
5//! A [Sink] is an object that will accept and count all the data that
6//! is received on its input port.
7//!
8//! # Ports
9//!
10//! This component has:
11//!  - One [input port](gwr_engine::port::InPort): `rx`
12
13use std::cell::RefCell;
14use std::rc::Rc;
15
16use async_trait::async_trait;
17use gwr_engine::engine::Engine;
18use gwr_engine::port::{InPort, PortStateResult};
19use gwr_engine::time::clock::Clock;
20use gwr_engine::traits::{Runnable, SimObject};
21use gwr_engine::types::{SimError, SimResult};
22use gwr_model_builder::{EntityDisplay, EntityGet};
23use gwr_track::enter;
24use gwr_track::entity::Entity;
25use gwr_track::tracker::aka::Aka;
26
27use crate::{port_rx, take_option};
28
29#[derive(EntityGet, EntityDisplay)]
30pub struct Sink<T>
31where
32    T: SimObject,
33{
34    entity: Rc<Entity>,
35    sunk_count: RefCell<usize>,
36    rx: RefCell<Option<InPort<T>>>,
37}
38
39impl<T> Sink<T>
40where
41    T: SimObject,
42{
43    pub fn new_and_register_with_renames(
44        engine: &Engine,
45        clock: &Clock,
46        parent: &Rc<Entity>,
47        name: &str,
48        aka: Option<&Aka>,
49    ) -> Result<Rc<Self>, SimError> {
50        let entity = Rc::new(Entity::new(parent, name));
51        let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
52        let rc_self = Rc::new(Self {
53            entity,
54            sunk_count: RefCell::new(0),
55            rx: RefCell::new(Some(rx)),
56        });
57        engine.register(rc_self.clone());
58        Ok(rc_self)
59    }
60
61    pub fn new_and_register(
62        engine: &Engine,
63        clock: &Clock,
64        parent: &Rc<Entity>,
65        name: &str,
66    ) -> Result<Rc<Self>, SimError> {
67        Self::new_and_register_with_renames(engine, clock, parent, name, None)
68    }
69
70    pub fn port_rx(&self) -> PortStateResult<T> {
71        port_rx!(self.rx, state)
72    }
73
74    #[must_use]
75    pub fn num_sunk(&self) -> usize {
76        *self.sunk_count.borrow()
77    }
78}
79
80#[async_trait(?Send)]
81impl<T> Runnable for Sink<T>
82where
83    T: SimObject,
84{
85    async fn run(&self) -> SimResult {
86        let rx = take_option!(self.rx);
87        loop {
88            let value = rx.get()?.await;
89            enter!(self.entity ; value.id());
90            *self.sunk_count.borrow_mut() += 1;
91        }
92    }
93}