gwr_components/flow_controls/
limiter.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! This component can be placed between two components in order to limit the
4//! bandwidth between them.
5//!
6//! # Ports
7//!
8//! This component has the following ports:
9//!  - One [input port](gwr_engine::port::InPort): `rx`
10//!  - One [output port](gwr_engine::port::OutPort): `tx`
11
12use std::cell::RefCell;
13use std::rc::Rc;
14
15use async_trait::async_trait;
16use gwr_engine::engine::Engine;
17use gwr_engine::port::{InPort, OutPort, PortStateResult};
18use gwr_engine::traits::{Runnable, SimObject};
19use gwr_engine::types::{SimError, SimResult};
20use gwr_model_builder::EntityDisplay;
21use gwr_track::entity::Entity;
22use gwr_track::{enter, exit};
23
24use super::rate_limiter::RateLimiter;
25use crate::{connect_tx, port_rx, take_option};
26
/// The [`Limiter`] is a component that will allow data through at a
/// specified rate.
///
/// The rate is defined in bits-per-second.
///
/// Objects arrive on the `rx` port and leave on the `tx` port. The delay
/// applied to each object is obtained from the [`RateLimiter`] supplied at
/// construction time.
#[derive(EntityDisplay)]
pub struct Limiter<T>
where
    T: SimObject,
{
    /// Entity for this component, created beneath the parent passed to
    /// [`Limiter::new_and_register`]. It is also the entity handed to the
    /// `enter!`/`exit!` tracking macros while an object is in flight.
    pub entity: Rc<Entity>,
    /// Reference-counted rate limiter that provides the per-object tick count
    /// and the corresponding delay.
    limiter: Rc<RateLimiter<T>>,
    /// Output port `tx`. Stored as `RefCell<Option<_>>` so that it can be
    /// reached through `&self`; presumably `take_option!` moves it out when
    /// `run` starts (macro defined elsewhere -- confirm).
    tx: RefCell<Option<OutPort<T>>>,
    /// Input port `rx`. Stored as `RefCell<Option<_>>` for the same reason as
    /// `tx`.
    rx: RefCell<Option<InPort<T>>>,
}
41
42impl<T> Limiter<T>
43where
44    T: SimObject,
45{
46    pub fn new_and_register(
47        engine: &Engine,
48        parent: &Rc<Entity>,
49        name: &str,
50        limiter: Rc<RateLimiter<T>>,
51    ) -> Result<Rc<Self>, SimError> {
52        let entity = Rc::new(Entity::new(parent, name));
53        let tx = OutPort::new(&entity, "tx");
54        let rx = InPort::new(&entity, "rx");
55        let rc_self = Rc::new(Self {
56            entity,
57            limiter,
58            tx: RefCell::new(Some(tx)),
59            rx: RefCell::new(Some(rx)),
60        });
61        engine.register(rc_self.clone());
62        Ok(rc_self)
63    }
64
65    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
66        connect_tx!(self.tx, connect ; port_state)
67    }
68
69    pub fn port_rx(&self) -> PortStateResult<T> {
70        port_rx!(self.rx, state)
71    }
72}
73
74#[async_trait(?Send)]
75impl<T> Runnable for Limiter<T>
76where
77    T: SimObject,
78{
79    async fn run(&self) -> SimResult {
80        let rx = take_option!(self.rx);
81        let tx = take_option!(self.tx);
82        let limiter = &self.limiter;
83        loop {
84            // Get the value but without letting the OutPort complete
85            let value = rx.start_get()?.await;
86
87            let value_id = value.id();
88            let ticks = limiter.ticks(&value);
89            enter!(self.entity ; value_id);
90
91            tx.put(value)?.await;
92            limiter.delay_ticks(ticks).await;
93            exit!(self.entity ; value_id);
94
95            // Allow the OutPort to complete
96            rx.finish_get();
97        }
98    }
99}