gwr_components/arbiter/
mod.rs1use std::cell::RefCell;
12use std::rc::Rc;
13
14use async_trait::async_trait;
15use gwr_engine::engine::Engine;
16use gwr_engine::events::once::Once;
17use gwr_engine::executor::Spawner;
18use gwr_engine::port::{InPort, OutPort, PortStateResult};
19use gwr_engine::traits::{Event, Runnable, SimObject};
20use gwr_engine::types::{SimError, SimResult};
21use gwr_model_builder::EntityDisplay;
22use gwr_track::entity::Entity;
23use gwr_track::{enter, exit, trace};
24
25use crate::{connect_tx, take_option};
26
27pub mod policy;
28
29#[derive(Default)]
30struct ArbiterSharedState<T> {
31 input_values: RefCell<Vec<Option<T>>>,
32 arbiter_event: RefCell<Option<Once<()>>>,
33 waiting_put: Vec<RefCell<Option<Once<()>>>>,
34}
35
36impl<T> ArbiterSharedState<T> {
37 fn new(capacity: usize) -> Self {
38 Self {
39 input_values: RefCell::new((0..capacity).map(|_| None).collect()),
40 arbiter_event: RefCell::new(None),
41 waiting_put: (0..capacity).map(|_| RefCell::new(None)).collect(),
42 }
43 }
44}
45
46pub trait Arbitrate<T>
47where
48 T: SimObject,
49{
50 fn arbitrate(
51 &mut self,
52 entity: &Rc<Entity>,
53 input_values: &mut [Option<T>],
54 ) -> Option<(usize, T)>;
55}
56
57#[derive(EntityDisplay)]
58pub struct Arbiter<T>
59where
60 T: SimObject,
61{
62 pub entity: Rc<Entity>,
63 rx: RefCell<Vec<Option<InPort<T>>>>,
64 tx: RefCell<Option<OutPort<T>>>,
65 policy: RefCell<Option<Box<dyn Arbitrate<T>>>>,
66 shared_state: Rc<ArbiterSharedState<T>>,
67 spawner: Spawner,
68}
69
70impl<T> Arbiter<T>
71where
72 T: SimObject,
73{
74 pub fn new_and_register(
75 engine: &Engine,
76 parent: &Rc<Entity>,
77 name: &str,
78 spawner: Spawner,
79 num_rx: usize,
80 policy: Box<dyn Arbitrate<T>>,
81 ) -> Result<Rc<Self>, SimError> {
82 let entity = Rc::new(Entity::new(parent, name));
83 let shared_state = Rc::new(ArbiterSharedState::new(num_rx));
84 let rx = (0..num_rx)
85 .map(|i| Some(InPort::new(&entity, format!("rx{i}").as_str())))
86 .collect();
87 let tx = OutPort::new(&entity, "tx");
88 let rc_self = Rc::new(Self {
89 entity,
90 rx: RefCell::new(rx),
91 tx: RefCell::new(Some(tx)),
92 policy: RefCell::new(Some(policy)),
93 shared_state,
94 spawner,
95 });
96 engine.register(rc_self.clone());
97 Ok(rc_self)
98 }
99
100 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
101 connect_tx!(self.tx, connect ; port_state)
102 }
103
104 pub fn port_rx_i(&self, i: usize) -> PortStateResult<T> {
105 self.rx.borrow()[i].as_ref().unwrap().state()
106 }
107}
108
109#[async_trait(?Send)]
110impl<T> Runnable for Arbiter<T>
111where
112 T: SimObject,
113{
114 async fn run(&self) -> SimResult {
115 for (i, mut rx) in self.rx.borrow_mut().drain(..).enumerate() {
117 let entity = self.entity.clone();
118 let rx = rx.take().unwrap();
119 let shared_state = self.shared_state.clone();
120 self.spawner
121 .spawn(async move { run_input(entity, rx, i, shared_state).await });
122 }
123
124 let tx = take_option!(self.tx);
125 let mut policy = take_option!(self.policy);
126
127 loop {
129 let wait_event;
130 loop {
131 let value;
132 let wake_event;
133 {
134 let mut input_values = self.shared_state.input_values.borrow_mut();
135 let t = policy.arbitrate(&self.entity, &mut input_values);
136 match t {
137 Some((i, t)) => {
138 trace!(self.entity ; "grant {}: {}", i, t);
139 wake_event = self.shared_state.waiting_put[i].borrow_mut().take();
140 value = t;
141 }
142 None => {
143 wait_event = Once::default();
144 trace!(self.entity ; "arb wait");
145 *self.shared_state.arbiter_event.borrow_mut() =
146 Some(wait_event.clone());
147 break;
148 }
149 }
150 }
151
152 if let Some(event) = wake_event {
153 event.notify()?;
154 }
155 exit!(self.entity ; value.id());
156 tx.put(value)?.await;
157 }
158 wait_event.listen().await;
159 }
160 }
161}
162
163async fn run_input<T: SimObject>(
164 entity: Rc<Entity>,
165 rx: InPort<T>,
166 input_idx: usize,
167 shared_state: Rc<ArbiterSharedState<T>>,
168) -> SimResult {
169 loop {
170 let value = rx.get()?.await;
171 enter!(entity ; value.id());
172
173 let wait_for_space = match shared_state.input_values.borrow()[input_idx].as_ref() {
175 Some(_) => {
176 let wait_for_space = Once::default();
177 *shared_state.waiting_put[input_idx].borrow_mut() = Some(wait_for_space.clone());
178 Some(wait_for_space)
179 }
180 None => None,
181 };
182 if let Some(wait_event) = wait_for_space {
183 wait_event.listen().await;
184 }
185
186 shared_state.input_values.borrow_mut()[input_idx] = Some(value);
188
189 if let Some(arbiter_event) = shared_state.arbiter_event.borrow_mut().take() {
191 arbiter_event.notify().unwrap();
192 }
193 }
194}