gwr_components/
queue.rs

1// Copyright (c) 2026 Graphcore Ltd. All rights reserved.
2
3//! A generic queue.
4
5use std::cell::RefCell;
6use std::collections::VecDeque;
7use std::fmt;
8use std::rc::Rc;
9
10use gwr_engine::events::repeated::Repeated;
11use gwr_engine::sim_error;
12use gwr_engine::traits::{Event, SimObject};
13use gwr_engine::types::{SimError, SimResult};
14use gwr_model_builder::EntityDisplay;
15use gwr_track::entity::Entity;
16use gwr_track::{enter, exit};
17
18/// A generic queue for simulation objects.
19#[derive(EntityDisplay)]
20pub struct Queue<T>
21where
22    T: SimObject,
23{
24    entity: Rc<Entity>,
25    capacity: Option<usize>,
26    data: RefCell<VecDeque<T>>,
27    queue_changed: Repeated<()>,
28}
29
30impl<T> fmt::Debug for Queue<T>
31where
32    T: SimObject,
33{
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        f.debug_struct("Queue")
36            .field("entity", &self.entity)
37            .finish()
38    }
39}
40
41impl<T> Queue<T>
42where
43    T: SimObject,
44{
45    /// Create a new queue.
46    ///
47    /// Returns a [`SimError`] if `capacity` is `Some(0)`.
48    pub fn new(parent: &Rc<Entity>, name: &str, capacity: Option<usize>) -> Result<Self, SimError> {
49        if capacity == Some(0) {
50            return sim_error!("Unsupported Queue with 0 capacity");
51        }
52
53        let entity = Rc::new(Entity::new(parent, name));
54        if let Some(capacity) = capacity {
55            entity.track_capacity(capacity, "objects");
56        }
57
58        Ok(Self {
59            entity,
60            capacity,
61            data: RefCell::new(VecDeque::new()),
62            queue_changed: Repeated::default(),
63        })
64    }
65
66    /// Return the current queue length.
67    #[must_use]
68    pub fn len(&self) -> usize {
69        self.data.borrow().len()
70    }
71
72    /// Return whether the queue is empty.
73    #[must_use]
74    pub fn is_empty(&self) -> bool {
75        self.data.borrow().is_empty()
76    }
77
78    /// Return whether the queue is full.
79    #[must_use]
80    pub fn is_full(&self) -> bool {
81        self.capacity.is_some_and(|capacity| self.len() >= capacity)
82    }
83
84    /// Return a snapshot of the queue contents by copying all values.
85    #[must_use]
86    pub fn values(&self) -> Vec<T> {
87        self.data.borrow().iter().cloned().collect()
88    }
89
90    /// Return an event that fires whenever the queue contents change.
91    #[must_use]
92    pub fn changed_event(&self) -> Repeated<()> {
93        self.queue_changed.clone()
94    }
95
96    /// Push a value into the queue, waiting until there is space if required.
97    pub async fn push(&self, value: T) -> SimResult {
98        let mut value = Some(value);
99        loop {
100            if !self.is_full() {
101                self.push_now(value.take().expect("value should still be present"));
102                return Ok(());
103            }
104
105            self.queue_changed.listen().await;
106        }
107    }
108
109    fn push_now(&self, value: T) {
110        enter!(self.entity ; value.id());
111        self.data.borrow_mut().push_back(value);
112        self.queue_changed.notify();
113    }
114
115    /// Pop the oldest value from the queue.
116    #[must_use]
117    pub fn pop_front(&self) -> Option<T> {
118        let value = self.data.borrow_mut().pop_front();
119        if let Some(ref value) = value {
120            exit!(self.entity ; value.id());
121            self.queue_changed.notify();
122        }
123        value
124    }
125
126    /// Remove the first value matching the predicate.
127    #[must_use]
128    pub fn remove_where<F>(&self, predicate: F) -> Option<T>
129    where
130        F: FnMut(&T) -> bool,
131    {
132        let mut queue = self.data.borrow_mut();
133        let index = queue.iter().position(predicate)?;
134        let value = queue.remove(index)?;
135        drop(queue);
136
137        exit!(self.entity ; value.id());
138        self.queue_changed.notify();
139        Some(value)
140    }
141}