gwr_components/store.rs
1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! A data store.
4//!
5//! The [Store] is a component that can hold a number of items defined by its
6//! capacity.
7//!
8//! # Ports
9//!
10//! This component has the following ports:
11//! - The `rx` port [InPort] which is used to put data into the store.
12//! - The `tx` port [OutPort] which is used to get data out of the store.
13//!
14//! A given [Store] is capable of holding any type as long as it implements the
15//! [SimObject] trait. This trait has been implemented for a few builtin types
16//! like `i32` and `usize` so that they can be used for simple testing.
17//!
18//! <br>
19//!
20//! # Build a store
21//!
22//! Here is an example creating a [Store]:
23//!
24//! ```rust
25//! use std::rc::Rc;
26//!
27//! use gwr_components::store::Store;
28//! use gwr_engine::engine::Engine;
29//! use gwr_engine::executor::Spawner;
30//! use gwr_engine::time::clock::Clock;
31//! use gwr_track::entity::{Entity, GetEntity};
32//!
33//! fn build_store(engine: &Engine, clock: &Clock, parent: &Rc<Entity>) -> Rc<Store<i32>> {
34//! // Create a store. It is passed:
35//! // - the engine under which the store runs
36//! // - a clock that the store will run off
37//! // - a parent entity which provides its location within the simulation.
38//! // - a name which should be unique within the parent.
39//! // - a capacity.
40//! let store: Rc<Store<i32>> = Store::new_and_register(engine, clock, parent, "store", 5)
41//! .expect("should be able to create and register `Store`");
42//! store
43//! }
44//! ```
45//!
//! By default, the store stops accepting new items and waits once it is full.
//! This behaviour can be changed so that an overflow returns an error instead
//! by calling the `set_error_on_overflow()` method.
49//!
50//! ```rust
51//! use std::rc::Rc;
52//!
53//! use gwr_components::store::Store;
54//! use gwr_engine::engine::Engine;
55//! use gwr_engine::executor::Spawner;
56//! use gwr_engine::time::clock::Clock;
57//! use gwr_track::entity::{Entity, GetEntity};
58//!
59//! fn build_store_with_panic(
60//! engine: &Engine,
61//! clock: &Clock,
62//! parent: &Rc<Entity>,
63//! spawner: Spawner,
64//! ) -> Rc<Store<i32>> {
//! // Create a store that returns an error on overflow. Use `new_and_register()`
//! // as before, then call `set_error_on_overflow()` on the resulting struct.
67//! let store = Store::new_and_register(engine, clock, parent, "store_error", 5)
68//! .expect("should be able to create and register `Store`");
69//! store.set_error_on_overflow();
70//! store
71//! }
72//! ```
73
74//! # Connect a store
75//!
76//! Here is an example of a more complete simulation using a [Store] as well as
//! a [Source](crate::source::Source) to put data into the store and a
//! [Sink](crate::sink::Sink) to take the data out of the store.
79//!
80//! ```rust
81//! use std::rc::Rc;
82//! use gwr_components::sink::Sink;
83//! use gwr_components::source::Source;
84//! use gwr_components::store::Store;
85//! use gwr_components::{connect_port, option_box_repeat};
86//! use gwr_engine::engine::Engine;
87//! use gwr_engine::time::clock::Clock;
88//! use gwr_engine::run_simulation;
89//! use gwr_track::entity::GetEntity;
90//!
91//! // Every simulation is based around an `Engine`
92//! let mut engine = Engine::default();
93//!
94//! // For this simulation just use the default clock
95//! let clock = engine.default_clock();
96//!
97//! // Take a reference to the engine top to use as the parent
98//! let top = engine.top();
99//!
//! // Create the basic components:
101//! // The simplest use of the source is to inject the same value repeatedly.
102//! let source = Source::new_and_register(&engine, top, "source", option_box_repeat!(1 ; 10));
103//! // Create the store - its type will be derived from the connections to its ports.
104//! let store = Store::new_and_register(&engine, &clock, top, "store", 5)
105//! .expect("should be able to create and register `Store`");
106//! // Create the sink which will pull all of the data items out of the store
107//! let sink = Sink::new_and_register(&engine, &clock, top, "sink");
108//!
109//! // Connect the ports together:
110//! // The source will drive data into the store:
111//! connect_port!(source, tx => store, rx)
112//! .expect("should be able to connect `Source` to `Store`");
113//!
114//! // The sink will pull data out of the store
115//! connect_port!(store, tx => sink, rx)
116//! .expect("should be able to connect `Store` to `Sink`");
117//!
118//! // The `run_simulation!` macro then spawns all active components
119//! // and runs the simulation to completion.
120//! run_simulation!(engine);
121//! ```
122
123use std::cell::RefCell;
124use std::collections::VecDeque;
125use std::rc::Rc;
126
127use async_trait::async_trait;
128use gwr_engine::engine::Engine;
129use gwr_engine::events::repeated::Repeated;
130use gwr_engine::executor::Spawner;
131use gwr_engine::port::{InPort, OutPort, PortStateResult};
132use gwr_engine::sim_error;
133use gwr_engine::time::clock::Clock;
134use gwr_engine::traits::{Event, Runnable, SimObject};
135use gwr_engine::types::{SimError, SimResult};
136use gwr_model_builder::{EntityDisplay, EntityGet};
137use gwr_track::entity::Entity;
138use gwr_track::tracker::aka::Aka;
139
140use crate::{connect_tx, port_rx, take_option};
141
142/// The [`State`] of a [`Store`].
143struct State<T>
144where
145 T: SimObject,
146{
147 entity: Rc<Entity>,
148 capacity: usize,
149 data: RefCell<VecDeque<T>>,
150 error_on_overflow: RefCell<bool>,
151 level_change: Repeated<usize>,
152}
153
154impl<T> State<T>
155where
156 T: SimObject,
157{
158 /// Create a new store
159 fn new(entity: &Rc<Entity>, capacity: usize) -> Self {
160 Self {
161 entity: entity.clone(),
162 capacity,
163 data: RefCell::new(VecDeque::with_capacity(capacity)),
164 error_on_overflow: RefCell::new(false),
165 level_change: Repeated::new(usize::default()),
166 }
167 }
168
169 /// Place an object into the store state.
170 ///
171 /// There must be room before this is called.
172 fn push_value(&self, value: T) -> SimResult {
173 self.entity.track_enter(value.id());
174 if *self.error_on_overflow.borrow() {
175 if self.data.borrow().len() >= self.capacity {
176 return sim_error!("Overflow in {:?}", self.entity.full_name());
177 }
178 } else {
179 assert!(self.data.borrow().len() < self.capacity);
180 }
181
182 self.data.borrow_mut().push_back(value);
183 self.level_change.notify_result(self.data.borrow().len());
184 Ok(())
185 }
186
187 /// Remove an object from the store state.
188 ///
189 /// There must be an object available to remove before this is called.
190 fn pop_value(&self) -> Result<T, SimError> {
191 let value = self.data.borrow_mut().pop_front().unwrap();
192 self.level_change.notify_result(self.data.borrow().len());
193 self.entity.track_exit(value.id());
194 Ok(value)
195 }
196}
197
/// A component that can support a configurable number of objects.
///
/// Objects arrive on the `rx` port, are held in arrival order and leave on
/// the `tx` port. The number of objects held is bounded by the capacity given
/// at construction.
///
/// Objects must support the [SimObject] trait.
#[derive(EntityGet, EntityDisplay)]
pub struct Store<T>
where
    T: SimObject,
{
    // Entity identifying this store; it is also the parent of both ports.
    entity: Rc<Entity>,
    // Used by `run()` to spawn the receive and transmit tasks.
    spawner: Spawner,
    // State shared between the receive and transmit tasks.
    state: Rc<State<T>>,

    // Output port. Held as an `Option` so that `run()` can hand it over to the
    // transmit task (presumably `take_option!` moves it out - confirm against
    // the macro definition).
    tx: RefCell<Option<OutPort<T>>>,
    // Input port. Held as an `Option` for the same reason as `tx`; it is
    // handed to the receive task by `run()`.
    rx: RefCell<Option<InPort<T>>>,
}
213
214impl<T> Store<T>
215where
216 T: SimObject,
217{
218 /// Basic store constructor
219 ///
220 /// Returns a `SimError` if `capacity` is 0.
221 pub fn new_and_register_with_renames(
222 engine: &Engine,
223 clock: &Clock,
224 parent: &Rc<Entity>,
225 name: &str,
226 aka: Option<&Aka>,
227 capacity: usize,
228 ) -> Result<Rc<Self>, SimError> {
229 let spawner = engine.spawner();
230 if capacity == 0 {
231 return sim_error!("Unsupported Store with 0 capacity");
232 }
233 let entity = Rc::new(Entity::new(parent, name));
234 entity.track_capacity(capacity, "objects");
235 let state = Rc::new(State::new(&entity, capacity));
236 let tx = OutPort::new_with_renames(&entity, "tx", aka);
237 let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
238 let rc_self = Rc::new(Self {
239 entity,
240 spawner,
241 state,
242 tx: RefCell::new(Some(tx)),
243 rx: RefCell::new(Some(rx)),
244 });
245 engine.register(rc_self.clone());
246 Ok(rc_self)
247 }
248
249 /// Basic store constructor
250 ///
251 /// Returns a `SimError` if `capacity` is 0.
252 pub fn new_and_register(
253 engine: &Engine,
254 clock: &Clock,
255 parent: &Rc<Entity>,
256 name: &str,
257 capacity: usize,
258 ) -> Result<Rc<Self>, SimError> {
259 Self::new_and_register_with_renames(engine, clock, parent, name, None, capacity)
260 }
261
262 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
263 connect_tx!(self.tx, connect ; port_state)
264 }
265
266 pub fn port_rx(&self) -> PortStateResult<T> {
267 port_rx!(self.rx, state)
268 }
269
270 #[must_use]
271 pub fn fill_level(&self) -> usize {
272 self.state.data.borrow().len()
273 }
274
275 pub fn set_error_on_overflow(&self) {
276 *self.state.error_on_overflow.borrow_mut() = true;
277 }
278
279 #[must_use]
280 pub fn get_level_change_event(&self) -> Repeated<usize> {
281 self.state.level_change.clone()
282 }
283}
284
285#[async_trait(?Send)]
286impl<T> Runnable for Store<T>
287where
288 T: SimObject,
289{
290 async fn run(&self) -> SimResult {
291 let rx = take_option!(self.rx);
292 let state = self.state.clone();
293 self.spawner.spawn(async move { run_rx(rx, state).await });
294
295 let tx = take_option!(self.tx);
296 let state = self.state.clone();
297 self.spawner.spawn(async move { run_tx(tx, state).await });
298 Ok(())
299 }
300}
301
302async fn run_rx<T>(rx: InPort<T>, state: Rc<State<T>>) -> SimResult
303where
304 T: SimObject,
305{
306 let level_change = state.level_change.clone();
307 let error_on_overflow = *state.error_on_overflow.borrow();
308 loop {
309 let level = state.data.borrow().len();
310 if level < state.capacity || error_on_overflow {
311 let value = rx.get()?.await;
312 state.push_value(value)?;
313 } else {
314 level_change.listen().await;
315 }
316 }
317}
318
319async fn run_tx<T>(tx: OutPort<T>, state: Rc<State<T>>) -> SimResult
320where
321 T: SimObject,
322{
323 let level_change = state.level_change.clone();
324 loop {
325 let level = state.data.borrow().len();
326 if level > 0 {
327 // Wait for something to actually want the store value
328 tx.try_put()?.await;
329 let value = state.pop_value()?;
330 tx.put(value)?.await;
331 } else {
332 level_change.listen().await;
333 }
334 }
335}