gwr_components/store.rs
1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! A data store.
4//!
5//! The [Store] is a component that can hold a number of items defined by its
6//! capacity.
7//!
8//! # Ports
9//!
10//! This component has the following ports:
11//! - The `rx` port [InPort] which is used to put data into the store.
12//! - The `tx` port [OutPort] which is used to get data out of the store.
13//!
14//! A given [Store] is capable of holding any type as long as it implements the
15//! [SimObject] trait. This trait has been implemented for a few builtin types
16//! like `i32` and `usize` so that they can be used for simple testing.
17//!
18//! <br>
19//!
20//! # Build a store
21//!
22//! Here is an example creating a [Store]:
23//!
24//! ```rust
25//! use std::rc::Rc;
26//!
27//! use gwr_components::store::Store;
28//! use gwr_engine::engine::Engine;
29//! use gwr_engine::executor::Spawner;
30//! use gwr_track::entity::Entity;
31//!
32//! fn build_store(engine: &Engine, parent: &Rc<Entity>, spawner: Spawner) -> Rc<Store<i32>> {
33//! // Create a store. It is passed:
34//! // - a parent entity which provides its location within the simulation.
35//! // - a name which should be unique within the parent.
36//! // - a [spawner](gwr_engine::executor::Spawner) that is used to spawn
37//! // internal concurrent tasks.
38//! // - a capacity.
39//! let store: Rc<Store<i32>> = Store::new_and_register(engine, parent, "store", spawner, 5)
40//! .expect("should be able to create and register `Store`");
41//! store
42//! }
43//! ```
44//!
//! By default, the store stops accepting new items and waits while it is full,
//! but this behaviour can be changed to return an error on overflow by calling
//! the `set_error_on_overflow()` method.
48//!
49//! ```rust
50//! use std::rc::Rc;
51//!
52//! use gwr_components::store::Store;
53//! use gwr_engine::engine::Engine;
54//! use gwr_engine::executor::Spawner;
55//! use gwr_track::entity::Entity;
56//!
57//! fn build_store_with_panic(
58//! engine: &Engine,
59//! parent: &Rc<Entity>,
60//! spawner: Spawner,
61//! ) -> Rc<Store<i32>> {
//! // Create a store that returns an error on overflow. Use `new_and_register()`
//! // as before, then call `set_error_on_overflow()` on the resulting struct.
64//! let store = Store::new_and_register(engine, parent, "store_error", spawner, 5)
65//! .expect("should be able to create and register `Store`");
66//! store.set_error_on_overflow();
67//! store
68//! }
69//! ```
70
71//! # Connect a store
72//!
//! Here is an example of a more complete simulation using a [Store] as well as
//! a [Source](crate::source::Source) to put data into the store and a
//! [Sink](crate::sink::Sink) to take the data out of the store.
76//!
77//! ```rust
78//! use std::rc::Rc;
79//! use gwr_components::sink::Sink;
80//! use gwr_components::source::Source;
81//! use gwr_components::store::Store;
82//! use gwr_components::{connect_port, option_box_repeat};
83//! use gwr_engine::engine::Engine;
84//! use gwr_engine::run_simulation;
85//!
86//! // Every simulation is based around an `Engine`
87//! let mut engine = Engine::default();
88//!
89//! // A spawner allows components to spawn activity
90//! let spawner = engine.spawner();
91//!
92//! // Take a reference to the engine top to use as the parent
93//! let top = engine.top();
94//!
//! // Create the basic components:
96//! // The simplest use of the source is to inject the same value repeatedly.
97//! let source = Source::new_and_register(&engine, top, "source", option_box_repeat!(1 ; 10))
98//! .expect("should be able to create and register `Source`");
99//! // Create the store - its type will be derived from the connections to its ports.
100//! let store = Store::new_and_register(&engine, top, "store", spawner, 5)
101//! .expect("should be able to create and register `Store`");
102//! // Create the sink which will pull all of the data items out of the store
103//! let sink = Sink::new_and_register(&engine, top, "sink")
104//! .expect("should be able to create and register `Sink`");
105//!
106//! // Connect the ports together:
107//! // The source will drive data into the store:
108//! connect_port!(source, tx => store, rx)
109//! .expect("should be able to connect `Source` to `Store`");
110//!
111//! // The sink will pull data out of the store
112//! connect_port!(store, tx => sink, rx)
113//! .expect("should be able to connect `Store` to `Sink`");
114//!
115//! // The `run_simulation!` macro then spawns all active components
116//! // and runs the simulation to completion.
117//! run_simulation!(engine);
118//! ```
119
120use std::cell::RefCell;
121use std::collections::VecDeque;
122use std::rc::Rc;
123
124use async_trait::async_trait;
125use gwr_engine::engine::Engine;
126use gwr_engine::events::repeated::Repeated;
127use gwr_engine::executor::Spawner;
128use gwr_engine::port::{InPort, OutPort, PortStateResult};
129use gwr_engine::sim_error;
130use gwr_engine::traits::{Event, Runnable, SimObject};
131use gwr_engine::types::{SimError, SimResult};
132use gwr_model_builder::EntityDisplay;
133use gwr_track::entity::Entity;
134use gwr_track::{enter, exit};
135
136use crate::{connect_tx, port_rx, take_option};
137
138/// The [`State`] of a [`Store`].
139struct State<T>
140where
141 T: SimObject,
142{
143 entity: Rc<Entity>,
144 capacity: usize,
145 data: RefCell<VecDeque<T>>,
146 error_on_overflow: RefCell<bool>,
147 level_change: Repeated<usize>,
148}
149
150impl<T> State<T>
151where
152 T: SimObject,
153{
154 /// Create a new store
155 fn new(entity: &Rc<Entity>, capacity: usize) -> Self {
156 Self {
157 entity: entity.clone(),
158 capacity,
159 data: RefCell::new(VecDeque::with_capacity(capacity)),
160 error_on_overflow: RefCell::new(false),
161 level_change: Repeated::new(usize::default()),
162 }
163 }
164
165 /// Place an object into the store state.
166 ///
167 /// There must be room before this is called.
168 fn push_value(&self, value: T) -> SimResult {
169 enter!(self.entity ; value.id());
170 if *self.error_on_overflow.borrow() {
171 if self.data.borrow().len() >= self.capacity {
172 return sim_error!(format!("Overflow in {:?}", self.entity.full_name()));
173 }
174 } else {
175 assert!(self.data.borrow().len() < self.capacity);
176 }
177
178 self.data.borrow_mut().push_back(value);
179 self.level_change.notify_result(self.data.borrow().len())?;
180 Ok(())
181 }
182
183 /// Remove an object from the store state.
184 ///
185 /// There must be an object available to remove before this is called.
186 fn pop_value(&self) -> Result<T, SimError> {
187 let value = self.data.borrow_mut().pop_front().unwrap();
188 self.level_change.notify_result(self.data.borrow().len())?;
189 exit!(self.entity ; value.id());
190 Ok(value)
191 }
192}
193
194/// A component that can support a configurable number of objects.
195///
196/// Objects must support the [SimObject] trait.
197#[derive(EntityDisplay)]
198pub struct Store<T>
199where
200 T: SimObject,
201{
202 pub entity: Rc<Entity>,
203 spawner: Spawner,
204 state: Rc<State<T>>,
205
206 tx: RefCell<Option<OutPort<T>>>,
207 rx: RefCell<Option<InPort<T>>>,
208}
209
210impl<T> Store<T>
211where
212 T: SimObject,
213{
214 /// Basic store constructor
215 ///
216 /// Returns a `SimError` if `capacity` is 0.
217 pub fn new_and_register(
218 engine: &Engine,
219 parent: &Rc<Entity>,
220 name: &str,
221 spawner: Spawner,
222 capacity: usize,
223 ) -> Result<Rc<Self>, SimError> {
224 if capacity == 0 {
225 return sim_error!("Unsupported Store with 0 capacity");
226 }
227 let entity = Rc::new(Entity::new(parent, name));
228 let state = Rc::new(State::new(&entity, capacity));
229 let tx = OutPort::new(&entity, "tx");
230 let rx = InPort::new(&entity, "rx");
231 let rc_self = Rc::new(Self {
232 entity,
233 spawner,
234 state,
235 tx: RefCell::new(Some(tx)),
236 rx: RefCell::new(Some(rx)),
237 });
238 engine.register(rc_self.clone());
239 Ok(rc_self)
240 }
241
242 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
243 connect_tx!(self.tx, connect ; port_state)
244 }
245
246 pub fn port_rx(&self) -> PortStateResult<T> {
247 port_rx!(self.rx, state)
248 }
249
250 #[must_use]
251 pub fn fill_level(&self) -> usize {
252 self.state.data.borrow().len()
253 }
254
255 pub fn set_error_on_overflow(&self) {
256 *self.state.error_on_overflow.borrow_mut() = true;
257 }
258
259 #[must_use]
260 pub fn get_level_change_event(&self) -> Repeated<usize> {
261 self.state.level_change.clone()
262 }
263}
264
265#[async_trait(?Send)]
266impl<T> Runnable for Store<T>
267where
268 T: SimObject,
269{
270 async fn run(&self) -> SimResult {
271 let rx = take_option!(self.rx);
272 let state = self.state.clone();
273 self.spawner.spawn(async move { run_rx(rx, state).await });
274
275 let tx = take_option!(self.tx);
276 let state = self.state.clone();
277 self.spawner.spawn(async move { run_tx(tx, state).await });
278 Ok(())
279 }
280}
281
282async fn run_rx<T>(rx: InPort<T>, state: Rc<State<T>>) -> SimResult
283where
284 T: SimObject,
285{
286 let level_change = state.level_change.clone();
287 let error_on_overflow = *state.error_on_overflow.borrow();
288 loop {
289 let level = state.data.borrow().len();
290 if level < state.capacity || error_on_overflow {
291 let value = rx.get()?.await;
292 state.push_value(value)?;
293 } else {
294 level_change.listen().await;
295 }
296 }
297}
298
299async fn run_tx<T>(tx: OutPort<T>, state: Rc<State<T>>) -> SimResult
300where
301 T: SimObject,
302{
303 let level_change = state.level_change.clone();
304 loop {
305 let level = state.data.borrow().len();
306 if level > 0 {
307 // Wait for something to actually want the store value
308 tx.try_put()?.await;
309 let value = state.pop_value()?;
310 tx.put(value)?.await;
311 } else {
312 level_change.listen().await;
313 }
314 }
315}