gwr_components/flow_controls/
limiter.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! This component can be placed between two components in order to limit the
4//! bandwidth between them.
5//!
6//! # Ports
7//!
8//! This component has the following ports:
9//!  - One [input port](gwr_engine::port::InPort): `rx`
10//!  - One [output port](gwr_engine::port::OutPort): `tx`
11
12use std::cell::RefCell;
13use std::rc::Rc;
14
15use async_trait::async_trait;
16use gwr_engine::engine::Engine;
17use gwr_engine::port::{InPort, OutPort, PortStateResult};
18use gwr_engine::time::clock::Clock;
19use gwr_engine::traits::{Runnable, SimObject};
20use gwr_engine::types::{SimError, SimResult};
21use gwr_model_builder::{EntityDisplay, EntityGet};
22use gwr_track::entity::Entity;
23use gwr_track::tracker::aka::Aka;
24
25use super::rate_limiter::RateLimiter;
26use crate::{connect_tx, port_rx, take_option};
27
28/// The [`Limiter`] is a component that will allow data through at a
29/// specified rate.
30///
31/// The rate is defined in bits-per-second.
32#[derive(EntityGet, EntityDisplay)]
33pub struct Limiter<T>
34where
35    T: SimObject,
36{
37    entity: Rc<Entity>,
38    limiter: Rc<RateLimiter<T>>,
39    tx: RefCell<Option<OutPort<T>>>,
40    rx: RefCell<Option<InPort<T>>>,
41}
42
43impl<T> Limiter<T>
44where
45    T: SimObject,
46{
47    pub fn new_and_register_with_renames(
48        engine: &Engine,
49        clock: &Clock,
50        parent: &Rc<Entity>,
51        name: &str,
52        aka: Option<&Aka>,
53        limiter: Rc<RateLimiter<T>>,
54    ) -> Result<Rc<Self>, SimError> {
55        let entity = Rc::new(Entity::new(parent, name));
56        let tx = OutPort::new_with_renames(&entity, "tx", aka);
57        let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
58        let rc_self = Rc::new(Self {
59            entity,
60            limiter,
61            tx: RefCell::new(Some(tx)),
62            rx: RefCell::new(Some(rx)),
63        });
64        engine.register(rc_self.clone());
65        Ok(rc_self)
66    }
67
68    pub fn new_and_register(
69        engine: &Engine,
70        clock: &Clock,
71        parent: &Rc<Entity>,
72        name: &str,
73        limiter: Rc<RateLimiter<T>>,
74    ) -> Result<Rc<Self>, SimError> {
75        Self::new_and_register_with_renames(engine, clock, parent, name, None, limiter)
76    }
77
78    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
79        connect_tx!(self.tx, connect ; port_state)
80    }
81
82    pub fn port_rx(&self) -> PortStateResult<T> {
83        port_rx!(self.rx, state)
84    }
85}
86
87#[async_trait(?Send)]
88impl<T> Runnable for Limiter<T>
89where
90    T: SimObject,
91{
92    async fn run(&self) -> SimResult {
93        let rx = take_option!(self.rx);
94        let tx = take_option!(self.tx);
95        let limiter = &self.limiter;
96        loop {
97            // Get the value but without letting the OutPort complete
98            let value = rx.start_get()?.await;
99
100            let value_id = value.id();
101            let ticks = limiter.ticks(&value);
102            self.entity.track_enter(value_id);
103
104            tx.put(value)?.await;
105            limiter.delay_ticks(ticks).await;
106            self.entity.track_exit(value_id);
107
108            // Allow the OutPort to complete
109            rx.finish_get();
110        }
111    }
112}