gwr_components/
store.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! A data store.
4//!
5//! The [Store] is a component that can hold a number of items defined by its
6//! capacity.
7//!
8//! # Ports
9//!
10//! This component has the following ports:
11//!   - The `rx` port [InPort] which is used to put data into the store.
12//!   - The `tx` port [OutPort] which is used to get data out of the store.
13//!
14//! A given [Store] is capable of holding any type as long as it implements the
15//! [SimObject] trait. This trait has been implemented for a few builtin types
16//! like `i32` and `usize` so that they can be used for simple testing.
17//!
18//! <br>
19//!
20//! # Build a store
21//!
22//! Here is an example creating a [Store]:
23//!
24//! ```rust
25//! use std::rc::Rc;
26//!
27//! use gwr_components::store::Store;
28//! use gwr_engine::engine::Engine;
29//! use gwr_engine::executor::Spawner;
30//! use gwr_track::entity::Entity;
31//!
32//! fn build_store(engine: &Engine, parent: &Rc<Entity>, spawner: Spawner) -> Rc<Store<i32>> {
//!     // Create a store. It is passed:
//!     //   - the engine with which the new store is registered.
//!     //   - a parent entity which provides its location within the simulation.
//!     //   - a name which should be unique within the parent.
//!     //   - a [spawner](gwr_engine::executor::Spawner) that is used to spawn
//!     //     internal concurrent tasks.
//!     //   - a capacity, which must be greater than zero.
39//!     let store: Rc<Store<i32>> = Store::new_and_register(engine, parent, "store", spawner, 5)
40//!         .expect("should be able to create and register `Store`");
41//!     store
42//! }
43//! ```
44//!
//! By default, the store stops accepting new items while it is full and waits
//! for space to become available. This behaviour can be changed so that an
//! overflow returns an error instead, by calling the `set_error_on_overflow()`
//! method.
48//!
49//! ```rust
50//! use std::rc::Rc;
51//!
52//! use gwr_components::store::Store;
53//! use gwr_engine::engine::Engine;
54//! use gwr_engine::executor::Spawner;
55//! use gwr_track::entity::Entity;
56//!
57//! fn build_store_with_panic(
58//!     engine: &Engine,
59//!     parent: &Rc<Entity>,
60//!     spawner: Spawner,
61//! ) -> Rc<Store<i32>> {
//!     // Create a store that returns an error on overflow. Use `new_and_register()`
//!     // as before, then call `set_error_on_overflow()` on the resulting struct.
64//!     let store = Store::new_and_register(engine, parent, "store_error", spawner, 5)
65//!         .expect("should be able to create and register `Store`");
66//!     store.set_error_on_overflow();
67//!     store
68//! }
69//! ```
70
71//! # Connect a store
72//!
73//! Here is an example of a more complete simulation using a [Store] as well as
//! a [Source](crate::source::Source) to put data into the store and a
//! [Sink](crate::sink::Sink) to take the data out of the store.
76//!
77//! ```rust
78//! use std::rc::Rc;
79//! use gwr_components::sink::Sink;
80//! use gwr_components::source::Source;
81//! use gwr_components::store::Store;
82//! use gwr_components::{connect_port, option_box_repeat};
83//! use gwr_engine::engine::Engine;
84//! use gwr_engine::run_simulation;
85//!
86//! // Every simulation is based around an `Engine`
87//! let mut engine = Engine::default();
88//!
89//! // A spawner allows components to spawn activity
90//! let spawner = engine.spawner();
91//!
92//! // Take a reference to the engine top to use as the parent
93//! let top = engine.top();
94//!
//! // Create the basic components:
96//! // The simplest use of the source is to inject the same value repeatedly.
97//! let source = Source::new_and_register(&engine, top, "source", option_box_repeat!(1 ; 10))
98//!     .expect("should be able to create and register `Source`");
99//! // Create the store - its type will be derived from the connections to its ports.
100//! let store = Store::new_and_register(&engine, top, "store", spawner, 5)
101//!     .expect("should be able to create and register `Store`");
102//! // Create the sink which will pull all of the data items out of the store
103//! let sink = Sink::new_and_register(&engine, top, "sink")
104//!     .expect("should be able to create and register `Sink`");
105//!
106//! // Connect the ports together:
107//! // The source will drive data into the store:
108//! connect_port!(source, tx => store, rx)
109//!     .expect("should be able to connect `Source` to `Store`");
110//!
111//! // The sink will pull data out of the store
112//! connect_port!(store, tx => sink, rx)
113//!     .expect("should be able to connect `Store` to `Sink`");
114//!
115//! // The `run_simulation!` macro then spawns all active components
116//! // and runs the simulation to completion.
117//! run_simulation!(engine);
118//! ```
119
120use std::cell::RefCell;
121use std::collections::VecDeque;
122use std::rc::Rc;
123
124use async_trait::async_trait;
125use gwr_engine::engine::Engine;
126use gwr_engine::events::repeated::Repeated;
127use gwr_engine::executor::Spawner;
128use gwr_engine::port::{InPort, OutPort, PortStateResult};
129use gwr_engine::sim_error;
130use gwr_engine::traits::{Event, Runnable, SimObject};
131use gwr_engine::types::{SimError, SimResult};
132use gwr_model_builder::EntityDisplay;
133use gwr_track::entity::Entity;
134use gwr_track::{enter, exit};
135
136use crate::{connect_tx, port_rx, take_option};
137
/// The [`State`] of a [`Store`].
///
/// The state is held separately from the [`Store`] so that it can be shared
/// (through an [`Rc`]) between the store and the tasks that move data in and
/// out of it.
struct State<T>
where
    T: SimObject,
{
    /// Entity used when logging objects entering/leaving the store and when
    /// naming the store in error messages.
    entity: Rc<Entity>,
    /// Maximum number of objects the store may hold.
    capacity: usize,
    /// The stored objects, oldest at the front (pushed at the back, popped
    /// from the front).
    data: RefCell<VecDeque<T>>,
    /// When `true`, pushing into a full store returns an error; when `false`,
    /// pushing into a full store is treated as a broken precondition.
    error_on_overflow: RefCell<bool>,
    /// Event notified with the new number of stored objects after every push
    /// and pop.
    level_change: Repeated<usize>,
}
149
150impl<T> State<T>
151where
152    T: SimObject,
153{
154    /// Create a new store
155    fn new(entity: &Rc<Entity>, capacity: usize) -> Self {
156        Self {
157            entity: entity.clone(),
158            capacity,
159            data: RefCell::new(VecDeque::with_capacity(capacity)),
160            error_on_overflow: RefCell::new(false),
161            level_change: Repeated::new(usize::default()),
162        }
163    }
164
165    /// Place an object into the store state.
166    ///
167    /// There must be room before this is called.
168    fn push_value(&self, value: T) -> SimResult {
169        enter!(self.entity ; value.id());
170        if *self.error_on_overflow.borrow() {
171            if self.data.borrow().len() >= self.capacity {
172                return sim_error!(format!("Overflow in {:?}", self.entity.full_name()));
173            }
174        } else {
175            assert!(self.data.borrow().len() < self.capacity);
176        }
177
178        self.data.borrow_mut().push_back(value);
179        self.level_change.notify_result(self.data.borrow().len())?;
180        Ok(())
181    }
182
183    /// Remove an object from the store state.
184    ///
185    /// There must be an object available to remove before this is called.
186    fn pop_value(&self) -> Result<T, SimError> {
187        let value = self.data.borrow_mut().pop_front().unwrap();
188        self.level_change.notify_result(self.data.borrow().len())?;
189        exit!(self.entity ; value.id());
190        Ok(value)
191    }
192}
193
/// A component that can support a configurable number of objects.
///
/// Objects must support the [SimObject] trait.
///
/// Objects arrive on the `rx` port, are held in arrival order, and leave on
/// the `tx` port.
#[derive(EntityDisplay)]
pub struct Store<T>
where
    T: SimObject,
{
    /// Entity created for this store under the parent passed to
    /// `new_and_register()`.
    pub entity: Rc<Entity>,
    /// Used by `run()` to spawn the receive and transmit tasks.
    spawner: Spawner,
    /// Storage shared between this struct and the tasks spawned by `run()`.
    state: Rc<State<T>>,

    /// Output port; held in an `Option` and passed through `take_option!` in
    /// `run()` so that it can be given to the transmit task.
    tx: RefCell<Option<OutPort<T>>>,
    /// Input port; held in an `Option` and passed through `take_option!` in
    /// `run()` so that it can be given to the receive task.
    rx: RefCell<Option<InPort<T>>>,
}
209
210impl<T> Store<T>
211where
212    T: SimObject,
213{
214    /// Basic store constructor
215    ///
216    /// Returns a `SimError` if `capacity` is 0.
217    pub fn new_and_register(
218        engine: &Engine,
219        parent: &Rc<Entity>,
220        name: &str,
221        spawner: Spawner,
222        capacity: usize,
223    ) -> Result<Rc<Self>, SimError> {
224        if capacity == 0 {
225            return sim_error!("Unsupported Store with 0 capacity");
226        }
227        let entity = Rc::new(Entity::new(parent, name));
228        let state = Rc::new(State::new(&entity, capacity));
229        let tx = OutPort::new(&entity, "tx");
230        let rx = InPort::new(&entity, "rx");
231        let rc_self = Rc::new(Self {
232            entity,
233            spawner,
234            state,
235            tx: RefCell::new(Some(tx)),
236            rx: RefCell::new(Some(rx)),
237        });
238        engine.register(rc_self.clone());
239        Ok(rc_self)
240    }
241
242    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
243        connect_tx!(self.tx, connect ; port_state)
244    }
245
246    pub fn port_rx(&self) -> PortStateResult<T> {
247        port_rx!(self.rx, state)
248    }
249
250    #[must_use]
251    pub fn fill_level(&self) -> usize {
252        self.state.data.borrow().len()
253    }
254
255    pub fn set_error_on_overflow(&self) {
256        *self.state.error_on_overflow.borrow_mut() = true;
257    }
258
259    #[must_use]
260    pub fn get_level_change_event(&self) -> Repeated<usize> {
261        self.state.level_change.clone()
262    }
263}
264
265#[async_trait(?Send)]
266impl<T> Runnable for Store<T>
267where
268    T: SimObject,
269{
270    async fn run(&self) -> SimResult {
271        let rx = take_option!(self.rx);
272        let state = self.state.clone();
273        self.spawner.spawn(async move { run_rx(rx, state).await });
274
275        let tx = take_option!(self.tx);
276        let state = self.state.clone();
277        self.spawner.spawn(async move { run_tx(tx, state).await });
278        Ok(())
279    }
280}
281
282async fn run_rx<T>(rx: InPort<T>, state: Rc<State<T>>) -> SimResult
283where
284    T: SimObject,
285{
286    let level_change = state.level_change.clone();
287    let error_on_overflow = *state.error_on_overflow.borrow();
288    loop {
289        let level = state.data.borrow().len();
290        if level < state.capacity || error_on_overflow {
291            let value = rx.get()?.await;
292            state.push_value(value)?;
293        } else {
294            level_change.listen().await;
295        }
296    }
297}
298
299async fn run_tx<T>(tx: OutPort<T>, state: Rc<State<T>>) -> SimResult
300where
301    T: SimObject,
302{
303    let level_change = state.level_change.clone();
304    loop {
305        let level = state.data.borrow().len();
306        if level > 0 {
307            // Wait for something to actually want the store value
308            tx.try_put()?.await;
309            let value = state.pop_value()?;
310            tx.put(value)?.await;
311        } else {
312            level_change.listen().await;
313        }
314    }
315}