gwr_components/flow_controls/
limiter.rs1use std::cell::RefCell;
13use std::rc::Rc;
14
15use async_trait::async_trait;
16use gwr_engine::engine::Engine;
17use gwr_engine::port::{InPort, OutPort, PortStateResult};
18use gwr_engine::time::clock::Clock;
19use gwr_engine::traits::{Runnable, SimObject};
20use gwr_engine::types::{SimError, SimResult};
21use gwr_model_builder::{EntityDisplay, EntityGet};
22use gwr_track::entity::Entity;
23use gwr_track::tracker::aka::Aka;
24
25use super::rate_limiter::RateLimiter;
26use crate::{connect_tx, port_rx, take_option};
27
28#[derive(EntityGet, EntityDisplay)]
33pub struct Limiter<T>
34where
35 T: SimObject,
36{
37 entity: Rc<Entity>,
38 limiter: Rc<RateLimiter<T>>,
39 tx: RefCell<Option<OutPort<T>>>,
40 rx: RefCell<Option<InPort<T>>>,
41}
42
43impl<T> Limiter<T>
44where
45 T: SimObject,
46{
47 pub fn new_and_register_with_renames(
48 engine: &Engine,
49 clock: &Clock,
50 parent: &Rc<Entity>,
51 name: &str,
52 aka: Option<&Aka>,
53 limiter: Rc<RateLimiter<T>>,
54 ) -> Result<Rc<Self>, SimError> {
55 let entity = Rc::new(Entity::new(parent, name));
56 let tx = OutPort::new_with_renames(&entity, "tx", aka);
57 let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
58 let rc_self = Rc::new(Self {
59 entity,
60 limiter,
61 tx: RefCell::new(Some(tx)),
62 rx: RefCell::new(Some(rx)),
63 });
64 engine.register(rc_self.clone());
65 Ok(rc_self)
66 }
67
68 pub fn new_and_register(
69 engine: &Engine,
70 clock: &Clock,
71 parent: &Rc<Entity>,
72 name: &str,
73 limiter: Rc<RateLimiter<T>>,
74 ) -> Result<Rc<Self>, SimError> {
75 Self::new_and_register_with_renames(engine, clock, parent, name, None, limiter)
76 }
77
78 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
79 connect_tx!(self.tx, connect ; port_state)
80 }
81
82 pub fn port_rx(&self) -> PortStateResult<T> {
83 port_rx!(self.rx, state)
84 }
85}
86
87#[async_trait(?Send)]
88impl<T> Runnable for Limiter<T>
89where
90 T: SimObject,
91{
92 async fn run(&self) -> SimResult {
93 let rx = take_option!(self.rx);
94 let tx = take_option!(self.tx);
95 let limiter = &self.limiter;
96 loop {
97 let value = rx.start_get()?.await;
99
100 let value_id = value.id();
101 let ticks = limiter.ticks(&value);
102 self.entity.track_enter(value_id);
103
104 tx.put(value)?.await;
105 limiter.delay_ticks(ticks).await;
106 self.entity.track_exit(value_id);
107
108 rx.finish_get();
110 }
111 }
112}