gwr_components/
store.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! A data store.
4//!
5//! The [Store] is a component that can hold a number of items defined by its
6//! capacity.
7//!
8//! # Ports
9//!
10//! This component has the following ports:
11//!   - The `rx` port [InPort] which is used to put data into the store.
12//!   - The `tx` port [OutPort] which is used to get data out of the store.
13//!
14//! A given [Store] is capable of holding any type as long as it implements the
15//! [SimObject] trait. This trait has been implemented for a few builtin types
16//! like `i32` and `usize` so that they can be used for simple testing.
17//!
18//! <br>
19//!
20//! # Build a store
21//!
22//! Here is an example creating a [Store]:
23//!
24//! ```rust
25//! use std::rc::Rc;
26//!
27//! use gwr_components::store::Store;
28//! use gwr_engine::engine::Engine;
29//! use gwr_engine::executor::Spawner;
30//! use gwr_engine::time::clock::Clock;
31//! use gwr_track::entity::{Entity, GetEntity};
32//!
33//! fn build_store(engine: &Engine, clock: &Clock, parent: &Rc<Entity>) -> Rc<Store<i32>> {
34//!     // Create a store. It is passed:
35//!     //   - the engine under which the store runs
36//!     //   - a clock that the store will run off
37//!     //   - a parent entity which provides its location within the simulation.
38//!     //   - a name which should be unique within the parent.
39//!     //   - a capacity.
40//!     let store: Rc<Store<i32>> = Store::new_and_register(engine, clock, parent, "store", 5)
41//!         .expect("should be able to create and register `Store`");
42//!     store
43//! }
44//! ```
45//!
46//! By default, the store enters a waiting state if the capacity is overflown,
47//! but this behaviour can be changed to return an error, by using the
48//! `set_error_on_overflow()` method.
49//!
50//! ```rust
51//! use std::rc::Rc;
52//!
53//! use gwr_components::store::Store;
54//! use gwr_engine::engine::Engine;
55//! use gwr_engine::executor::Spawner;
56//! use gwr_engine::time::clock::Clock;
57//! use gwr_track::entity::{Entity, GetEntity};
58//!
59//! fn build_store_with_panic(
60//!     engine: &Engine,
61//!     clock: &Clock,
62//!     parent: &Rc<Entity>,
63//!     spawner: Spawner,
64//! ) -> Rc<Store<i32>> {
65//!     // Create a store that panics on overflow. Use `new_and_register()` as before,
66//!     // then call `set_error_on_overflow()` on the resulting struct.
67//!     let store = Store::new_and_register(engine, clock, parent, "store_error", 5)
68//!         .expect("should be able to create and register `Store`");
69//!     store.set_error_on_overflow();
70//!     store
71//! }
72//! ```
73
74//! # Connect a store
75//!
76//! Here is an example of a more complete simulation using a [Store] as well as
77//! a [Source](crate::source::Source) to put data into the store and an
78//! [Sink](crate::sink::Sink) to take the data out of the store.
79//!
80//! ```rust
81//! use std::rc::Rc;
82//! use gwr_components::sink::Sink;
83//! use gwr_components::source::Source;
84//! use gwr_components::store::Store;
85//! use gwr_components::{connect_port, option_box_repeat};
86//! use gwr_engine::engine::Engine;
87//! use gwr_engine::time::clock::Clock;
88//! use gwr_engine::run_simulation;
89//! use gwr_track::entity::GetEntity;
90//!
91//! // Every simulation is based around an `Engine`
92//! let mut engine = Engine::default();
93//!
94//! // For this simulation just use the default clock
95//! let clock = engine.default_clock();
96//!
97//! // Take a reference to the engine top to use as the parent
98//! let top = engine.top();
99//!
100//! // Create the basic componets:
101//! // The simplest use of the source is to inject the same value repeatedly.
102//! let source = Source::new_and_register(&engine, top, "source", option_box_repeat!(1 ; 10))
103//!     .expect("should be able to create and register `Source`");
104//! // Create the store - its type will be derived from the connections to its ports.
105//! let store = Store::new_and_register(&engine, &clock, top, "store", 5)
106//!     .expect("should be able to create and register `Store`");
107//! // Create the sink which will pull all of the data items out of the store
108//! let sink = Sink::new_and_register(&engine, &clock, top, "sink")
109//!     .expect("should be able to create and register `Sink`");
110//!
111//! // Connect the ports together:
112//! // The source will drive data into the store:
113//! connect_port!(source, tx => store, rx)
114//!     .expect("should be able to connect `Source` to `Store`");
115//!
116//! // The sink will pull data out of the store
117//! connect_port!(store, tx => sink, rx)
118//!     .expect("should be able to connect `Store` to `Sink`");
119//!
120//! // The `run_simulation!` macro then spawns all active components
121//! // and runs the simulation to completion.
122//! run_simulation!(engine);
123//! ```
124
125use std::cell::RefCell;
126use std::collections::VecDeque;
127use std::rc::Rc;
128
129use async_trait::async_trait;
130use gwr_engine::engine::Engine;
131use gwr_engine::events::repeated::Repeated;
132use gwr_engine::executor::Spawner;
133use gwr_engine::port::{InPort, OutPort, PortStateResult};
134use gwr_engine::sim_error;
135use gwr_engine::time::clock::Clock;
136use gwr_engine::traits::{Event, Runnable, SimObject};
137use gwr_engine::types::{SimError, SimResult};
138use gwr_model_builder::{EntityDisplay, EntityGet};
139use gwr_track::entity::Entity;
140use gwr_track::tracker::aka::Aka;
141use gwr_track::{enter, exit};
142
143use crate::{connect_tx, port_rx, take_option};
144
145/// The [`State`] of a [`Store`].
146struct State<T>
147where
148    T: SimObject,
149{
150    entity: Rc<Entity>,
151    capacity: usize,
152    data: RefCell<VecDeque<T>>,
153    error_on_overflow: RefCell<bool>,
154    level_change: Repeated<usize>,
155}
156
157impl<T> State<T>
158where
159    T: SimObject,
160{
161    /// Create a new store
162    fn new(entity: &Rc<Entity>, capacity: usize) -> Self {
163        Self {
164            entity: entity.clone(),
165            capacity,
166            data: RefCell::new(VecDeque::with_capacity(capacity)),
167            error_on_overflow: RefCell::new(false),
168            level_change: Repeated::new(usize::default()),
169        }
170    }
171
172    /// Place an object into the store state.
173    ///
174    /// There must be room before this is called.
175    fn push_value(&self, value: T) -> SimResult {
176        enter!(self.entity ; value.id());
177        if *self.error_on_overflow.borrow() {
178            if self.data.borrow().len() >= self.capacity {
179                return sim_error!(format!("Overflow in {:?}", self.entity.full_name()));
180            }
181        } else {
182            assert!(self.data.borrow().len() < self.capacity);
183        }
184
185        self.data.borrow_mut().push_back(value);
186        self.level_change.notify_result(self.data.borrow().len())?;
187        Ok(())
188    }
189
190    /// Remove an object from the store state.
191    ///
192    /// There must be an object available to remove before this is called.
193    fn pop_value(&self) -> Result<T, SimError> {
194        let value = self.data.borrow_mut().pop_front().unwrap();
195        self.level_change.notify_result(self.data.borrow().len())?;
196        exit!(self.entity ; value.id());
197        Ok(value)
198    }
199}
200
201/// A component that can support a configurable number of objects.
202///
203/// Objects must support the [SimObject] trait.
204#[derive(EntityGet, EntityDisplay)]
205pub struct Store<T>
206where
207    T: SimObject,
208{
209    entity: Rc<Entity>,
210    spawner: Spawner,
211    state: Rc<State<T>>,
212
213    tx: RefCell<Option<OutPort<T>>>,
214    rx: RefCell<Option<InPort<T>>>,
215}
216
217impl<T> Store<T>
218where
219    T: SimObject,
220{
221    /// Basic store constructor
222    ///
223    /// Returns a `SimError` if `capacity` is 0.
224    pub fn new_and_register_with_renames(
225        engine: &Engine,
226        clock: &Clock,
227        parent: &Rc<Entity>,
228        name: &str,
229        aka: Option<&Aka>,
230        capacity: usize,
231    ) -> Result<Rc<Self>, SimError> {
232        let spawner = engine.spawner();
233        if capacity == 0 {
234            return sim_error!("Unsupported Store with 0 capacity");
235        }
236        let entity = Rc::new(Entity::new(parent, name));
237        let state = Rc::new(State::new(&entity, capacity));
238        let tx = OutPort::new_with_renames(&entity, "tx", aka);
239        let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
240        let rc_self = Rc::new(Self {
241            entity,
242            spawner,
243            state,
244            tx: RefCell::new(Some(tx)),
245            rx: RefCell::new(Some(rx)),
246        });
247        engine.register(rc_self.clone());
248        Ok(rc_self)
249    }
250
251    /// Basic store constructor
252    ///
253    /// Returns a `SimError` if `capacity` is 0.
254    pub fn new_and_register(
255        engine: &Engine,
256        clock: &Clock,
257        parent: &Rc<Entity>,
258        name: &str,
259        capacity: usize,
260    ) -> Result<Rc<Self>, SimError> {
261        Self::new_and_register_with_renames(engine, clock, parent, name, None, capacity)
262    }
263
264    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
265        connect_tx!(self.tx, connect ; port_state)
266    }
267
268    pub fn port_rx(&self) -> PortStateResult<T> {
269        port_rx!(self.rx, state)
270    }
271
272    #[must_use]
273    pub fn fill_level(&self) -> usize {
274        self.state.data.borrow().len()
275    }
276
277    pub fn set_error_on_overflow(&self) {
278        *self.state.error_on_overflow.borrow_mut() = true;
279    }
280
281    #[must_use]
282    pub fn get_level_change_event(&self) -> Repeated<usize> {
283        self.state.level_change.clone()
284    }
285}
286
287#[async_trait(?Send)]
288impl<T> Runnable for Store<T>
289where
290    T: SimObject,
291{
292    async fn run(&self) -> SimResult {
293        let rx = take_option!(self.rx);
294        let state = self.state.clone();
295        self.spawner.spawn(async move { run_rx(rx, state).await });
296
297        let tx = take_option!(self.tx);
298        let state = self.state.clone();
299        self.spawner.spawn(async move { run_tx(tx, state).await });
300        Ok(())
301    }
302}
303
304async fn run_rx<T>(rx: InPort<T>, state: Rc<State<T>>) -> SimResult
305where
306    T: SimObject,
307{
308    let level_change = state.level_change.clone();
309    let error_on_overflow = *state.error_on_overflow.borrow();
310    loop {
311        let level = state.data.borrow().len();
312        if level < state.capacity || error_on_overflow {
313            let value = rx.get()?.await;
314            state.push_value(value)?;
315        } else {
316            level_change.listen().await;
317        }
318    }
319}
320
321async fn run_tx<T>(tx: OutPort<T>, state: Rc<State<T>>) -> SimResult
322where
323    T: SimObject,
324{
325    let level_change = state.level_change.clone();
326    loop {
327        let level = state.data.borrow().len();
328        if level > 0 {
329            // Wait for something to actually want the store value
330            tx.try_put()?.await;
331            let value = state.pop_value()?;
332            tx.put(value)?.await;
333        } else {
334            level_change.listen().await;
335        }
336    }
337}