gwr_components/flow_controls/
limiter.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! This component can be placed between two components in order to limit the
4//! bandwidth between them.
5//!
6//! # Ports
7//!
8//! This component has the following ports:
9//!  - One [input port](gwr_engine::port::InPort): `rx`
10//!  - One [output port](gwr_engine::port::OutPort): `tx`
11
12use std::cell::RefCell;
13use std::rc::Rc;
14
15use async_trait::async_trait;
16use gwr_engine::engine::Engine;
17use gwr_engine::port::{InPort, OutPort, PortStateResult};
18use gwr_engine::time::clock::Clock;
19use gwr_engine::traits::{Runnable, SimObject};
20use gwr_engine::types::{SimError, SimResult};
21use gwr_model_builder::{EntityDisplay, EntityGet};
22use gwr_track::entity::Entity;
23use gwr_track::tracker::aka::Aka;
24use gwr_track::{enter, exit};
25
26use super::rate_limiter::RateLimiter;
27use crate::{connect_tx, port_rx, take_option};
28
29/// The [`Limiter`] is a component that will allow data through at a
30/// specified rate.
31///
32/// The rate is defined in bits-per-second.
33#[derive(EntityGet, EntityDisplay)]
34pub struct Limiter<T>
35where
36    T: SimObject,
37{
38    entity: Rc<Entity>,
39    limiter: Rc<RateLimiter<T>>,
40    tx: RefCell<Option<OutPort<T>>>,
41    rx: RefCell<Option<InPort<T>>>,
42}
43
44impl<T> Limiter<T>
45where
46    T: SimObject,
47{
48    pub fn new_and_register_with_renames(
49        engine: &Engine,
50        clock: &Clock,
51        parent: &Rc<Entity>,
52        name: &str,
53        aka: Option<&Aka>,
54        limiter: Rc<RateLimiter<T>>,
55    ) -> Result<Rc<Self>, SimError> {
56        let entity = Rc::new(Entity::new(parent, name));
57        let tx = OutPort::new_with_renames(&entity, "tx", aka);
58        let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
59        let rc_self = Rc::new(Self {
60            entity,
61            limiter,
62            tx: RefCell::new(Some(tx)),
63            rx: RefCell::new(Some(rx)),
64        });
65        engine.register(rc_self.clone());
66        Ok(rc_self)
67    }
68
69    pub fn new_and_register(
70        engine: &Engine,
71        clock: &Clock,
72        parent: &Rc<Entity>,
73        name: &str,
74        limiter: Rc<RateLimiter<T>>,
75    ) -> Result<Rc<Self>, SimError> {
76        Self::new_and_register_with_renames(engine, clock, parent, name, None, limiter)
77    }
78
79    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
80        connect_tx!(self.tx, connect ; port_state)
81    }
82
83    pub fn port_rx(&self) -> PortStateResult<T> {
84        port_rx!(self.rx, state)
85    }
86}
87
88#[async_trait(?Send)]
89impl<T> Runnable for Limiter<T>
90where
91    T: SimObject,
92{
93    async fn run(&self) -> SimResult {
94        let rx = take_option!(self.rx);
95        let tx = take_option!(self.tx);
96        let limiter = &self.limiter;
97        loop {
98            // Get the value but without letting the OutPort complete
99            let value = rx.start_get()?.await;
100
101            let value_id = value.id();
102            let ticks = limiter.ticks(&value);
103            enter!(self.entity ; value_id);
104
105            tx.put(value)?.await;
106            limiter.delay_ticks(ticks).await;
107            exit!(self.entity ; value_id);
108
109            // Allow the OutPort to complete
110            rx.finish_get();
111        }
112    }
113}