gwr_components/flow_controls/
limiter.rs1use std::cell::RefCell;
13use std::rc::Rc;
14
15use async_trait::async_trait;
16use gwr_engine::engine::Engine;
17use gwr_engine::port::{InPort, OutPort, PortStateResult};
18use gwr_engine::traits::{Runnable, SimObject};
19use gwr_engine::types::{SimError, SimResult};
20use gwr_model_builder::EntityDisplay;
21use gwr_track::entity::Entity;
22use gwr_track::{enter, exit};
23
24use super::rate_limiter::RateLimiter;
25use crate::{connect_tx, port_rx, take_option};
26
27#[derive(EntityDisplay)]
32pub struct Limiter<T>
33where
34 T: SimObject,
35{
36 pub entity: Rc<Entity>,
37 limiter: Rc<RateLimiter<T>>,
38 tx: RefCell<Option<OutPort<T>>>,
39 rx: RefCell<Option<InPort<T>>>,
40}
41
42impl<T> Limiter<T>
43where
44 T: SimObject,
45{
46 pub fn new_and_register(
47 engine: &Engine,
48 parent: &Rc<Entity>,
49 name: &str,
50 limiter: Rc<RateLimiter<T>>,
51 ) -> Result<Rc<Self>, SimError> {
52 let entity = Rc::new(Entity::new(parent, name));
53 let tx = OutPort::new(&entity, "tx");
54 let rx = InPort::new(&entity, "rx");
55 let rc_self = Rc::new(Self {
56 entity,
57 limiter,
58 tx: RefCell::new(Some(tx)),
59 rx: RefCell::new(Some(rx)),
60 });
61 engine.register(rc_self.clone());
62 Ok(rc_self)
63 }
64
65 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
66 connect_tx!(self.tx, connect ; port_state)
67 }
68
69 pub fn port_rx(&self) -> PortStateResult<T> {
70 port_rx!(self.rx, state)
71 }
72}
73
74#[async_trait(?Send)]
75impl<T> Runnable for Limiter<T>
76where
77 T: SimObject,
78{
79 async fn run(&self) -> SimResult {
80 let rx = take_option!(self.rx);
81 let tx = take_option!(self.tx);
82 let limiter = &self.limiter;
83 loop {
84 let value = rx.start_get()?.await;
86
87 let value_id = value.id();
88 let ticks = limiter.ticks(&value);
89 enter!(self.entity ; value_id);
90
91 tx.put(value)?.await;
92 limiter.delay_ticks(ticks).await;
93 exit!(self.entity ; value_id);
94
95 rx.finish_get();
97 }
98 }
99}