gwr_components/flow_controls/
limiter.rs1use std::cell::RefCell;
13use std::rc::Rc;
14
15use async_trait::async_trait;
16use gwr_engine::engine::Engine;
17use gwr_engine::port::{InPort, OutPort, PortStateResult};
18use gwr_engine::time::clock::Clock;
19use gwr_engine::traits::{Runnable, SimObject};
20use gwr_engine::types::{SimError, SimResult};
21use gwr_model_builder::{EntityDisplay, EntityGet};
22use gwr_track::entity::Entity;
23use gwr_track::tracker::aka::Aka;
24use gwr_track::{enter, exit};
25
26use super::rate_limiter::RateLimiter;
27use crate::{connect_tx, port_rx, take_option};
28
29#[derive(EntityGet, EntityDisplay)]
34pub struct Limiter<T>
35where
36 T: SimObject,
37{
38 entity: Rc<Entity>,
39 limiter: Rc<RateLimiter<T>>,
40 tx: RefCell<Option<OutPort<T>>>,
41 rx: RefCell<Option<InPort<T>>>,
42}
43
44impl<T> Limiter<T>
45where
46 T: SimObject,
47{
48 pub fn new_and_register_with_renames(
49 engine: &Engine,
50 clock: &Clock,
51 parent: &Rc<Entity>,
52 name: &str,
53 aka: Option<&Aka>,
54 limiter: Rc<RateLimiter<T>>,
55 ) -> Result<Rc<Self>, SimError> {
56 let entity = Rc::new(Entity::new(parent, name));
57 let tx = OutPort::new_with_renames(&entity, "tx", aka);
58 let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
59 let rc_self = Rc::new(Self {
60 entity,
61 limiter,
62 tx: RefCell::new(Some(tx)),
63 rx: RefCell::new(Some(rx)),
64 });
65 engine.register(rc_self.clone());
66 Ok(rc_self)
67 }
68
69 pub fn new_and_register(
70 engine: &Engine,
71 clock: &Clock,
72 parent: &Rc<Entity>,
73 name: &str,
74 limiter: Rc<RateLimiter<T>>,
75 ) -> Result<Rc<Self>, SimError> {
76 Self::new_and_register_with_renames(engine, clock, parent, name, None, limiter)
77 }
78
79 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
80 connect_tx!(self.tx, connect ; port_state)
81 }
82
83 pub fn port_rx(&self) -> PortStateResult<T> {
84 port_rx!(self.rx, state)
85 }
86}
87
88#[async_trait(?Send)]
89impl<T> Runnable for Limiter<T>
90where
91 T: SimObject,
92{
93 async fn run(&self) -> SimResult {
94 let rx = take_option!(self.rx);
95 let tx = take_option!(self.tx);
96 let limiter = &self.limiter;
97 loop {
98 let value = rx.start_get()?.await;
100
101 let value_id = value.id();
102 let ticks = limiter.ticks(&value);
103 enter!(self.entity ; value_id);
104
105 tx.put(value)?.await;
106 limiter.delay_ticks(ticks).await;
107 exit!(self.entity ; value_id);
108
109 rx.finish_get();
111 }
112 }
113}