gwr_components/arbiter/
mod.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! Perform arbitration between a number of interfaces.
4//!
5//! # Ports
6//!
7//! This component has the following ports:
8//!  - N [input ports](gwr_engine::port::InPort): `rx[i]` for `i in [0, N-1]`
9//!  - One [output port](gwr_engine::port::OutPort): `tx`
10
11use std::cell::RefCell;
12use std::rc::Rc;
13
14use async_trait::async_trait;
15use gwr_engine::engine::Engine;
16use gwr_engine::events::once::Once;
17use gwr_engine::executor::Spawner;
18use gwr_engine::port::{InPort, OutPort, PortStateResult};
19use gwr_engine::time::clock::Clock;
20use gwr_engine::traits::{Event, Runnable, SimObject};
21use gwr_engine::types::{SimError, SimResult};
22use gwr_model_builder::{EntityDisplay, EntityGet};
23use gwr_track::entity::Entity;
24use gwr_track::tracker::aka::Aka;
25use gwr_track::{enter, exit, trace};
26
27use crate::{connect_tx, take_option};
28
29pub mod policy;
30
31#[derive(Default)]
32struct ArbiterSharedState<T> {
33    input_values: RefCell<Vec<Option<T>>>,
34    arbiter_event: RefCell<Option<Once<()>>>,
35    waiting_put: Vec<RefCell<Option<Once<()>>>>,
36}
37
38impl<T> ArbiterSharedState<T> {
39    fn new(capacity: usize) -> Self {
40        Self {
41            input_values: RefCell::new((0..capacity).map(|_| None).collect()),
42            arbiter_event: RefCell::new(None),
43            waiting_put: (0..capacity).map(|_| RefCell::new(None)).collect(),
44        }
45    }
46}
47
48pub trait Arbitrate<T>
49where
50    T: SimObject,
51{
52    fn arbitrate(
53        &mut self,
54        entity: &Rc<Entity>,
55        input_values: &mut [Option<T>],
56    ) -> Option<(usize, T)>;
57}
58
59#[derive(EntityGet, EntityDisplay)]
60pub struct Arbiter<T>
61where
62    T: SimObject,
63{
64    entity: Rc<Entity>,
65    rx: RefCell<Vec<Option<InPort<T>>>>,
66    tx: RefCell<Option<OutPort<T>>>,
67    policy: RefCell<Option<Box<dyn Arbitrate<T>>>>,
68    shared_state: Rc<ArbiterSharedState<T>>,
69    spawner: Spawner,
70}
71
72impl<T> Arbiter<T>
73where
74    T: SimObject,
75{
76    pub fn new_and_register_with_renames(
77        engine: &Engine,
78        clock: &Clock,
79        parent: &Rc<Entity>,
80        name: &str,
81        aka: Option<&Aka>,
82        num_rx: usize,
83        policy: Box<dyn Arbitrate<T>>,
84    ) -> Result<Rc<Self>, SimError> {
85        let spawner = engine.spawner();
86        let entity = Rc::new(Entity::new(parent, name));
87        let shared_state = Rc::new(ArbiterSharedState::new(num_rx));
88        let rx = (0..num_rx)
89            .map(|i| {
90                Some(InPort::new_with_renames(
91                    engine,
92                    clock,
93                    &entity,
94                    &format!("rx_{i}"),
95                    aka,
96                ))
97            })
98            .collect();
99        let tx = OutPort::new_with_renames(&entity, "tx", aka);
100        let rc_self = Rc::new(Self {
101            entity,
102            rx: RefCell::new(rx),
103            tx: RefCell::new(Some(tx)),
104            policy: RefCell::new(Some(policy)),
105            shared_state,
106            spawner,
107        });
108        engine.register(rc_self.clone());
109        Ok(rc_self)
110    }
111
112    pub fn new_and_register(
113        engine: &Engine,
114        clock: &Clock,
115        parent: &Rc<Entity>,
116        name: &str,
117        num_rx: usize,
118        policy: Box<dyn Arbitrate<T>>,
119    ) -> Result<Rc<Self>, SimError> {
120        Self::new_and_register_with_renames(engine, clock, parent, name, None, num_rx, policy)
121    }
122
123    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
124        connect_tx!(self.tx, connect ; port_state)
125    }
126
127    pub fn port_rx_i(&self, i: usize) -> PortStateResult<T> {
128        self.rx.borrow()[i].as_ref().unwrap().state()
129    }
130}
131
132#[async_trait(?Send)]
133impl<T> Runnable for Arbiter<T>
134where
135    T: SimObject,
136{
137    async fn run(&self) -> SimResult {
138        // Start running the handlers for each input
139        for (i, mut rx) in self.rx.borrow_mut().drain(..).enumerate() {
140            let entity = self.entity.clone();
141            let rx = rx.take().unwrap();
142            let shared_state = self.shared_state.clone();
143            self.spawner
144                .spawn(async move { run_input(entity, rx, i, shared_state).await });
145        }
146
147        let tx = take_option!(self.tx);
148        let mut policy = take_option!(self.policy);
149
150        // Drive the output
151        loop {
152            let wait_event;
153            loop {
154                let value;
155                let wake_event;
156                {
157                    let mut input_values = self.shared_state.input_values.borrow_mut();
158                    let t = policy.arbitrate(&self.entity, &mut input_values);
159                    match t {
160                        Some((i, t)) => {
161                            trace!(self.entity ; "grant {}: {}", i, t);
162                            wake_event = self.shared_state.waiting_put[i].borrow_mut().take();
163                            value = t;
164                        }
165                        None => {
166                            wait_event = Once::default();
167                            trace!(self.entity ; "arb wait");
168                            *self.shared_state.arbiter_event.borrow_mut() =
169                                Some(wait_event.clone());
170                            break;
171                        }
172                    }
173                }
174
175                if let Some(event) = wake_event {
176                    event.notify()?;
177                }
178                exit!(self.entity ; value.id());
179                tx.put(value)?.await;
180            }
181            wait_event.listen().await;
182        }
183    }
184}
185
186async fn run_input<T: SimObject>(
187    entity: Rc<Entity>,
188    rx: InPort<T>,
189    input_idx: usize,
190    shared_state: Rc<ArbiterSharedState<T>>,
191) -> SimResult {
192    loop {
193        let value = rx.get()?.await;
194        enter!(entity ; value.id());
195
196        // Check if this input needs to wait for the previous value to be handled
197        let wait_for_space = match shared_state.input_values.borrow()[input_idx].as_ref() {
198            Some(_) => {
199                let wait_for_space = Once::default();
200                *shared_state.waiting_put[input_idx].borrow_mut() = Some(wait_for_space.clone());
201                Some(wait_for_space)
202            }
203            None => None,
204        };
205        if let Some(wait_event) = wait_for_space {
206            wait_event.listen().await;
207        }
208
209        // Set the value for this input
210        shared_state.input_values.borrow_mut()[input_idx] = Some(value);
211
212        // Wake up the arbiter if it has paused on an event
213        if let Some(arbiter_event) = shared_state.arbiter_event.borrow_mut().take() {
214            arbiter_event.notify().unwrap();
215        }
216    }
217}