gwr_components/flow_controls/
credit_limiter.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! Enforce credit limit on an interface between two ports.
4//!
5//! # Ports
6//!
7//! This component has the following ports:
8//!  - Two [input ports](gwr_engine::port::InPort): `rx`, `credit_rx`
9//!  - One [output port](gwr_engine::port::OutPort): `tx`
10
11use std::cell::RefCell;
12use std::rc::Rc;
13
14use async_trait::async_trait;
15use gwr_engine::engine::Engine;
16use gwr_engine::executor::Spawner;
17use gwr_engine::port::{InPort, OutPort, PortStateResult};
18use gwr_engine::spawn_subcomponent;
19use gwr_engine::traits::{Runnable, SimObject};
20use gwr_engine::types::{SimError, SimResult};
21use gwr_model_builder::EntityDisplay;
22use gwr_resources::Resource;
23use gwr_track::entity::Entity;
24use gwr_track::trace;
25
26use crate::types::Credit;
27use crate::{connect_tx, port_rx, take_option};
28
/// Sub-component pairing an input port that carries [`Credit`] values with the
/// [`Resource`] those credits are released into (see `PortCredit::run`).
#[derive(EntityDisplay)]
struct PortCredit {
    /// Entity created for this sub-component in `PortCredit::new`; passed to
    /// `trace!` when a credit is released.
    pub entity: Rc<Entity>,
    /// Resource on which `release()` is called once per credit received.
    credit: Resource,
    /// Input port carrying [`Credit`] values. Stored as `RefCell<Option<_>>`;
    /// `run` obtains the port through `take_option!(self.rx)`.
    rx: RefCell<Option<InPort<Credit>>>,
}
35
36impl PortCredit {
37    pub fn new(parent: &Rc<Entity>, name: &str, credit: Resource) -> Self {
38        let entity = Rc::new(Entity::new(parent, name));
39        let rx = InPort::new(&entity, "rx");
40        Self {
41            entity,
42            credit,
43            rx: RefCell::new(Some(rx)),
44        }
45    }
46
47    pub fn port_rx(&self) -> PortStateResult<Credit> {
48        port_rx!(self.rx, state)
49    }
50
51    pub async fn run(&self) -> SimResult {
52        let rx = take_option!(self.rx);
53        let credit = self.credit.clone();
54
55        loop {
56            let credits = rx.get()?.await;
57            for _ in 0..credits.0 {
58                trace!(self.entity ; "release credit");
59                credit.release().await?;
60            }
61        }
62    }
63}
64
/// Component that forwards values of type `T` from its `rx` port to its `tx`
/// port, requesting one unit of `credit` for every value forwarded. Credits
/// are released again by the `credit_rx` sub-component.
#[derive(EntityDisplay)]
pub struct CreditLimiter<T>
where
    T: SimObject,
{
    /// Entity for this component, created with the name `"credit"` in
    /// `new_and_register`.
    pub entity: Rc<Entity>,
    /// Spawner passed to `spawn_subcomponent!` in `run` together with
    /// `credit_rx`.
    spawner: Spawner,
    /// Credit resource, constructed from `num_credits`; a clone of it is
    /// handed to the `credit_rx` sub-component.
    credit: Resource,

    /// Output port on which received values are sent once a credit has been
    /// requested.
    tx: RefCell<Option<OutPort<T>>>,
    /// Sub-component owning the credit input port.
    credit_rx: RefCell<Option<PortCredit>>,
    /// Input port from which values are received.
    rx: RefCell<Option<InPort<T>>>,
}
78
79impl<T> CreditLimiter<T>
80where
81    T: SimObject,
82{
83    pub fn new_and_register(
84        engine: &Engine,
85        parent: &Rc<Entity>,
86        spawner: Spawner,
87        num_credits: usize,
88    ) -> Result<Rc<Self>, SimError> {
89        let entity = Rc::new(Entity::new(parent, "credit"));
90        let credit = Resource::new(num_credits);
91        let credit_rx = PortCredit::new(&entity, "credit_rx", credit.clone());
92        let tx = OutPort::new(&entity, "tx");
93        let rx = InPort::new(&entity, "rx");
94
95        let rc_self = Rc::new(Self {
96            entity,
97            credit,
98            tx: RefCell::new(Some(tx)),
99            credit_rx: RefCell::new(Some(credit_rx)),
100            rx: RefCell::new(Some(rx)),
101            spawner,
102        });
103        engine.register(rc_self.clone());
104        Ok(rc_self)
105    }
106
107    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
108        connect_tx!(self.tx, connect ; port_state)
109    }
110
111    pub fn port_rx(&self) -> PortStateResult<T> {
112        port_rx!(self.rx, state)
113    }
114
115    pub fn port_credit_rx(&self) -> PortStateResult<Credit> {
116        port_rx!(self.credit_rx, port_rx)
117    }
118}
119
120#[async_trait(?Send)]
121impl<T> Runnable for CreditLimiter<T>
122where
123    T: SimObject,
124{
125    async fn run(&self) -> SimResult {
126        let rx = take_option!(self.rx);
127        let tx = take_option!(self.tx);
128        let credit = self.credit.clone();
129
130        spawn_subcomponent!(self.spawner ; self.credit_rx);
131
132        loop {
133            let value = rx.get()?.await;
134
135            credit.request().await;
136            trace!(self.entity ; "consume credit");
137
138            tx.put(value)?.await;
139        }
140    }
141}