gwr_components/arbiter/
mod.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! Perform arbitration between a number of interfaces.
4//!
5//! # Ports
6//!
7//! This component has the following ports:
8//!  - N [input ports](gwr_engine::port::InPort): `rx[i]` for `i in [0, N-1]`
9//!  - One [output port](gwr_engine::port::OutPort): `tx`
10
11use std::cell::RefCell;
12use std::rc::Rc;
13
14use async_trait::async_trait;
15use gwr_engine::engine::Engine;
16use gwr_engine::events::once::Once;
17use gwr_engine::executor::Spawner;
18use gwr_engine::port::{InPort, OutPort, PortStateResult};
19use gwr_engine::traits::{Event, Runnable, SimObject};
20use gwr_engine::types::{SimError, SimResult};
21use gwr_model_builder::EntityDisplay;
22use gwr_track::entity::Entity;
23use gwr_track::{enter, exit, trace};
24
25use crate::{connect_tx, take_option};
26
27pub mod policy;
28
29#[derive(Default)]
30struct ArbiterSharedState<T> {
31    input_values: RefCell<Vec<Option<T>>>,
32    arbiter_event: RefCell<Option<Once<()>>>,
33    waiting_put: Vec<RefCell<Option<Once<()>>>>,
34}
35
36impl<T> ArbiterSharedState<T> {
37    fn new(capacity: usize) -> Self {
38        Self {
39            input_values: RefCell::new((0..capacity).map(|_| None).collect()),
40            arbiter_event: RefCell::new(None),
41            waiting_put: (0..capacity).map(|_| RefCell::new(None)).collect(),
42        }
43    }
44}
45
46pub trait Arbitrate<T>
47where
48    T: SimObject,
49{
50    fn arbitrate(
51        &mut self,
52        entity: &Rc<Entity>,
53        input_values: &mut [Option<T>],
54    ) -> Option<(usize, T)>;
55}
56
57#[derive(EntityDisplay)]
58pub struct Arbiter<T>
59where
60    T: SimObject,
61{
62    pub entity: Rc<Entity>,
63    rx: RefCell<Vec<Option<InPort<T>>>>,
64    tx: RefCell<Option<OutPort<T>>>,
65    policy: RefCell<Option<Box<dyn Arbitrate<T>>>>,
66    shared_state: Rc<ArbiterSharedState<T>>,
67    spawner: Spawner,
68}
69
70impl<T> Arbiter<T>
71where
72    T: SimObject,
73{
74    pub fn new_and_register(
75        engine: &Engine,
76        parent: &Rc<Entity>,
77        name: &str,
78        spawner: Spawner,
79        num_rx: usize,
80        policy: Box<dyn Arbitrate<T>>,
81    ) -> Result<Rc<Self>, SimError> {
82        let entity = Rc::new(Entity::new(parent, name));
83        let shared_state = Rc::new(ArbiterSharedState::new(num_rx));
84        let rx = (0..num_rx)
85            .map(|i| Some(InPort::new(&entity, format!("rx{i}").as_str())))
86            .collect();
87        let tx = OutPort::new(&entity, "tx");
88        let rc_self = Rc::new(Self {
89            entity,
90            rx: RefCell::new(rx),
91            tx: RefCell::new(Some(tx)),
92            policy: RefCell::new(Some(policy)),
93            shared_state,
94            spawner,
95        });
96        engine.register(rc_self.clone());
97        Ok(rc_self)
98    }
99
100    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
101        connect_tx!(self.tx, connect ; port_state)
102    }
103
104    pub fn port_rx_i(&self, i: usize) -> PortStateResult<T> {
105        self.rx.borrow()[i].as_ref().unwrap().state()
106    }
107}
108
109#[async_trait(?Send)]
110impl<T> Runnable for Arbiter<T>
111where
112    T: SimObject,
113{
114    async fn run(&self) -> SimResult {
115        // Start running the handlers for each input
116        for (i, mut rx) in self.rx.borrow_mut().drain(..).enumerate() {
117            let entity = self.entity.clone();
118            let rx = rx.take().unwrap();
119            let shared_state = self.shared_state.clone();
120            self.spawner
121                .spawn(async move { run_input(entity, rx, i, shared_state).await });
122        }
123
124        let tx = take_option!(self.tx);
125        let mut policy = take_option!(self.policy);
126
127        // Drive the output
128        loop {
129            let wait_event;
130            loop {
131                let value;
132                let wake_event;
133                {
134                    let mut input_values = self.shared_state.input_values.borrow_mut();
135                    let t = policy.arbitrate(&self.entity, &mut input_values);
136                    match t {
137                        Some((i, t)) => {
138                            trace!(self.entity ; "grant {}: {}", i, t);
139                            wake_event = self.shared_state.waiting_put[i].borrow_mut().take();
140                            value = t;
141                        }
142                        None => {
143                            wait_event = Once::default();
144                            trace!(self.entity ; "arb wait");
145                            *self.shared_state.arbiter_event.borrow_mut() =
146                                Some(wait_event.clone());
147                            break;
148                        }
149                    }
150                }
151
152                if let Some(event) = wake_event {
153                    event.notify()?;
154                }
155                exit!(self.entity ; value.id());
156                tx.put(value)?.await;
157            }
158            wait_event.listen().await;
159        }
160    }
161}
162
163async fn run_input<T: SimObject>(
164    entity: Rc<Entity>,
165    rx: InPort<T>,
166    input_idx: usize,
167    shared_state: Rc<ArbiterSharedState<T>>,
168) -> SimResult {
169    loop {
170        let value = rx.get()?.await;
171        enter!(entity ; value.id());
172
173        // Check if this input needs to wait for the previous value to be handled
174        let wait_for_space = match shared_state.input_values.borrow()[input_idx].as_ref() {
175            Some(_) => {
176                let wait_for_space = Once::default();
177                *shared_state.waiting_put[input_idx].borrow_mut() = Some(wait_for_space.clone());
178                Some(wait_for_space)
179            }
180            None => None,
181        };
182        if let Some(wait_event) = wait_for_space {
183            wait_event.listen().await;
184        }
185
186        // Set the value for this input
187        shared_state.input_values.borrow_mut()[input_idx] = Some(value);
188
189        // Wake up the arbiter if it has paused on an event
190        if let Some(arbiter_event) = shared_state.arbiter_event.borrow_mut().take() {
191            arbiter_event.notify().unwrap();
192        }
193    }
194}