gwr_components/arbiter/
mod.rs1use std::cell::RefCell;
12use std::rc::Rc;
13
14use async_trait::async_trait;
15use gwr_engine::engine::Engine;
16use gwr_engine::events::once::Once;
17use gwr_engine::executor::Spawner;
18use gwr_engine::port::{InPort, OutPort, PortStateResult};
19use gwr_engine::time::clock::Clock;
20use gwr_engine::traits::{Event, Runnable, SimObject};
21use gwr_engine::types::{SimError, SimResult};
22use gwr_model_builder::{EntityDisplay, EntityGet};
23use gwr_track::entity::Entity;
24use gwr_track::tracker::aka::Aka;
25use gwr_track::{enter, exit, trace};
26
27use crate::{connect_tx, take_option};
28
29pub mod policy;
30
31#[derive(Default)]
32struct ArbiterSharedState<T> {
33 input_values: RefCell<Vec<Option<T>>>,
34 arbiter_event: RefCell<Option<Once<()>>>,
35 waiting_put: Vec<RefCell<Option<Once<()>>>>,
36}
37
38impl<T> ArbiterSharedState<T> {
39 fn new(capacity: usize) -> Self {
40 Self {
41 input_values: RefCell::new((0..capacity).map(|_| None).collect()),
42 arbiter_event: RefCell::new(None),
43 waiting_put: (0..capacity).map(|_| RefCell::new(None)).collect(),
44 }
45 }
46}
47
48pub trait Arbitrate<T>
49where
50 T: SimObject,
51{
52 fn arbitrate(
53 &mut self,
54 entity: &Rc<Entity>,
55 input_values: &mut [Option<T>],
56 ) -> Option<(usize, T)>;
57}
58
59#[derive(EntityGet, EntityDisplay)]
60pub struct Arbiter<T>
61where
62 T: SimObject,
63{
64 entity: Rc<Entity>,
65 rx: RefCell<Vec<Option<InPort<T>>>>,
66 tx: RefCell<Option<OutPort<T>>>,
67 policy: RefCell<Option<Box<dyn Arbitrate<T>>>>,
68 shared_state: Rc<ArbiterSharedState<T>>,
69 spawner: Spawner,
70}
71
72impl<T> Arbiter<T>
73where
74 T: SimObject,
75{
76 pub fn new_and_register_with_renames(
77 engine: &Engine,
78 clock: &Clock,
79 parent: &Rc<Entity>,
80 name: &str,
81 aka: Option<&Aka>,
82 num_rx: usize,
83 policy: Box<dyn Arbitrate<T>>,
84 ) -> Result<Rc<Self>, SimError> {
85 let spawner = engine.spawner();
86 let entity = Rc::new(Entity::new(parent, name));
87 let shared_state = Rc::new(ArbiterSharedState::new(num_rx));
88 let rx = (0..num_rx)
89 .map(|i| {
90 Some(InPort::new_with_renames(
91 engine,
92 clock,
93 &entity,
94 &format!("rx_{i}"),
95 aka,
96 ))
97 })
98 .collect();
99 let tx = OutPort::new_with_renames(&entity, "tx", aka);
100 let rc_self = Rc::new(Self {
101 entity,
102 rx: RefCell::new(rx),
103 tx: RefCell::new(Some(tx)),
104 policy: RefCell::new(Some(policy)),
105 shared_state,
106 spawner,
107 });
108 engine.register(rc_self.clone());
109 Ok(rc_self)
110 }
111
112 pub fn new_and_register(
113 engine: &Engine,
114 clock: &Clock,
115 parent: &Rc<Entity>,
116 name: &str,
117 num_rx: usize,
118 policy: Box<dyn Arbitrate<T>>,
119 ) -> Result<Rc<Self>, SimError> {
120 Self::new_and_register_with_renames(engine, clock, parent, name, None, num_rx, policy)
121 }
122
123 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
124 connect_tx!(self.tx, connect ; port_state)
125 }
126
127 pub fn port_rx_i(&self, i: usize) -> PortStateResult<T> {
128 self.rx.borrow()[i].as_ref().unwrap().state()
129 }
130}
131
132#[async_trait(?Send)]
133impl<T> Runnable for Arbiter<T>
134where
135 T: SimObject,
136{
137 async fn run(&self) -> SimResult {
138 for (i, mut rx) in self.rx.borrow_mut().drain(..).enumerate() {
140 let entity = self.entity.clone();
141 let rx = rx.take().unwrap();
142 let shared_state = self.shared_state.clone();
143 self.spawner
144 .spawn(async move { run_input(entity, rx, i, shared_state).await });
145 }
146
147 let tx = take_option!(self.tx);
148 let mut policy = take_option!(self.policy);
149
150 loop {
152 let wait_event;
153 loop {
154 let value;
155 let wake_event;
156 {
157 let mut input_values = self.shared_state.input_values.borrow_mut();
158 let t = policy.arbitrate(&self.entity, &mut input_values);
159 match t {
160 Some((i, t)) => {
161 trace!(self.entity ; "grant {}: {}", i, t);
162 wake_event = self.shared_state.waiting_put[i].borrow_mut().take();
163 value = t;
164 }
165 None => {
166 wait_event = Once::default();
167 trace!(self.entity ; "arb wait");
168 *self.shared_state.arbiter_event.borrow_mut() =
169 Some(wait_event.clone());
170 break;
171 }
172 }
173 }
174
175 if let Some(event) = wake_event {
176 event.notify()?;
177 }
178 exit!(self.entity ; value.id());
179 tx.put(value)?.await;
180 }
181 wait_event.listen().await;
182 }
183 }
184}
185
186async fn run_input<T: SimObject>(
187 entity: Rc<Entity>,
188 rx: InPort<T>,
189 input_idx: usize,
190 shared_state: Rc<ArbiterSharedState<T>>,
191) -> SimResult {
192 loop {
193 let value = rx.get()?.await;
194 enter!(entity ; value.id());
195
196 let wait_for_space = match shared_state.input_values.borrow()[input_idx].as_ref() {
198 Some(_) => {
199 let wait_for_space = Once::default();
200 *shared_state.waiting_put[input_idx].borrow_mut() = Some(wait_for_space.clone());
201 Some(wait_for_space)
202 }
203 None => None,
204 };
205 if let Some(wait_event) = wait_for_space {
206 wait_event.listen().await;
207 }
208
209 shared_state.input_values.borrow_mut()[input_idx] = Some(value);
211
212 if let Some(arbiter_event) = shared_state.arbiter_event.borrow_mut().take() {
214 arbiter_event.notify().unwrap();
215 }
216 }
217}