gwr_components/
queue.rs

1// Copyright (c) 2026 Graphcore Ltd. All rights reserved.
2
3//! Generic queues.
4
5use std::cell::RefCell;
6use std::collections::VecDeque;
7use std::fmt;
8use std::rc::Rc;
9
10use async_trait::async_trait;
11use gwr_engine::engine::Engine;
12use gwr_engine::events::repeated::Repeated;
13use gwr_engine::executor::Spawner;
14use gwr_engine::port::{InPort, OutPort, PortStateResult};
15use gwr_engine::sim_error;
16use gwr_engine::time::clock::Clock;
17use gwr_engine::traits::{Event, Runnable, SimObject};
18use gwr_engine::types::{SimError, SimResult};
19use gwr_model_builder::{EntityDisplay, EntityGet};
20use gwr_track::entity::Entity;
21use gwr_track::tracker::aka::Aka;
22
23use crate::{connect_tx, port_rx, take_option};
24
/// A generic queue for simulation objects, without ports or runnable behavior.
///
/// Values are kept in insertion order in a [`VecDeque`] wrapped in a
/// [`RefCell`], which is what lets the queue be modified through `&self`.
/// Insertions and removals are reported to the queue's [`Entity`] and
/// signalled through the `queue_changed` event.
#[derive(EntityGet, EntityDisplay)]
pub struct QueueCore<T>
where
    T: SimObject,
{
    /// Entity representing this queue; receives the capacity/enter/exit tracking calls.
    entity: Rc<Entity>,
    /// Maximum number of queued values; `None` means the queue never reports full.
    capacity: Option<usize>,
    /// Queued values, oldest at the front.
    data: RefCell<VecDeque<T>>,
    /// Event notified after every successful push, pop or removal.
    queue_changed: Repeated<()>,
}
36
37impl<T> fmt::Debug for QueueCore<T>
38where
39    T: SimObject,
40{
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        f.debug_struct("QueueCore")
43            .field("entity", &self.entity)
44            .finish()
45    }
46}
47
48impl<T> QueueCore<T>
49where
50    T: SimObject,
51{
52    /// Create a new queue.
53    ///
54    /// Returns a [`SimError`] if `capacity` is `Some(0)`.
55    pub fn new(parent: &Rc<Entity>, name: &str, capacity: Option<usize>) -> Result<Self, SimError> {
56        if capacity == Some(0) {
57            return sim_error!("Unsupported Queue with 0 capacity");
58        }
59
60        let entity = Rc::new(Entity::new(parent, name));
61        if let Some(capacity) = capacity {
62            entity.track_capacity(capacity, "objects");
63        }
64
65        Ok(Self {
66            entity,
67            capacity,
68            data: RefCell::new(VecDeque::new()),
69            queue_changed: Repeated::default(),
70        })
71    }
72
73    /// Return the current queue length.
74    #[must_use]
75    pub fn len(&self) -> usize {
76        self.data.borrow().len()
77    }
78
79    /// Return whether the queue is empty.
80    #[must_use]
81    pub fn is_empty(&self) -> bool {
82        self.data.borrow().is_empty()
83    }
84
85    /// Return whether the queue is full.
86    #[must_use]
87    pub fn is_full(&self) -> bool {
88        self.capacity.is_some_and(|capacity| self.len() >= capacity)
89    }
90
91    /// Return a snapshot of the queue contents by copying all values.
92    #[must_use]
93    pub fn values(&self) -> Vec<T> {
94        self.data.borrow().iter().cloned().collect()
95    }
96
97    /// Return an event that fires whenever the queue contents change.
98    #[must_use]
99    pub fn changed_event(&self) -> Repeated<()> {
100        self.queue_changed.clone()
101    }
102
103    /// Push a value into the queue, waiting until there is space if required.
104    pub async fn push(&self, value: T) -> SimResult {
105        let mut value = Some(value);
106        loop {
107            if !self.is_full() {
108                self.push_now(value.take().expect("value should still be present"));
109                return Ok(());
110            }
111
112            self.queue_changed.listen().await;
113        }
114    }
115
116    fn push_now(&self, value: T) {
117        self.entity.track_enter(value.id());
118        self.data.borrow_mut().push_back(value);
119        self.queue_changed.notify();
120    }
121
122    /// Pop the oldest value from the queue.
123    #[must_use]
124    pub fn pop_front(&self) -> Option<T> {
125        let value = self.data.borrow_mut().pop_front();
126        if let Some(ref value) = value {
127            self.entity.track_exit(value.id());
128            self.queue_changed.notify();
129        }
130        value
131    }
132
133    /// Remove the first value matching the predicate.
134    #[must_use]
135    pub fn remove_where<F>(&self, predicate: F) -> Option<T>
136    where
137        F: FnMut(&T) -> bool,
138    {
139        let mut queue = self.data.borrow_mut();
140        let index = queue.iter().position(predicate)?;
141        let value = queue.remove(index)?;
142        drop(queue);
143
144        self.entity.track_exit(value.id());
145        self.queue_changed.notify();
146        Some(value)
147    }
148}
149
/// A generic queue component with `rx` and `tx` ports.
///
/// The stored values live in a shared [`QueueCore`]. Both ports are held as
/// `Option`s so that they can be taken out when the component is run.
#[derive(EntityGet, EntityDisplay)]
pub struct Queue<T>
where
    T: SimObject,
{
    /// Entity of this component; a clone of the underlying `QueueCore`'s entity handle.
    entity: Rc<Entity>,
    /// Spawner used to start the receive and transmit tasks.
    spawner: Spawner,
    /// Shared storage, handed to both the receive and the transmit task.
    queue: Rc<QueueCore<T>>,
    /// Input port; taken out of the cell when the component is run.
    rx: RefCell<Option<InPort<T>>>,
    /// Output port; taken out of the cell when the component is run.
    tx: RefCell<Option<OutPort<T>>>,
}
162
163impl<T> fmt::Debug for Queue<T>
164where
165    T: SimObject,
166{
167    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
168        f.debug_struct("Queue")
169            .field("entity", &self.entity)
170            .finish()
171    }
172}
173
174impl<T> Queue<T>
175where
176    T: SimObject,
177{
178    /// Create and register a new queue component.
179    ///
180    /// Returns a [`SimError`] if `capacity` is `Some(0)`.
181    pub fn new_and_register_with_renames(
182        engine: &Engine,
183        clock: &Clock,
184        parent: &Rc<Entity>,
185        name: &str,
186        aka: Option<&Aka>,
187        capacity: Option<usize>,
188    ) -> Result<Rc<Self>, SimError> {
189        let spawner = engine.spawner();
190        let queue = QueueCore::new(parent, name, capacity)?;
191        let entity = queue.entity.clone();
192        let tx = OutPort::new_with_renames(&entity, "tx", aka);
193        let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
194        let rc_self = Rc::new(Self {
195            entity,
196            spawner,
197            queue: Rc::new(queue),
198            rx: RefCell::new(Some(rx)),
199            tx: RefCell::new(Some(tx)),
200        });
201        engine.register(rc_self.clone());
202        Ok(rc_self)
203    }
204
205    /// Create and register a new queue component.
206    ///
207    /// Returns a [`SimError`] if `capacity` is `Some(0)`.
208    pub fn new_and_register(
209        engine: &Engine,
210        clock: &Clock,
211        parent: &Rc<Entity>,
212        name: &str,
213        capacity: Option<usize>,
214    ) -> Result<Rc<Self>, SimError> {
215        Self::new_and_register_with_renames(engine, clock, parent, name, None, capacity)
216    }
217
218    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
219        connect_tx!(self.tx, connect ; port_state)
220    }
221
222    pub fn port_rx(&self) -> PortStateResult<T> {
223        port_rx!(self.rx, state)
224    }
225
226    /// Return the current queue length.
227    #[must_use]
228    pub fn len(&self) -> usize {
229        self.queue.len()
230    }
231
232    /// Return whether the queue is empty.
233    #[must_use]
234    pub fn is_empty(&self) -> bool {
235        self.queue.is_empty()
236    }
237
238    /// Return whether the queue is full.
239    #[must_use]
240    pub fn is_full(&self) -> bool {
241        self.queue.is_full()
242    }
243
244    /// Return a snapshot of the queue contents by copying all values.
245    #[must_use]
246    pub fn values(&self) -> Vec<T> {
247        self.queue.values()
248    }
249
250    /// Return an event that fires whenever the queue contents change.
251    #[must_use]
252    pub fn changed_event(&self) -> Repeated<()> {
253        self.queue.changed_event()
254    }
255}
256
257#[async_trait(?Send)]
258impl<T> Runnable for Queue<T>
259where
260    T: SimObject,
261{
262    async fn run(&self) -> SimResult {
263        let rx = take_option!(self.rx);
264        let queue = self.queue.clone();
265        self.spawner.spawn(async move { run_rx(rx, queue).await });
266
267        let tx = take_option!(self.tx);
268        let queue = self.queue.clone();
269        self.spawner.spawn(async move { run_tx(tx, queue).await });
270        Ok(())
271    }
272}
273
274async fn run_rx<T>(rx: InPort<T>, queue: Rc<QueueCore<T>>) -> SimResult
275where
276    T: SimObject,
277{
278    let queue_changed = queue.changed_event();
279    loop {
280        if queue.is_full() {
281            queue_changed.listen().await;
282        } else {
283            let value = rx.get()?.await;
284            queue.push(value).await?;
285        }
286    }
287}
288
289async fn run_tx<T>(tx: OutPort<T>, queue: Rc<QueueCore<T>>) -> SimResult
290where
291    T: SimObject,
292{
293    let queue_changed = queue.changed_event();
294    loop {
295        if queue.is_empty() {
296            queue_changed.listen().await;
297        } else {
298            tx.try_put()?.await;
299            if let Some(value) = queue.pop_front() {
300                tx.put(value)?.await;
301            }
302        }
303    }
304}