flaky_with_delay/lib.rs
1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
//! This is an example component that delays data and also randomly drops data.
//!
//! This component uses the existing `Delay` and `Store` components.
//!
//! The `main.rs` in this folder shows how it can be used.
//!
//! # Ports
//!
//! This component has two ports:
//! - One [input port](gwr_engine::port::InPort): `rx`
//! - One [output port](gwr_engine::port::OutPort): `tx`
14
15/// First need to `use` all types and traits that are used.
16use std::cell::RefCell;
17/// The `Rc` and `RefCell` libraries provide single-threaded sharing and
18/// mutating of state.
19use std::rc::Rc;
20
21use async_trait::async_trait;
22use gwr_components::delay::Delay;
23use gwr_components::store::Store;
24/// The gwr components crate provides many connectable building blocks.
25/// Component traits and types are provided along with the components
26/// themselves.
27use gwr_components::{connect_tx, port_rx, take_option};
28use gwr_engine::engine::Engine;
29/// The gwr engine core provides the traits and types required to be
30/// implemented by a component.
31use gwr_engine::executor::Spawner;
32use gwr_engine::port::{InPort, OutPort, PortStateResult};
33use gwr_engine::time::clock::Clock;
34use gwr_engine::traits::{Runnable, SimObject};
35use gwr_engine::types::{SimError, SimResult};
36use gwr_model_builder::EntityDisplay;
37/// The gwr_track library provides tracing/logging features.
38use gwr_track::entity::Entity;
39use gwr_track::trace;
40/// Random library is just used by this component to implement its drop
41/// decisions.
42use rand::rngs::StdRng;
43use rand::{RngCore, SeedableRng};
44
/// A struct containing configuration options for the component.
#[derive(Debug, Clone, PartialEq)]
pub struct Config {
    /// Ratio for how many packets are dropped (in the range `[0, 1]`).
    drop_ratio: f64,

    /// Seed for the random number generator.
    seed: u64,

    /// Delay in clock ticks.
    delay_ticks: usize,
}

impl Config {
    /// Create a configuration from a drop ratio, an RNG seed and a delay.
    ///
    /// # Panics
    ///
    /// Panics if `drop_ratio` is not within `[0, 1]`. A NaN ratio is also
    /// rejected because it is not contained in that range.
    #[must_use]
    pub fn new(drop_ratio: f64, seed: u64, delay_ticks: usize) -> Self {
        // Validate eagerly so a bad ratio is reported where the configuration
        // is built rather than showing up later as odd drop behaviour.
        assert!(
            (0.0..=1.0).contains(&drop_ratio),
            "drop_ratio must be in the range [0, 1], got {}",
            drop_ratio
        );
        Self {
            drop_ratio,
            seed,
            delay_ticks,
        }
    }
}
68
/// An example component that forwards received values into an internal delay
/// or drops them, based on a random decision.
///
/// The `EntityDisplay` derive automatically provides the `Display` trait as
/// long as the struct contains the `entity` field.
#[derive(EntityDisplay)]
pub struct Flaky<T>
where
    T: SimObject,
{
    /// Every component should include an Entity that defines where in the
    /// overall simulation hierarchy it is. The Entity is also used to
    /// filter logging.
    pub entity: Rc<Entity>,

    /// This component has an `rx` port that it uses to handle incoming data.
    ///
    /// It is placed within an `Option` so that `run()` can take ownership of
    /// it when the Engine is run.
    rx: RefCell<Option<InPort<T>>>,

    /// This component has an internal `tx` port that is connected to the
    /// delay. Any data that is not dropped is sent through this port.
    ///
    /// It is placed within an `Option` so that `run()` can take ownership of
    /// it when the Engine is run.
    tx: RefCell<Option<OutPort<T>>>,

    /// The output of the `Delay` is connected to this `Store`; the store's
    /// `tx` side is what `connect_port_tx()` connects to the outside world.
    ///
    /// It is again placed within an `Option`, which is the form the
    /// `connect_tx!` macro is given — presumably so it can be accessed (or
    /// removed) later; confirm against the macro definition.
    buffer: RefCell<Option<Rc<Store<T>>>>,

    /// Store the ratio at which packets should be dropped.
    drop_ratio: f64,

    /// Random number generator used for deciding when to drop. It is wrapped
    /// in a `RefCell` so that it can be advanced from methods that only have
    /// `&self` (see `next_u32()`, which is called from `run()`).
    rng: RefCell<StdRng>,
}
113
114/// The next thing to do is define the generic functions for the new component.
115impl<T> Flaky<T>
116where
117 T: SimObject,
118{
119 /// In this case, the `new_and_register()` function creates the component
120 /// from the parameters provided as well as registering the component
121 /// with the `Engine`.
122 pub fn new_and_register(
123 engine: &Engine,
124 parent: &Rc<Entity>,
125 name: &str,
126 clock: Clock,
127 spawner: Spawner,
128 config: &Config,
129 ) -> Result<Rc<Self>, SimError> {
130 // The entity needs to be created first because this component will be the
131 // parent to the subcomponents.
132 let entity = Entity::new(parent, name);
133
134 // Because it is shared it needs to be wrapped in an Arc
135 let entity = Rc::new(entity);
136
137 let delay = Delay::new_and_register(
138 engine,
139 &entity,
140 "delay",
141 clock,
142 spawner.clone(),
143 config.delay_ticks,
144 )?;
145 let buffer = Store::new_and_register(engine, &entity, "buffer", spawner, 1)?;
146
147 delay.connect_port_tx(buffer.port_rx())?;
148
149 // Create an internal `tx` port and connect into the `delay` subcomponent
150 let mut tx = OutPort::new(&entity, "delay_tx");
151 tx.connect(delay.port_rx())?;
152
153 let rx = InPort::new(&entity, "rx");
154
155 // Finally, create the component
156 let rc_self = Rc::new(Self {
157 entity,
158 drop_ratio: config.drop_ratio,
159 rx: RefCell::new(Some(rx)),
160 tx: RefCell::new(Some(tx)),
161 buffer: RefCell::new(Some(buffer)),
162 rng: RefCell::new(StdRng::seed_from_u64(config.seed)),
163 });
164 engine.register(rc_self.clone());
165 Ok(rc_self)
166 }
167
168 /// This provides the `InPort` to which you can connect
169 pub fn port_rx(&self) -> PortStateResult<T> {
170 // The `port_rx!` macro is the most consise way to access the rx port state.
171 port_rx!(self.rx, state)
172 }
173
174 /// The ports of this component are effectively defined by the functions
175 /// this component exposes. In this case, the `connect_port_tx` shows
176 /// that this component has an `tx` port which should be connected to an
177 /// `rx` port.
178 ///
179 /// In this case the `tx` port is connected directly to the buffer's `tx`
180 /// port.
181 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
182 // Because the State is immutable then we use the `connect_tx!` macro
183 // in order to simplify the setup.
184 connect_tx!(self.buffer, connect_port_tx ; port_state)
185 }
186
187 /// Return the next random u32
188 ///
189 /// This is wrapped in a separate function to hide the interior mutation
190 fn next_u32(&self) -> u32 {
191 self.rng.borrow_mut().next_u32()
192 }
193}
194
195#[async_trait(?Send)]
196impl<T> Runnable for Flaky<T>
197where
198 T: SimObject,
199{
200 /// Implement the active aspect of thie component
201 ///
202 /// The `run()` function launches any sub-components and then performs the
203 /// functionality of this component.
204 async fn run(&self) -> SimResult {
205 // Pull out the `rx` port so that it is owned in this function.
206 let rx = take_option!(self.rx);
207
208 // Pull out the internal `tx` port so that it is owned in this function.
209 let tx = take_option!(self.tx);
210
211 loop {
212 // Receive a value from the input
213 let value = rx.get()?.await;
214
215 let next_u32 = self.next_u32();
216 let ratio = next_u32 as f64 / u32::MAX as f64;
217 if ratio > self.drop_ratio {
218 // Only pass on a percentage of the data
219 tx.put(value)?.await;
220 } else {
221 // Let the user know this value has been dropped.
222 trace!(self.entity ; "drop {}", value);
223 }
224 }
225 }
226}