gwr_components/
queue.rs

1// Copyright (c) 2026 Graphcore Ltd. All rights reserved.
2
3//! A generic queue.
4
5use std::cell::RefCell;
6use std::collections::VecDeque;
7use std::fmt;
8use std::rc::Rc;
9
10use gwr_engine::events::repeated::Repeated;
11use gwr_engine::sim_error;
12use gwr_engine::traits::{Event, SimObject};
13use gwr_engine::types::{SimError, SimResult};
14use gwr_model_builder::EntityDisplay;
15use gwr_track::entity::Entity;
16
17/// A generic queue for simulation objects.
18#[derive(EntityDisplay)]
19pub struct Queue<T>
20where
21    T: SimObject,
22{
23    entity: Rc<Entity>,
24    capacity: Option<usize>,
25    data: RefCell<VecDeque<T>>,
26    queue_changed: Repeated<()>,
27}
28
29impl<T> fmt::Debug for Queue<T>
30where
31    T: SimObject,
32{
33    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34        f.debug_struct("Queue")
35            .field("entity", &self.entity)
36            .finish()
37    }
38}
39
40impl<T> Queue<T>
41where
42    T: SimObject,
43{
44    /// Create a new queue.
45    ///
46    /// Returns a [`SimError`] if `capacity` is `Some(0)`.
47    pub fn new(parent: &Rc<Entity>, name: &str, capacity: Option<usize>) -> Result<Self, SimError> {
48        if capacity == Some(0) {
49            return sim_error!("Unsupported Queue with 0 capacity");
50        }
51
52        let entity = Rc::new(Entity::new(parent, name));
53        if let Some(capacity) = capacity {
54            entity.track_capacity(capacity, "objects");
55        }
56
57        Ok(Self {
58            entity,
59            capacity,
60            data: RefCell::new(VecDeque::new()),
61            queue_changed: Repeated::default(),
62        })
63    }
64
65    /// Return the current queue length.
66    #[must_use]
67    pub fn len(&self) -> usize {
68        self.data.borrow().len()
69    }
70
71    /// Return whether the queue is empty.
72    #[must_use]
73    pub fn is_empty(&self) -> bool {
74        self.data.borrow().is_empty()
75    }
76
77    /// Return whether the queue is full.
78    #[must_use]
79    pub fn is_full(&self) -> bool {
80        self.capacity.is_some_and(|capacity| self.len() >= capacity)
81    }
82
83    /// Return a snapshot of the queue contents by copying all values.
84    #[must_use]
85    pub fn values(&self) -> Vec<T> {
86        self.data.borrow().iter().cloned().collect()
87    }
88
89    /// Return an event that fires whenever the queue contents change.
90    #[must_use]
91    pub fn changed_event(&self) -> Repeated<()> {
92        self.queue_changed.clone()
93    }
94
95    /// Push a value into the queue, waiting until there is space if required.
96    pub async fn push(&self, value: T) -> SimResult {
97        let mut value = Some(value);
98        loop {
99            if !self.is_full() {
100                self.push_now(value.take().expect("value should still be present"));
101                return Ok(());
102            }
103
104            self.queue_changed.listen().await;
105        }
106    }
107
108    fn push_now(&self, value: T) {
109        self.entity.track_enter(value.id());
110        self.data.borrow_mut().push_back(value);
111        self.queue_changed.notify();
112    }
113
114    /// Pop the oldest value from the queue.
115    #[must_use]
116    pub fn pop_front(&self) -> Option<T> {
117        let value = self.data.borrow_mut().pop_front();
118        if let Some(ref value) = value {
119            self.entity.track_exit(value.id());
120            self.queue_changed.notify();
121        }
122        value
123    }
124
125    /// Remove the first value matching the predicate.
126    #[must_use]
127    pub fn remove_where<F>(&self, predicate: F) -> Option<T>
128    where
129        F: FnMut(&T) -> bool,
130    {
131        let mut queue = self.data.borrow_mut();
132        let index = queue.iter().position(predicate)?;
133        let value = queue.remove(index)?;
134        drop(queue);
135
136        self.entity.track_exit(value.id());
137        self.queue_changed.notify();
138        Some(value)
139    }
140}