1use std::cell::RefCell;
6use std::collections::VecDeque;
7use std::fmt;
8use std::rc::Rc;
9
10use async_trait::async_trait;
11use gwr_engine::engine::Engine;
12use gwr_engine::events::repeated::Repeated;
13use gwr_engine::executor::Spawner;
14use gwr_engine::port::{InPort, OutPort, PortStateResult};
15use gwr_engine::sim_error;
16use gwr_engine::time::clock::Clock;
17use gwr_engine::traits::{Event, Runnable, SimObject};
18use gwr_engine::types::{SimError, SimResult};
19use gwr_model_builder::{EntityDisplay, EntityGet};
20use gwr_track::entity::Entity;
21use gwr_track::tracker::aka::Aka;
22
23use crate::{connect_tx, port_rx, take_option};
24
25#[derive(EntityGet, EntityDisplay)]
27pub struct QueueCore<T>
28where
29 T: SimObject,
30{
31 entity: Rc<Entity>,
32 capacity: Option<usize>,
33 data: RefCell<VecDeque<T>>,
34 queue_changed: Repeated<()>,
35}
36
37impl<T> fmt::Debug for QueueCore<T>
38where
39 T: SimObject,
40{
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 f.debug_struct("QueueCore")
43 .field("entity", &self.entity)
44 .finish()
45 }
46}
47
48impl<T> QueueCore<T>
49where
50 T: SimObject,
51{
52 pub fn new(parent: &Rc<Entity>, name: &str, capacity: Option<usize>) -> Result<Self, SimError> {
56 if capacity == Some(0) {
57 return sim_error!("Unsupported Queue with 0 capacity");
58 }
59
60 let entity = Rc::new(Entity::new(parent, name));
61 if let Some(capacity) = capacity {
62 entity.track_capacity(capacity, "objects");
63 }
64
65 Ok(Self {
66 entity,
67 capacity,
68 data: RefCell::new(VecDeque::new()),
69 queue_changed: Repeated::default(),
70 })
71 }
72
73 #[must_use]
75 pub fn len(&self) -> usize {
76 self.data.borrow().len()
77 }
78
79 #[must_use]
81 pub fn is_empty(&self) -> bool {
82 self.data.borrow().is_empty()
83 }
84
85 #[must_use]
87 pub fn is_full(&self) -> bool {
88 self.capacity.is_some_and(|capacity| self.len() >= capacity)
89 }
90
91 #[must_use]
93 pub fn values(&self) -> Vec<T> {
94 self.data.borrow().iter().cloned().collect()
95 }
96
97 #[must_use]
99 pub fn changed_event(&self) -> Repeated<()> {
100 self.queue_changed.clone()
101 }
102
103 pub async fn push(&self, value: T) -> SimResult {
105 let mut value = Some(value);
106 loop {
107 if !self.is_full() {
108 self.push_now(value.take().expect("value should still be present"));
109 return Ok(());
110 }
111
112 self.queue_changed.listen().await;
113 }
114 }
115
116 fn push_now(&self, value: T) {
117 self.entity.track_enter(value.id());
118 self.data.borrow_mut().push_back(value);
119 self.queue_changed.notify();
120 }
121
122 #[must_use]
124 pub fn pop_front(&self) -> Option<T> {
125 let value = self.data.borrow_mut().pop_front();
126 if let Some(ref value) = value {
127 self.entity.track_exit(value.id());
128 self.queue_changed.notify();
129 }
130 value
131 }
132
133 #[must_use]
135 pub fn remove_where<F>(&self, predicate: F) -> Option<T>
136 where
137 F: FnMut(&T) -> bool,
138 {
139 let mut queue = self.data.borrow_mut();
140 let index = queue.iter().position(predicate)?;
141 let value = queue.remove(index)?;
142 drop(queue);
143
144 self.entity.track_exit(value.id());
145 self.queue_changed.notify();
146 Some(value)
147 }
148}
149
150#[derive(EntityGet, EntityDisplay)]
152pub struct Queue<T>
153where
154 T: SimObject,
155{
156 entity: Rc<Entity>,
157 spawner: Spawner,
158 queue: Rc<QueueCore<T>>,
159 rx: RefCell<Option<InPort<T>>>,
160 tx: RefCell<Option<OutPort<T>>>,
161}
162
163impl<T> fmt::Debug for Queue<T>
164where
165 T: SimObject,
166{
167 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168 f.debug_struct("Queue")
169 .field("entity", &self.entity)
170 .finish()
171 }
172}
173
174impl<T> Queue<T>
175where
176 T: SimObject,
177{
178 pub fn new_and_register_with_renames(
182 engine: &Engine,
183 clock: &Clock,
184 parent: &Rc<Entity>,
185 name: &str,
186 aka: Option<&Aka>,
187 capacity: Option<usize>,
188 ) -> Result<Rc<Self>, SimError> {
189 let spawner = engine.spawner();
190 let queue = QueueCore::new(parent, name, capacity)?;
191 let entity = queue.entity.clone();
192 let tx = OutPort::new_with_renames(&entity, "tx", aka);
193 let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
194 let rc_self = Rc::new(Self {
195 entity,
196 spawner,
197 queue: Rc::new(queue),
198 rx: RefCell::new(Some(rx)),
199 tx: RefCell::new(Some(tx)),
200 });
201 engine.register(rc_self.clone());
202 Ok(rc_self)
203 }
204
205 pub fn new_and_register(
209 engine: &Engine,
210 clock: &Clock,
211 parent: &Rc<Entity>,
212 name: &str,
213 capacity: Option<usize>,
214 ) -> Result<Rc<Self>, SimError> {
215 Self::new_and_register_with_renames(engine, clock, parent, name, None, capacity)
216 }
217
218 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
219 connect_tx!(self.tx, connect ; port_state)
220 }
221
222 pub fn port_rx(&self) -> PortStateResult<T> {
223 port_rx!(self.rx, state)
224 }
225
226 #[must_use]
228 pub fn len(&self) -> usize {
229 self.queue.len()
230 }
231
232 #[must_use]
234 pub fn is_empty(&self) -> bool {
235 self.queue.is_empty()
236 }
237
238 #[must_use]
240 pub fn is_full(&self) -> bool {
241 self.queue.is_full()
242 }
243
244 #[must_use]
246 pub fn values(&self) -> Vec<T> {
247 self.queue.values()
248 }
249
250 #[must_use]
252 pub fn changed_event(&self) -> Repeated<()> {
253 self.queue.changed_event()
254 }
255}
256
257#[async_trait(?Send)]
258impl<T> Runnable for Queue<T>
259where
260 T: SimObject,
261{
262 async fn run(&self) -> SimResult {
263 let rx = take_option!(self.rx);
264 let queue = self.queue.clone();
265 self.spawner.spawn(async move { run_rx(rx, queue).await });
266
267 let tx = take_option!(self.tx);
268 let queue = self.queue.clone();
269 self.spawner.spawn(async move { run_tx(tx, queue).await });
270 Ok(())
271 }
272}
273
274async fn run_rx<T>(rx: InPort<T>, queue: Rc<QueueCore<T>>) -> SimResult
275where
276 T: SimObject,
277{
278 let queue_changed = queue.changed_event();
279 loop {
280 if queue.is_full() {
281 queue_changed.listen().await;
282 } else {
283 let value = rx.get()?.await;
284 queue.push(value).await?;
285 }
286 }
287}
288
289async fn run_tx<T>(tx: OutPort<T>, queue: Rc<QueueCore<T>>) -> SimResult
290where
291 T: SimObject,
292{
293 let queue_changed = queue.changed_event();
294 loop {
295 if queue.is_empty() {
296 queue_changed.listen().await;
297 } else {
298 tx.try_put()?.await;
299 if let Some(value) = queue.pop_front() {
300 tx.put(value)?.await;
301 }
302 }
303 }
304}