flaky_with_delay/lib.rs
1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
//! This is an example component that delays data and also randomly drops data.
4//!
5//! This component uses the existing `Delay` and `Store` components.
6//!
7//! The `main.rs` in this folder shows how it can be used.
8//!
9//! # Ports
10//!
11//! This component has two ports
12//! - One [input port](gwr_engine::port::InPort): `rx`
13//! - One [output port](gwr_engine::port::OutPort): `tx`
14
15/// First need to `use` all types and traits that are used.
16use std::cell::RefCell;
17/// The `Rc` and `RefCell` libraries provide single-threaded sharing and
18/// mutating of state.
19use std::rc::Rc;
20
21use async_trait::async_trait;
22use gwr_components::delay::Delay;
23use gwr_components::store::Store;
24/// The gwr components crate provides many connectable building blocks.
25/// Component traits and types are provided along with the components
26/// themselves.
27use gwr_components::{connect_tx, port_rx, take_option};
28use gwr_engine::engine::Engine;
29/// The gwr engine core provides the traits and types required to be
30/// implemented by a component.
31use gwr_engine::port::{InPort, OutPort, PortStateResult};
32use gwr_engine::time::clock::Clock;
33use gwr_engine::traits::{Runnable, SimObject};
34use gwr_engine::types::{SimError, SimResult};
35use gwr_model_builder::{EntityDisplay, EntityGet};
36/// The gwr_track library provides tracing/logging features.
37use gwr_track::entity::Entity;
38use gwr_track::tracker::aka::Aka;
39use gwr_track::{build_aka, trace};
40/// Random library is just used by this component to implement its drop
41/// decisions.
42use rand::rngs::StdRng;
43use rand::{RngCore, SeedableRng};
44
/// Configuration options for the [`Flaky`] component.
#[derive(Debug, Clone, PartialEq)]
pub struct Config {
    /// Fraction of packets that are dropped, in the range `[0, 1]`.
    drop_ratio: f64,

    /// Seed for the random number generator.
    seed: u64,

    /// Delay in clock ticks.
    delay_ticks: usize,
}

impl Config {
    /// Build a configuration from its three settings.
    ///
    /// * `drop_ratio` - fraction of packets to drop; must lie in `[0, 1]`.
    /// * `seed` - seed for the random number generator.
    /// * `delay_ticks` - delay in clock ticks.
    ///
    /// # Panics
    ///
    /// Panics if `drop_ratio` is not within `[0, 1]`. A NaN value is never
    /// contained in the range, so it is rejected as well.
    #[must_use]
    pub fn new(drop_ratio: f64, seed: u64, delay_ticks: usize) -> Self {
        assert!(
            (0.0..=1.0).contains(&drop_ratio),
            "drop_ratio must be in the range [0, 1], got {}",
            drop_ratio
        );
        Self {
            drop_ratio,
            seed,
            delay_ticks,
        }
    }
}
68
69/// A component needs to support being cloned and also being printed for debug
70/// logging.
71///
72///
73/// The `EntityGet` automatically implements the `GetEntity` trait for this
74/// struct.
75///
76/// The `EntityDisply` automatically derives the `Display` trait as long as the
77/// struct contains the `entity`.
78#[derive(EntityGet, EntityDisplay)]
79pub struct Flaky<T>
80where
81 T: SimObject,
82{
83 /// Every component should include an Entity that defines where in the
84 /// overall simulation hierarchy it is. The Entity is also used to
85 /// filter logging.
86 entity: Rc<Entity>,
87
88 /// This component has an `rx` port that it uses to handle incoming data.
89 ///
90 /// It is placed within an `Option` so that it can be removed later
91 /// when the Engine is run.
92 rx: RefCell<Option<InPort<T>>>,
93
94 /// This component has an internal `tx` port is connected to the delay.
95 /// Any data that is not dropped is sent through this port.
96 ///
97 /// It is placed within an `Option` so that it can be removed later
98 /// when the Engine is run.
99 tx: RefCell<Option<OutPort<T>>>,
100
101 /// After the `Delay` data will be placed into a `Store` from where it can
102 /// be pulled and either passed on or dropped.
103 ///
104 /// It is again placed within an `Option` so that it can be removed later
105 /// when the Engine is run.
106 buffer: RefCell<Option<Rc<Store<T>>>>,
107
108 /// Store the ratio at which packets should be dropped.
109 drop_ratio: f64,
110
111 /// Random number generator used for deciding when to drop. Note that it is
112 /// wrapped in a `RefCell` which allows it to be used mutably in the `put()`
113 /// function despite the fact that the Inner will be immutable (`&self`
114 /// argument in the trait).
115 rng: RefCell<StdRng>,
116}
117
118/// The next thing to do is define the generic functions for the new component.
119impl<T> Flaky<T>
120where
121 T: SimObject,
122{
123 /// In this case, the `new_and_register()` function creates the component
124 /// from the parameters provided as well as registering the component
125 /// with the `Engine`.
126 pub fn new_and_register(
127 engine: &Engine,
128 clock: &Clock,
129 parent: &Rc<Entity>,
130 name: &str,
131 config: &Config,
132 ) -> Result<Rc<Self>, SimError> {
133 // In this case it simply uses the function that provides renaming features as
134 // well.
135 Self::new_and_register_with_renames(engine, clock, parent, name, None, config)
136 }
137
138 /// The `new_and_register_with_renames()` function does the work of creating
139 /// the component from the parameters provided, as well as registering the
140 /// component with the [Engine].
141 ///
142 /// This function adds the ability to provide multiple names for the `tx`
143 /// port which is actually just the mapped through to the internal buffer's
144 /// `tx` port.
145 pub fn new_and_register_with_renames(
146 engine: &Engine,
147 clock: &Clock,
148 parent: &Rc<Entity>,
149 name: &str,
150 aka: Option<&Aka>,
151 config: &Config,
152 ) -> Result<Rc<Self>, SimError> {
153 // The entity needs to be created first because this component will be the
154 // parent to the subcomponents.
155 let entity = Entity::new(parent, name);
156
157 // Because it is shared it needs to be wrapped in an Arc
158 let entity = Rc::new(entity);
159
160 let delay = Delay::new_and_register(engine, clock, &entity, "delay", config.delay_ticks)?;
161
162 // Build up a renaming that shows that this component's `tx` port is the same
163 // as the buffer's `tx` port and the user will be able to use either name.
164 let buffer_aka = build_aka!(aka, &entity, &[("tx", "tx")]);
165 let buffer = Store::new_and_register_with_renames(
166 engine,
167 clock,
168 &entity,
169 "buffer",
170 Some(&buffer_aka),
171 1,
172 )?;
173
174 delay.connect_port_tx(buffer.port_rx())?;
175
176 // Create an internal `tx` port and connect into the `delay` subcomponent
177 let mut tx = OutPort::new(&entity, "delay_tx");
178 tx.connect(delay.port_rx())?;
179
180 let rx = InPort::new(engine, clock, &entity, "rx");
181
182 // Finally, create the component
183 let rc_self = Rc::new(Self {
184 entity,
185 drop_ratio: config.drop_ratio,
186 rx: RefCell::new(Some(rx)),
187 tx: RefCell::new(Some(tx)),
188 buffer: RefCell::new(Some(buffer)),
189 rng: RefCell::new(StdRng::seed_from_u64(config.seed)),
190 });
191 engine.register(rc_self.clone());
192 Ok(rc_self)
193 }
194
195 /// This provides the `InPort` to which you can connect
196 pub fn port_rx(&self) -> PortStateResult<T> {
197 // The `port_rx!` macro is the most consise way to access the rx port state.
198 port_rx!(self.rx, state)
199 }
200
201 /// The ports of this component are effectively defined by the functions
202 /// this component exposes. In this case, the `connect_port_tx` shows
203 /// that this component has an `tx` port which should be connected to an
204 /// `rx` port.
205 ///
206 /// In this case the `tx` port is connected directly to the buffer's `tx`
207 /// port.
208 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
209 // Because the State is immutable then we use the `connect_tx!` macro
210 // in order to simplify the setup.
211 connect_tx!(self.buffer, connect_port_tx ; port_state)
212 }
213
214 /// Return the next random u32
215 ///
216 /// This is wrapped in a separate function to hide the interior mutation
217 fn next_u32(&self) -> u32 {
218 self.rng.borrow_mut().next_u32()
219 }
220}
221
222#[async_trait(?Send)]
223impl<T> Runnable for Flaky<T>
224where
225 T: SimObject,
226{
227 /// Implement the active aspect of thie component
228 ///
229 /// The `run()` function launches any sub-components and then performs the
230 /// functionality of this component.
231 async fn run(&self) -> SimResult {
232 // Pull out the `rx` port so that it is owned in this function.
233 let rx = take_option!(self.rx);
234
235 // Pull out the internal `tx` port so that it is owned in this function.
236 let tx = take_option!(self.tx);
237
238 loop {
239 // Receive a value from the input
240 let value = rx.get()?.await;
241
242 let next_u32 = self.next_u32();
243 let ratio = next_u32 as f64 / u32::MAX as f64;
244 if ratio > self.drop_ratio {
245 // Only pass on a percentage of the data
246 tx.put(value)?.await;
247 } else {
248 // Let the user know this value has been dropped.
249 trace!(self.entity ; "drop {}", value);
250 }
251 }
252 }
253}