flaky_with_delay/
lib.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
//! This is an example component that randomly drops data and delays the data it passes on.
4//!
5//! This component uses the existing `Delay` and `Store` components.
6//!
7//! The `main.rs` in this folder shows how it can be used.
8//!
9//! # Ports
10//!
11//! This component has two ports
12//!  - One [input port](gwr_engine::port::InPort): `rx`
13//!  - One [output port](gwr_engine::port::OutPort): `tx`
14
15/// First need to `use` all types and traits that are used.
16use std::cell::RefCell;
17/// The `Rc` and `RefCell` libraries provide single-threaded sharing and
18/// mutating of state.
19use std::rc::Rc;
20
21use async_trait::async_trait;
22use gwr_components::delay::Delay;
23use gwr_components::store::Store;
24/// The gwr components crate provides many connectable building blocks.
25/// Component traits and types are provided along with the components
26/// themselves.
27use gwr_components::{connect_tx, port_rx, take_option};
28use gwr_engine::engine::Engine;
29/// The gwr engine core provides the traits and types required to be
30/// implemented by a component.
31use gwr_engine::port::{InPort, OutPort, PortStateResult};
32use gwr_engine::time::clock::Clock;
33use gwr_engine::traits::{Runnable, SimObject};
34use gwr_engine::types::{SimError, SimResult};
35use gwr_model_builder::{EntityDisplay, EntityGet};
36/// The gwr_track library provides tracing/logging features.
37use gwr_track::entity::Entity;
38use gwr_track::tracker::aka::Aka;
39use gwr_track::{build_aka, trace};
40/// Random library is just used by this component to implement its drop
41/// decisions.
42use rand::rngs::StdRng;
43use rand::{RngCore, SeedableRng};
44
/// Configuration options for the component.
///
/// Instances are built with [`Config::new`], which validates the drop ratio.
#[derive(Debug, Clone, PartialEq)]
pub struct Config {
    /// Ratio for how many packets are dropped (in the range [0, 1])
    drop_ratio: f64,

    /// Seed for the random number generator that makes the drop decisions
    seed: u64,

    /// Delay in clock ticks
    delay_ticks: usize,
}

impl Config {
    /// Create a configuration from a drop ratio, an RNG seed and a delay.
    ///
    /// # Panics
    ///
    /// Panics if `drop_ratio` is not within `[0, 1]`. A NaN ratio is rejected
    /// as well because it is not contained in any range.
    #[must_use]
    pub fn new(drop_ratio: f64, seed: u64, delay_ticks: usize) -> Self {
        assert!(
            (0.0..=1.0).contains(&drop_ratio),
            "drop_ratio must be in the range [0, 1], got {}",
            drop_ratio
        );
        Self {
            drop_ratio,
            seed,
            delay_ticks,
        }
    }
}
68
/// The `Flaky` component: randomly drops incoming values and delays the ones
/// it passes on.
///
/// The `EntityGet` derive automatically implements the `GetEntity` trait for
/// this struct.
///
/// The `EntityDisplay` derive automatically implements the `Display` trait as
/// long as the struct contains the `entity` field.
#[derive(EntityGet, EntityDisplay)]
pub struct Flaky<T>
where
    T: SimObject,
{
    /// Every component should include an Entity that defines where in the
    /// overall simulation hierarchy it is. The Entity is also used to
    /// filter logging.
    ///
    /// It is also the parent entity of the internal `delay` and `buffer`
    /// sub-components.
    entity: Rc<Entity>,

    /// This component has an `rx` port that it uses to handle incoming data.
    ///
    /// It is placed within an `Option` so that `run()` can take ownership of
    /// it when the Engine is run.
    rx: RefCell<Option<InPort<T>>>,

    /// This component has an internal `tx` port that is connected to the
    /// delay's `rx` port. Any data that is not dropped is sent through this
    /// port.
    ///
    /// It is placed within an `Option` so that `run()` can take ownership of
    /// it when the Engine is run.
    tx: RefCell<Option<OutPort<T>>>,

    /// The output of the `Delay` is connected to this `Store`, so data that
    /// has not been dropped ends up here after being delayed.
    ///
    /// `connect_port_tx()` forwards to this store, which means the component's
    /// externally visible `tx` port is the store's `tx` port.
    buffer: RefCell<Option<Rc<Store<T>>>>,

    /// Store the ratio at which packets should be dropped.
    drop_ratio: f64,

    /// Random number generator used for deciding when to drop. Note that it is
    /// wrapped in a `RefCell` which allows it to be used mutably in the
    /// `next_u32()` function despite the fact that the component is only
    /// available immutably there (`&self` argument).
    rng: RefCell<StdRng>,
}
117
118/// The next thing to do is define the generic functions for the new component.
119impl<T> Flaky<T>
120where
121    T: SimObject,
122{
123    /// In this case, the `new_and_register()` function creates the component
124    /// from the parameters provided as well as registering the component
125    /// with the `Engine`.
126    pub fn new_and_register(
127        engine: &Engine,
128        clock: &Clock,
129        parent: &Rc<Entity>,
130        name: &str,
131        config: &Config,
132    ) -> Result<Rc<Self>, SimError> {
133        // In this case it simply uses the function that provides renaming features as
134        // well.
135        Self::new_and_register_with_renames(engine, clock, parent, name, None, config)
136    }
137
138    /// The `new_and_register_with_renames()` function does the work of creating
139    /// the component from the parameters provided, as well as registering the
140    /// component with the [Engine].
141    ///
142    /// This function adds the ability to provide multiple names for the `tx`
143    /// port which is actually just the mapped through to the internal buffer's
144    /// `tx` port.
145    pub fn new_and_register_with_renames(
146        engine: &Engine,
147        clock: &Clock,
148        parent: &Rc<Entity>,
149        name: &str,
150        aka: Option<&Aka>,
151        config: &Config,
152    ) -> Result<Rc<Self>, SimError> {
153        // The entity needs to be created first because this component will be the
154        // parent to the subcomponents.
155        let entity = Entity::new(parent, name);
156
157        // Because it is shared it needs to be wrapped in an Arc
158        let entity = Rc::new(entity);
159
160        let delay = Delay::new_and_register(engine, clock, &entity, "delay", config.delay_ticks)?;
161
162        // Build up a renaming that shows that this component's `tx` port is the same
163        // as the buffer's `tx` port and the user will be able to use either name.
164        let buffer_aka = build_aka!(aka, &entity, &[("tx", "tx")]);
165        let buffer = Store::new_and_register_with_renames(
166            engine,
167            clock,
168            &entity,
169            "buffer",
170            Some(&buffer_aka),
171            1,
172        )?;
173
174        delay.connect_port_tx(buffer.port_rx())?;
175
176        // Create an internal `tx` port and connect into the `delay` subcomponent
177        let mut tx = OutPort::new(&entity, "delay_tx");
178        tx.connect(delay.port_rx())?;
179
180        let rx = InPort::new(engine, clock, &entity, "rx");
181
182        // Finally, create the component
183        let rc_self = Rc::new(Self {
184            entity,
185            drop_ratio: config.drop_ratio,
186            rx: RefCell::new(Some(rx)),
187            tx: RefCell::new(Some(tx)),
188            buffer: RefCell::new(Some(buffer)),
189            rng: RefCell::new(StdRng::seed_from_u64(config.seed)),
190        });
191        engine.register(rc_self.clone());
192        Ok(rc_self)
193    }
194
195    /// This provides the `InPort` to which you can connect
196    pub fn port_rx(&self) -> PortStateResult<T> {
197        // The `port_rx!` macro is the most consise way to access the rx port state.
198        port_rx!(self.rx, state)
199    }
200
201    /// The ports of this component are effectively defined by the functions
202    /// this component exposes. In this case, the `connect_port_tx` shows
203    /// that this component has an `tx` port which should be connected to an
204    /// `rx` port.
205    ///
206    /// In this case the `tx` port is connected directly to the buffer's `tx`
207    /// port.
208    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
209        // Because the State is immutable then we use the `connect_tx!` macro
210        // in order to simplify the setup.
211        connect_tx!(self.buffer, connect_port_tx ; port_state)
212    }
213
214    /// Return the next random u32
215    ///
216    /// This is wrapped in a separate function to hide the interior mutation
217    fn next_u32(&self) -> u32 {
218        self.rng.borrow_mut().next_u32()
219    }
220}
221
222#[async_trait(?Send)]
223impl<T> Runnable for Flaky<T>
224where
225    T: SimObject,
226{
227    /// Implement the active aspect of thie component
228    ///
229    /// The `run()` function launches any sub-components and then performs the
230    /// functionality of this component.
231    async fn run(&self) -> SimResult {
232        // Pull out the `rx` port so that it is owned in this function.
233        let rx = take_option!(self.rx);
234
235        // Pull out the internal `tx` port so that it is owned in this function.
236        let tx = take_option!(self.tx);
237
238        loop {
239            // Receive a value from the input
240            let value = rx.get()?.await;
241
242            let next_u32 = self.next_u32();
243            let ratio = next_u32 as f64 / u32::MAX as f64;
244            if ratio > self.drop_ratio {
245                // Only pass on a percentage of the data
246                tx.put(value)?.await;
247            } else {
248                // Let the user know this value has been dropped.
249                trace!(self.entity ; "drop {}", value);
250            }
251        }
252    }
253}