gwr_components/flow_controls/
credit_limiter.rs1use std::cell::RefCell;
12use std::rc::Rc;
13
14use async_trait::async_trait;
15use gwr_engine::engine::Engine;
16use gwr_engine::executor::Spawner;
17use gwr_engine::port::{InPort, OutPort, PortStateResult};
18use gwr_engine::spawn_subcomponent;
19use gwr_engine::traits::{Runnable, SimObject};
20use gwr_engine::types::{SimError, SimResult};
21use gwr_model_builder::EntityDisplay;
22use gwr_resources::Resource;
23use gwr_track::entity::Entity;
24use gwr_track::trace;
25
26use crate::types::Credit;
27use crate::{connect_tx, port_rx, take_option};
28
29#[derive(EntityDisplay)]
30struct PortCredit {
31 pub entity: Rc<Entity>,
32 credit: Resource,
33 rx: RefCell<Option<InPort<Credit>>>,
34}
35
36impl PortCredit {
37 pub fn new(parent: &Rc<Entity>, name: &str, credit: Resource) -> Self {
38 let entity = Rc::new(Entity::new(parent, name));
39 let rx = InPort::new(&entity, "rx");
40 Self {
41 entity,
42 credit,
43 rx: RefCell::new(Some(rx)),
44 }
45 }
46
47 pub fn port_rx(&self) -> PortStateResult<Credit> {
48 port_rx!(self.rx, state)
49 }
50
51 pub async fn run(&self) -> SimResult {
52 let rx = take_option!(self.rx);
53 let credit = self.credit.clone();
54
55 loop {
56 let credits = rx.get()?.await;
57 for _ in 0..credits.0 {
58 trace!(self.entity ; "release credit");
59 credit.release().await?;
60 }
61 }
62 }
63}
64
65#[derive(EntityDisplay)]
66pub struct CreditLimiter<T>
67where
68 T: SimObject,
69{
70 pub entity: Rc<Entity>,
71 spawner: Spawner,
72 credit: Resource,
73
74 tx: RefCell<Option<OutPort<T>>>,
75 credit_rx: RefCell<Option<PortCredit>>,
76 rx: RefCell<Option<InPort<T>>>,
77}
78
79impl<T> CreditLimiter<T>
80where
81 T: SimObject,
82{
83 pub fn new_and_register(
84 engine: &Engine,
85 parent: &Rc<Entity>,
86 spawner: Spawner,
87 num_credits: usize,
88 ) -> Result<Rc<Self>, SimError> {
89 let entity = Rc::new(Entity::new(parent, "credit"));
90 let credit = Resource::new(num_credits);
91 let credit_rx = PortCredit::new(&entity, "credit_rx", credit.clone());
92 let tx = OutPort::new(&entity, "tx");
93 let rx = InPort::new(&entity, "rx");
94
95 let rc_self = Rc::new(Self {
96 entity,
97 credit,
98 tx: RefCell::new(Some(tx)),
99 credit_rx: RefCell::new(Some(credit_rx)),
100 rx: RefCell::new(Some(rx)),
101 spawner,
102 });
103 engine.register(rc_self.clone());
104 Ok(rc_self)
105 }
106
107 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
108 connect_tx!(self.tx, connect ; port_state)
109 }
110
111 pub fn port_rx(&self) -> PortStateResult<T> {
112 port_rx!(self.rx, state)
113 }
114
115 pub fn port_credit_rx(&self) -> PortStateResult<Credit> {
116 port_rx!(self.credit_rx, port_rx)
117 }
118}
119
120#[async_trait(?Send)]
121impl<T> Runnable for CreditLimiter<T>
122where
123 T: SimObject,
124{
125 async fn run(&self) -> SimResult {
126 let rx = take_option!(self.rx);
127 let tx = take_option!(self.tx);
128 let credit = self.credit.clone();
129
130 spawn_subcomponent!(self.spawner ; self.credit_rx);
131
132 loop {
133 let value = rx.get()?.await;
134
135 credit.request().await;
136 trace!(self.entity ; "consume credit");
137
138 tx.put(value)?.await;
139 }
140 }
141}