gwr_components/flow_controls/
credit_limiter.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! Enforce credit limit on an interface between two ports.
4//!
5//! # Ports
6//!
7//! This component has the following ports:
8//!  - Two [input ports](gwr_engine::port::InPort): `rx`, `credit_rx`
9//!  - One [output port](gwr_engine::port::OutPort): `tx`
10
11use std::cell::RefCell;
12use std::rc::Rc;
13
14use async_trait::async_trait;
15use gwr_engine::engine::Engine;
16use gwr_engine::executor::Spawner;
17use gwr_engine::port::{InPort, OutPort, PortStateResult};
18use gwr_engine::spawn_subcomponent;
19use gwr_engine::time::clock::Clock;
20use gwr_engine::traits::{Runnable, SimObject};
21use gwr_engine::types::{SimError, SimResult};
22use gwr_model_builder::{EntityDisplay, EntityGet};
23use gwr_resources::Resource;
24use gwr_track::entity::Entity;
25use gwr_track::tracker::aka::Aka;
26use gwr_track::{build_aka, trace};
27
28use crate::types::Credit;
29use crate::{connect_tx, port_rx, take_option};
30
31#[derive(EntityGet, EntityDisplay)]
32struct PortCredit {
33    entity: Rc<Entity>,
34    credit: Resource,
35    rx: RefCell<Option<InPort<Credit>>>,
36}
37
38impl PortCredit {
39    pub fn new(
40        engine: &Engine,
41        clock: &Clock,
42        parent: &Rc<Entity>,
43        name: &str,
44        aka: Option<&Aka>,
45        credit: Resource,
46    ) -> Self {
47        let entity = Rc::new(Entity::new(parent, name));
48        let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
49        Self {
50            entity,
51            credit,
52            rx: RefCell::new(Some(rx)),
53        }
54    }
55
56    pub fn port_rx(&self) -> PortStateResult<Credit> {
57        port_rx!(self.rx, state)
58    }
59
60    pub async fn run(&self) -> SimResult {
61        let rx = take_option!(self.rx);
62        let credit = self.credit.clone();
63
64        loop {
65            let credits = rx.get()?.await;
66            for _ in 0..credits.0 {
67                trace!(self.entity ; "release credit");
68                credit.release().await?;
69            }
70        }
71    }
72}
73
74#[derive(EntityGet, EntityDisplay)]
75pub struct CreditLimiter<T>
76where
77    T: SimObject,
78{
79    entity: Rc<Entity>,
80    spawner: Spawner,
81    credit: Resource,
82
83    tx: RefCell<Option<OutPort<T>>>,
84    credit_rx: RefCell<Option<PortCredit>>,
85    rx: RefCell<Option<InPort<T>>>,
86}
87
88impl<T> CreditLimiter<T>
89where
90    T: SimObject,
91{
92    pub fn new_and_register(
93        engine: &Engine,
94        clock: &Clock,
95        parent: &Rc<Entity>,
96        name: &str,
97        aka: Option<&Aka>,
98        num_credits: usize,
99    ) -> Result<Rc<Self>, SimError> {
100        let spawner = engine.spawner();
101        let entity = Rc::new(Entity::new(parent, name));
102        let credit = Resource::new(num_credits);
103
104        let credit_rx_aka = build_aka!(aka, &entity, &[("credit_rx", "rx")]);
105        let credit_rx: PortCredit = PortCredit::new(
106            engine,
107            clock,
108            &entity,
109            "credit_rx",
110            Some(&credit_rx_aka),
111            credit.clone(),
112        );
113        let tx = OutPort::new_with_renames(&entity, "tx", aka);
114        let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
115
116        let rc_self = Rc::new(Self {
117            entity,
118            credit,
119            tx: RefCell::new(Some(tx)),
120            credit_rx: RefCell::new(Some(credit_rx)),
121            rx: RefCell::new(Some(rx)),
122            spawner,
123        });
124        engine.register(rc_self.clone());
125        Ok(rc_self)
126    }
127
128    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
129        connect_tx!(self.tx, connect ; port_state)
130    }
131
132    pub fn port_rx(&self) -> PortStateResult<T> {
133        port_rx!(self.rx, state)
134    }
135
136    pub fn port_credit_rx(&self) -> PortStateResult<Credit> {
137        port_rx!(self.credit_rx, port_rx)
138    }
139}
140
141#[async_trait(?Send)]
142impl<T> Runnable for CreditLimiter<T>
143where
144    T: SimObject,
145{
146    async fn run(&self) -> SimResult {
147        let rx = take_option!(self.rx);
148        let tx = take_option!(self.tx);
149        let credit = self.credit.clone();
150
151        spawn_subcomponent!(self.spawner ; self.credit_rx);
152
153        loop {
154            let value = rx.get()?.await;
155
156            credit.request().await;
157            trace!(self.entity ; "consume credit");
158
159            tx.put(value)?.await;
160        }
161    }
162}