gwr_components/flow_controls/
credit_limiter.rs1use std::cell::RefCell;
12use std::rc::Rc;
13
14use async_trait::async_trait;
15use gwr_engine::engine::Engine;
16use gwr_engine::executor::Spawner;
17use gwr_engine::port::{InPort, OutPort, PortStateResult};
18use gwr_engine::spawn_subcomponent;
19use gwr_engine::time::clock::Clock;
20use gwr_engine::traits::{Runnable, SimObject};
21use gwr_engine::types::{SimError, SimResult};
22use gwr_model_builder::{EntityDisplay, EntityGet};
23use gwr_resources::Resource;
24use gwr_track::entity::Entity;
25use gwr_track::tracker::aka::Aka;
26use gwr_track::{build_aka, trace};
27
28use crate::types::Credit;
29use crate::{connect_tx, port_rx, take_option};
30
31#[derive(EntityGet, EntityDisplay)]
32struct PortCredit {
33 entity: Rc<Entity>,
34 credit: Resource,
35 rx: RefCell<Option<InPort<Credit>>>,
36}
37
38impl PortCredit {
39 pub fn new(
40 engine: &Engine,
41 clock: &Clock,
42 parent: &Rc<Entity>,
43 name: &str,
44 aka: Option<&Aka>,
45 credit: Resource,
46 ) -> Self {
47 let entity = Rc::new(Entity::new(parent, name));
48 let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
49 Self {
50 entity,
51 credit,
52 rx: RefCell::new(Some(rx)),
53 }
54 }
55
56 pub fn port_rx(&self) -> PortStateResult<Credit> {
57 port_rx!(self.rx, state)
58 }
59
60 pub async fn run(&self) -> SimResult {
61 let rx = take_option!(self.rx);
62 let credit = self.credit.clone();
63
64 loop {
65 let credits = rx.get()?.await;
66 for _ in 0..credits.0 {
67 trace!(self.entity ; "release credit");
68 credit.release().await?;
69 }
70 }
71 }
72}
73
74#[derive(EntityGet, EntityDisplay)]
75pub struct CreditLimiter<T>
76where
77 T: SimObject,
78{
79 entity: Rc<Entity>,
80 spawner: Spawner,
81 credit: Resource,
82
83 tx: RefCell<Option<OutPort<T>>>,
84 credit_rx: RefCell<Option<PortCredit>>,
85 rx: RefCell<Option<InPort<T>>>,
86}
87
88impl<T> CreditLimiter<T>
89where
90 T: SimObject,
91{
92 pub fn new_and_register(
93 engine: &Engine,
94 clock: &Clock,
95 parent: &Rc<Entity>,
96 name: &str,
97 aka: Option<&Aka>,
98 num_credits: usize,
99 ) -> Result<Rc<Self>, SimError> {
100 let spawner = engine.spawner();
101 let entity = Rc::new(Entity::new(parent, name));
102 let credit = Resource::new(num_credits);
103
104 let credit_rx_aka = build_aka!(aka, &entity, &[("credit_rx", "rx")]);
105 let credit_rx: PortCredit = PortCredit::new(
106 engine,
107 clock,
108 &entity,
109 "credit_rx",
110 Some(&credit_rx_aka),
111 credit.clone(),
112 );
113 let tx = OutPort::new_with_renames(&entity, "tx", aka);
114 let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
115
116 let rc_self = Rc::new(Self {
117 entity,
118 credit,
119 tx: RefCell::new(Some(tx)),
120 credit_rx: RefCell::new(Some(credit_rx)),
121 rx: RefCell::new(Some(rx)),
122 spawner,
123 });
124 engine.register(rc_self.clone());
125 Ok(rc_self)
126 }
127
128 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
129 connect_tx!(self.tx, connect ; port_state)
130 }
131
132 pub fn port_rx(&self) -> PortStateResult<T> {
133 port_rx!(self.rx, state)
134 }
135
136 pub fn port_credit_rx(&self) -> PortStateResult<Credit> {
137 port_rx!(self.credit_rx, port_rx)
138 }
139}
140
141#[async_trait(?Send)]
142impl<T> Runnable for CreditLimiter<T>
143where
144 T: SimObject,
145{
146 async fn run(&self) -> SimResult {
147 let rx = take_option!(self.rx);
148 let tx = take_option!(self.tx);
149 let credit = self.credit.clone();
150
151 spawn_subcomponent!(self.spawner ; self.credit_rx);
152
153 loop {
154 let value = rx.get()?.await;
155
156 credit.request().await;
157 trace!(self.entity ; "consume credit");
158
159 tx.put(value)?.await;
160 }
161 }
162}