gwr_components/
store.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! A data store.
4//!
5//! The [Store] is a component that can hold a number of items defined by its
6//! capacity.
7//!
8//! # Ports
9//!
10//! This component has the following ports:
11//!   - The `rx` port [InPort] which is used to put data into the store.
12//!   - The `tx` port [OutPort] which is used to get data out of the store.
13//!
14//! A given [Store] is capable of holding any type as long as it implements the
15//! [SimObject] trait. This trait has been implemented for a few builtin types
16//! like `i32` and `usize` so that they can be used for simple testing.
17//!
18//! <br>
19//!
20//! # Build a store
21//!
22//! Here is an example creating a [Store]:
23//!
24//! ```rust
25//! use std::rc::Rc;
26//!
27//! use gwr_components::store::Store;
28//! use gwr_engine::engine::Engine;
29//! use gwr_engine::executor::Spawner;
30//! use gwr_engine::time::clock::Clock;
31//! use gwr_track::entity::{Entity, GetEntity};
32//!
33//! fn build_store(engine: &Engine, clock: &Clock, parent: &Rc<Entity>) -> Rc<Store<i32>> {
34//!     // Create a store. It is passed:
35//!     //   - the engine under which the store runs
36//!     //   - a clock that the store will run off
37//!     //   - a parent entity which provides its location within the simulation.
38//!     //   - a name which should be unique within the parent.
39//!     //   - a capacity.
40//!     let store: Rc<Store<i32>> = Store::new_and_register(engine, clock, parent, "store", 5)
41//!         .expect("should be able to create and register `Store`");
42//!     store
43//! }
44//! ```
45//!
//! By default, the store stops accepting new items while it is full and waits
//! for space to become available. This behaviour can be changed to return an
//! error on overflow instead by calling the `set_error_on_overflow()` method.
49//!
50//! ```rust
51//! use std::rc::Rc;
52//!
53//! use gwr_components::store::Store;
54//! use gwr_engine::engine::Engine;
55//! use gwr_engine::executor::Spawner;
56//! use gwr_engine::time::clock::Clock;
57//! use gwr_track::entity::{Entity, GetEntity};
58//!
59//! fn build_store_with_panic(
60//!     engine: &Engine,
61//!     clock: &Clock,
62//!     parent: &Rc<Entity>,
63//!     spawner: Spawner,
64//! ) -> Rc<Store<i32>> {
//!     // Create a store that returns an error on overflow. Use `new_and_register()`
//!     // as before, then call `set_error_on_overflow()` on the resulting struct.
67//!     let store = Store::new_and_register(engine, clock, parent, "store_error", 5)
68//!         .expect("should be able to create and register `Store`");
69//!     store.set_error_on_overflow();
70//!     store
71//! }
72//! ```
//!
74//! # Connect a store
75//!
//! Here is an example of a more complete simulation using a [Store] as well as
//! a [Source](crate::source::Source) to put data into the store and a
//! [Sink](crate::sink::Sink) to take the data out of the store.
79//!
80//! ```rust
81//! use std::rc::Rc;
82//! use gwr_components::sink::Sink;
83//! use gwr_components::source::Source;
84//! use gwr_components::store::Store;
85//! use gwr_components::{connect_port, option_box_repeat};
86//! use gwr_engine::engine::Engine;
87//! use gwr_engine::time::clock::Clock;
88//! use gwr_engine::run_simulation;
89//! use gwr_track::entity::GetEntity;
90//!
91//! // Every simulation is based around an `Engine`
92//! let mut engine = Engine::default();
93//!
94//! // For this simulation just use the default clock
95//! let clock = engine.default_clock();
96//!
97//! // Take a reference to the engine top to use as the parent
98//! let top = engine.top();
99//!
//! // Create the basic components:
101//! // The simplest use of the source is to inject the same value repeatedly.
102//! let source = Source::new_and_register(&engine, top, "source", option_box_repeat!(1 ; 10));
103//! // Create the store - its type will be derived from the connections to its ports.
104//! let store = Store::new_and_register(&engine, &clock, top, "store", 5)
105//!     .expect("should be able to create and register `Store`");
106//! // Create the sink which will pull all of the data items out of the store
107//! let sink = Sink::new_and_register(&engine, &clock, top, "sink");
108//!
109//! // Connect the ports together:
110//! // The source will drive data into the store:
111//! connect_port!(source, tx => store, rx)
112//!     .expect("should be able to connect `Source` to `Store`");
113//!
114//! // The sink will pull data out of the store
115//! connect_port!(store, tx => sink, rx)
116//!     .expect("should be able to connect `Store` to `Sink`");
117//!
118//! // The `run_simulation!` macro then spawns all active components
119//! // and runs the simulation to completion.
120//! run_simulation!(engine);
121//! ```
122
123use std::cell::RefCell;
124use std::collections::VecDeque;
125use std::rc::Rc;
126
127use async_trait::async_trait;
128use gwr_engine::engine::Engine;
129use gwr_engine::events::repeated::Repeated;
130use gwr_engine::executor::Spawner;
131use gwr_engine::port::{InPort, OutPort, PortStateResult};
132use gwr_engine::sim_error;
133use gwr_engine::time::clock::Clock;
134use gwr_engine::traits::{Event, Runnable, SimObject};
135use gwr_engine::types::{SimError, SimResult};
136use gwr_model_builder::{EntityDisplay, EntityGet};
137use gwr_track::entity::Entity;
138use gwr_track::tracker::aka::Aka;
139
140use crate::{connect_tx, port_rx, take_option};
141
142/// The [`State`] of a [`Store`].
143struct State<T>
144where
145    T: SimObject,
146{
147    entity: Rc<Entity>,
148    capacity: usize,
149    data: RefCell<VecDeque<T>>,
150    error_on_overflow: RefCell<bool>,
151    level_change: Repeated<usize>,
152}
153
154impl<T> State<T>
155where
156    T: SimObject,
157{
158    /// Create a new store
159    fn new(entity: &Rc<Entity>, capacity: usize) -> Self {
160        Self {
161            entity: entity.clone(),
162            capacity,
163            data: RefCell::new(VecDeque::with_capacity(capacity)),
164            error_on_overflow: RefCell::new(false),
165            level_change: Repeated::new(usize::default()),
166        }
167    }
168
169    /// Place an object into the store state.
170    ///
171    /// There must be room before this is called.
172    fn push_value(&self, value: T) -> SimResult {
173        self.entity.track_enter(value.id());
174        if *self.error_on_overflow.borrow() {
175            if self.data.borrow().len() >= self.capacity {
176                return sim_error!("Overflow in {:?}", self.entity.full_name());
177            }
178        } else {
179            assert!(self.data.borrow().len() < self.capacity);
180        }
181
182        self.data.borrow_mut().push_back(value);
183        self.level_change.notify_result(self.data.borrow().len());
184        Ok(())
185    }
186
187    /// Remove an object from the store state.
188    ///
189    /// There must be an object available to remove before this is called.
190    fn pop_value(&self) -> Result<T, SimError> {
191        let value = self.data.borrow_mut().pop_front().unwrap();
192        self.level_change.notify_result(self.data.borrow().len());
193        self.entity.track_exit(value.id());
194        Ok(value)
195    }
196}
197
/// A component that can support a configurable number of objects.
///
/// Objects must support the [SimObject] trait.
///
/// Data enters through the `rx` port and leaves through the `tx` port. Both
/// ports are held in `Option` slots because `run()` takes them out when it
/// starts the tasks that service them.
#[derive(EntityGet, EntityDisplay)]
pub struct Store<T>
where
    T: SimObject,
{
    /// Entity created for this store under its parent.
    entity: Rc<Entity>,
    /// Spawner obtained from the engine; used by `run()` to start the tasks.
    spawner: Spawner,
    /// Contents and settings of the store, shared with the spawned tasks.
    state: Rc<State<T>>,

    /// Output port; handed to the transmit task by `run()`.
    tx: RefCell<Option<OutPort<T>>>,
    /// Input port; handed to the receive task by `run()`.
    rx: RefCell<Option<InPort<T>>>,
}
213
214impl<T> Store<T>
215where
216    T: SimObject,
217{
218    /// Basic store constructor
219    ///
220    /// Returns a `SimError` if `capacity` is 0.
221    pub fn new_and_register_with_renames(
222        engine: &Engine,
223        clock: &Clock,
224        parent: &Rc<Entity>,
225        name: &str,
226        aka: Option<&Aka>,
227        capacity: usize,
228    ) -> Result<Rc<Self>, SimError> {
229        let spawner = engine.spawner();
230        if capacity == 0 {
231            return sim_error!("Unsupported Store with 0 capacity");
232        }
233        let entity = Rc::new(Entity::new(parent, name));
234        entity.track_capacity(capacity, "objects");
235        let state = Rc::new(State::new(&entity, capacity));
236        let tx = OutPort::new_with_renames(&entity, "tx", aka);
237        let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
238        let rc_self = Rc::new(Self {
239            entity,
240            spawner,
241            state,
242            tx: RefCell::new(Some(tx)),
243            rx: RefCell::new(Some(rx)),
244        });
245        engine.register(rc_self.clone());
246        Ok(rc_self)
247    }
248
249    /// Basic store constructor
250    ///
251    /// Returns a `SimError` if `capacity` is 0.
252    pub fn new_and_register(
253        engine: &Engine,
254        clock: &Clock,
255        parent: &Rc<Entity>,
256        name: &str,
257        capacity: usize,
258    ) -> Result<Rc<Self>, SimError> {
259        Self::new_and_register_with_renames(engine, clock, parent, name, None, capacity)
260    }
261
262    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
263        connect_tx!(self.tx, connect ; port_state)
264    }
265
266    pub fn port_rx(&self) -> PortStateResult<T> {
267        port_rx!(self.rx, state)
268    }
269
270    #[must_use]
271    pub fn fill_level(&self) -> usize {
272        self.state.data.borrow().len()
273    }
274
275    pub fn set_error_on_overflow(&self) {
276        *self.state.error_on_overflow.borrow_mut() = true;
277    }
278
279    #[must_use]
280    pub fn get_level_change_event(&self) -> Repeated<usize> {
281        self.state.level_change.clone()
282    }
283}
284
285#[async_trait(?Send)]
286impl<T> Runnable for Store<T>
287where
288    T: SimObject,
289{
290    async fn run(&self) -> SimResult {
291        let rx = take_option!(self.rx);
292        let state = self.state.clone();
293        self.spawner.spawn(async move { run_rx(rx, state).await });
294
295        let tx = take_option!(self.tx);
296        let state = self.state.clone();
297        self.spawner.spawn(async move { run_tx(tx, state).await });
298        Ok(())
299    }
300}
301
302async fn run_rx<T>(rx: InPort<T>, state: Rc<State<T>>) -> SimResult
303where
304    T: SimObject,
305{
306    let level_change = state.level_change.clone();
307    let error_on_overflow = *state.error_on_overflow.borrow();
308    loop {
309        let level = state.data.borrow().len();
310        if level < state.capacity || error_on_overflow {
311            let value = rx.get()?.await;
312            state.push_value(value)?;
313        } else {
314            level_change.listen().await;
315        }
316    }
317}
318
319async fn run_tx<T>(tx: OutPort<T>, state: Rc<State<T>>) -> SimResult
320where
321    T: SimObject,
322{
323    let level_change = state.level_change.clone();
324    loop {
325        let level = state.data.borrow().len();
326        if level > 0 {
327            // Wait for something to actually want the store value
328            tx.try_put()?.await;
329            let value = state.pop_value()?;
330            tx.put(value)?.await;
331        } else {
332            level_change.listen().await;
333        }
334    }
335}