flaky_with_delay/
lib.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! This is an example component that delays data and also randomly drop data.
4//!
5//! This component uses the existing `Delay` and `Store` components.
6//!
7//! The `main.rs` in this folder shows how it can be used.
8//!
9//! # Ports
10//!
11//! This component has two ports
12//!  - One [input port](gwr_engine::port::InPort): `rx`
13//!  - One [output port](gwr_engine::port::OutPort): `tx`
14
15/// First need to `use` all types and traits that are used.
16use std::cell::RefCell;
17/// The `Rc` and `RefCell` libraries provide single-threaded sharing and
18/// mutating of state.
19use std::rc::Rc;
20
21use async_trait::async_trait;
22use gwr_components::delay::Delay;
23use gwr_components::store::Store;
24/// The gwr components crate provides many connectable building blocks.
25/// Component traits and types are provided along with the components
26/// themselves.
27use gwr_components::{connect_tx, port_rx, take_option};
28use gwr_engine::engine::Engine;
29/// The gwr engine core provides the traits and types required to be
30/// implemented by a component.
31use gwr_engine::executor::Spawner;
32use gwr_engine::port::{InPort, OutPort, PortStateResult};
33use gwr_engine::time::clock::Clock;
34use gwr_engine::traits::{Runnable, SimObject};
35use gwr_engine::types::{SimError, SimResult};
36use gwr_model_builder::EntityDisplay;
37/// The gwr_track library provides tracing/logging features.
38use gwr_track::entity::Entity;
39use gwr_track::trace;
40/// Random library is just used by this component to implement its drop
41/// decisions.
42use rand::rngs::StdRng;
43use rand::{RngCore, SeedableRng};
44
45/// A struct containing configuration options for the component
46pub struct Config {
47    /// Ratio for how many packets are dropped (in the range [0, 1])
48    drop_ratio: f64,
49
50    /// Seed for random number generator
51    seed: u64,
52
53    /// Delay in clock ticks
54    delay_ticks: usize,
55}
56
57impl Config {
58    #[must_use]
59    pub fn new(drop_ratio: f64, seed: u64, delay_ticks: usize) -> Self {
60        assert!((0.0..=1.0).contains(&drop_ratio));
61        Self {
62            drop_ratio,
63            seed,
64            delay_ticks,
65        }
66    }
67}
68
69/// A component needs to support being cloned and also being printed for debug
70/// logging.
71///
72/// The `EntityDisply` automatically derives the `Display` trait as long as the
73/// struct contains the `entity`.
74#[derive(EntityDisplay)]
75pub struct Flaky<T>
76where
77    T: SimObject,
78{
79    /// Every component should include an Entity that defines where in the
80    /// overall simulation hierarchy it is. The Entity is also used to
81    /// filter logging.
82    pub entity: Rc<Entity>,
83
84    /// This component has an `rx` port that it uses to handle incoming data.
85    ///
86    /// It is placed within an `Option` so that it can be removed later
87    /// when the Engine is run.
88    rx: RefCell<Option<InPort<T>>>,
89
90    /// This component has an internal `tx` port is connected to the delay.
91    /// Any data that is not dropped is sent through this port.
92    ///
93    /// It is placed within an `Option` so that it can be removed later
94    /// when the Engine is run.
95    tx: RefCell<Option<OutPort<T>>>,
96
97    /// After the `Delay` data will be placed into a `Store` from where it can
98    /// be pulled and either passed on or dropped.
99    ///
100    /// It is again placed within an `Option` so that it can be removed later
101    /// when the Engine is run.
102    buffer: RefCell<Option<Rc<Store<T>>>>,
103
104    /// Store the ratio at which packets should be dropped.
105    drop_ratio: f64,
106
107    /// Random number generator used for deciding when to drop. Note that it is
108    /// wrapped in a `RefCell` which allows it to be used mutably in the `put()`
109    /// function despite the fact that the Inner will be immutable (`&self`
110    /// argument in the trait).
111    rng: RefCell<StdRng>,
112}
113
114/// The next thing to do is define the generic functions for the new component.
115impl<T> Flaky<T>
116where
117    T: SimObject,
118{
119    /// In this case, the `new_and_register()` function creates the component
120    /// from the parameters provided as well as registering the component
121    /// with the `Engine`.
122    pub fn new_and_register(
123        engine: &Engine,
124        parent: &Rc<Entity>,
125        name: &str,
126        clock: Clock,
127        spawner: Spawner,
128        config: &Config,
129    ) -> Result<Rc<Self>, SimError> {
130        // The entity needs to be created first because this component will be the
131        // parent to the subcomponents.
132        let entity = Entity::new(parent, name);
133
134        // Because it is shared it needs to be wrapped in an Arc
135        let entity = Rc::new(entity);
136
137        let delay = Delay::new_and_register(
138            engine,
139            &entity,
140            "delay",
141            clock,
142            spawner.clone(),
143            config.delay_ticks,
144        )?;
145        let buffer = Store::new_and_register(engine, &entity, "buffer", spawner, 1)?;
146
147        delay.connect_port_tx(buffer.port_rx())?;
148
149        // Create an internal `tx` port and connect into the `delay` subcomponent
150        let mut tx = OutPort::new(&entity, "delay_tx");
151        tx.connect(delay.port_rx())?;
152
153        let rx = InPort::new(&entity, "rx");
154
155        // Finally, create the component
156        let rc_self = Rc::new(Self {
157            entity,
158            drop_ratio: config.drop_ratio,
159            rx: RefCell::new(Some(rx)),
160            tx: RefCell::new(Some(tx)),
161            buffer: RefCell::new(Some(buffer)),
162            rng: RefCell::new(StdRng::seed_from_u64(config.seed)),
163        });
164        engine.register(rc_self.clone());
165        Ok(rc_self)
166    }
167
168    /// This provides the `InPort` to which you can connect
169    pub fn port_rx(&self) -> PortStateResult<T> {
170        // The `port_rx!` macro is the most consise way to access the rx port state.
171        port_rx!(self.rx, state)
172    }
173
174    /// The ports of this component are effectively defined by the functions
175    /// this component exposes. In this case, the `connect_port_tx` shows
176    /// that this component has an `tx` port which should be connected to an
177    /// `rx` port.
178    ///
179    /// In this case the `tx` port is connected directly to the buffer's `tx`
180    /// port.
181    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
182        // Because the State is immutable then we use the `connect_tx!` macro
183        // in order to simplify the setup.
184        connect_tx!(self.buffer, connect_port_tx ; port_state)
185    }
186
187    /// Return the next random u32
188    ///
189    /// This is wrapped in a separate function to hide the interior mutation
190    fn next_u32(&self) -> u32 {
191        self.rng.borrow_mut().next_u32()
192    }
193}
194
195#[async_trait(?Send)]
196impl<T> Runnable for Flaky<T>
197where
198    T: SimObject,
199{
200    /// Implement the active aspect of thie component
201    ///
202    /// The `run()` function launches any sub-components and then performs the
203    /// functionality of this component.
204    async fn run(&self) -> SimResult {
205        // Pull out the `rx` port so that it is owned in this function.
206        let rx = take_option!(self.rx);
207
208        // Pull out the internal `tx` port so that it is owned in this function.
209        let tx = take_option!(self.tx);
210
211        loop {
212            // Receive a value from the input
213            let value = rx.get()?.await;
214
215            let next_u32 = self.next_u32();
216            let ratio = next_u32 as f64 / u32::MAX as f64;
217            if ratio > self.drop_ratio {
218                // Only pass on a percentage of the data
219                tx.put(value)?.await;
220            } else {
221                // Let the user know this value has been dropped.
222                trace!(self.entity ; "drop {}", value);
223            }
224        }
225    }
226}