gwr_components/
delay.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3//! A component that adds `delay_ticks` between receiving anything and sending
4//! it on to its output.
5//!
6//! The `Delay` can be configured such that it will return an error if the
7//! output is ever blocked. Otherwise it will implicitly assert back-pressure on
8//! the input.
9//!
10//! # Ports
11//!
12//! This component has the following ports:
13//!  - One [input port](gwr_engine::port::InPort): `rx`
14//!  - One [output port](gwr_engine::port::OutPort): `tx`
15
16//! # Function
17//!
18//! Fundamentally the [Delay]'s functionality is to:
19//!
20//! ```rust
21//! # use std::rc::Rc;
22//! # use async_trait::async_trait;
23//! # use gwr_engine::port::{InPort, OutPort};
24//! # use gwr_engine::sim_error;
25//! # use gwr_engine::time::clock::Clock;
26//! # use gwr_engine::traits::SimObject;
27//! # use gwr_engine::types::SimResult;
28//! # use gwr_track::entity::Entity;
29//! #
30//! # async fn run_tx<T>(
31//! #     entity: Rc<Entity>,
32//! #     tx: OutPort<T>,
33//! #     clock: &Clock,
34//! #     rx: InPort<T>,
35//! #     delay_ticks: u64,
36//! # ) -> SimResult
37//! # where
38//! #     T: SimObject,
39//! # {
40//! loop {
41//!     let value = rx.get()?.await;
42//!     clock.wait_ticks(delay_ticks).await;
43//!     tx.put(value)?.await;
44//! }
45//! # }
46//! ```
47//!
48//! However, the problem with this is that the input ends up being blocked if
49//! the output does not instantly consume the value. Therefore the [Delay] is
50//! actually split into two halves that manage the ports independently.
51//!
52//! ## Input
53//!
54//! A simplified view of how the input side works is:
55//!
56//! ```rust
57//! # use std::cell::RefCell;
58//! # use std::collections::VecDeque;
59//! # use std::rc::Rc;
60//! # use async_trait::async_trait;
61//! # use gwr_engine::events::repeated::Repeated;
62//! # use gwr_engine::port::{InPort, OutPort};
63//! # use gwr_engine::sim_error;
64//! # use gwr_engine::time::clock::{Clock, ClockTick};
65//! # use gwr_engine::traits::SimObject;
66//! # use gwr_engine::types::SimResult;
67//! # use gwr_track::entity::Entity;
68//! #
69//! # async fn run_rx<T>(
70//! #     entity: Rc<Entity>,
71//! #     rx: InPort<T>,
72//! #     clock: &Clock,
73//! #     pending: Rc<RefCell<VecDeque<(T, ClockTick)>>>,
74//! #     pending_changed: Repeated<usize>,
75//! #     delay_ticks: u64,
76//! # ) -> SimResult
77//! # where
78//! #     T: SimObject,
79//! # {
80//! loop {
81//!     // Receive value from input
82//!     let value = rx.get()?.await;
83//!
84//!     // Compute time at which it should leave Delay
85//!     let mut tick = clock.tick_now();
86//!     tick.set_tick(tick.tick() + delay_ticks as u64);
87//!
88//!     // Send to the output side
89//!     pending.borrow_mut().push_back((value, tick));
90//!
91//!     // Wake up output if required
92//!     pending_changed.notify()?;
93//! }
94//!  # }
95//! ```
96
97//!
98//! ## Output
99//!
100//! A simplified view of how the output side works is:
101//!
102//! ```rust
103//! # use std::cell::RefCell;
104//! # use std::collections::VecDeque;
105//! # use std::rc::Rc;
106//! # use async_trait::async_trait;
107//! # use gwr_engine::events::repeated::Repeated;
108//! # use gwr_engine::port::{InPort, OutPort};
109//! # use gwr_engine::sim_error;
110//! # use gwr_engine::time::clock::{Clock, ClockTick};
111//! # use gwr_engine::traits::{Event, SimObject};
112//! # use gwr_engine::types::SimResult;
113//! # use gwr_track::entity::Entity;
114//! #
115//! # async fn run_tx<T>(
116//! #     entity: Rc<Entity>,
117//! #     tx: OutPort<T>,
118//! #     clock: &Clock,
119//! #     pending: Rc<RefCell<VecDeque<(T, ClockTick)>>>,
120//! #     pending_changed: Repeated<usize>,
121//! # ) -> SimResult
122//! # where
123//! #     T: SimObject,
124//! # {
125//! loop {
126//!     // Get next value and tick at which to send value
127//!     if let Some((value, tick)) = pending.borrow_mut().pop_front() {
128//!         // Wait for correct time
129//!         let tick_now = clock.tick_now();
130//!         clock.wait_ticks(tick.tick() - tick_now.tick()).await;
131//!
132//!         // Send value
133//!         tx.put(value)?.await;
134//!     } else {
135//!         // Wait to be notified of new data
136//!         pending_changed.listen().await;
137//!     }
138//! }
139//! # }
140//! ```
141//!
142//! ## Using a [Delay]
143//!
144//! A [Delay] simply needs to be created with the latency through it and
145//! connected between components.
146//!
147//! ```rust
148//! # use std::cell::RefCell;
149//! # use std::rc::Rc;
150//! #
151//! # use gwr_components::delay::Delay;
152//! # use gwr_components::sink::Sink;
153//! # use gwr_components::source::Source;
154//! # use gwr_components::store::Store;
155//! # use gwr_components::{connect_port, option_box_repeat};
156//! # use gwr_engine::engine::Engine;
157//! # use gwr_engine::port::{InPort, OutPort};
158//! # use gwr_engine::run_simulation;
159//! # use gwr_engine::test_helpers::start_test;
160//! # use gwr_engine::time::clock::Clock;
161//! # use gwr_engine::traits::SimObject;
162//! # use gwr_engine::types::SimResult;
163//! # use gwr_track::entity::GetEntity;
164//! #
165//! # fn source_sink() -> SimResult {
166//! #     let mut engine = start_test(file!());
167//! #     let clock = engine.default_clock();
168//! #
169//! #     let delay_ticks = 3;
170//! #     let num_puts = delay_ticks * 10;
171//! #
172//! #     let top = engine.top();
173//! #     let to_send: Option<Box<dyn Iterator<Item = _>>> = option_box_repeat!(500 ; num_puts);
174//!     // Create the components
175//!     let source = Source::new_and_register(&engine, top, "source", to_send)?;
176//!     let delay = Delay::new_and_register(&engine, &clock, top, "delay", delay_ticks)?;
177//!     let sink = Sink::new_and_register(&engine, &clock, top, "sink")?;
178//!
179//!     // Connect the ports
180//!     connect_port!(source, tx => delay, rx)?;
181//!     connect_port!(delay, tx => sink, rx)?;
182//!
183//!     run_simulation!(engine);
184//! #
185//! #     let num_sunk = sink.num_sunk();
186//! #     assert_eq!(num_sunk, num_puts);
187//! #     Ok(())
188//! # }
189//! ```
190use std::cell::RefCell;
191use std::cmp::Ordering;
192use std::collections::VecDeque;
193use std::rc::Rc;
194
195use async_trait::async_trait;
196use gwr_engine::engine::Engine;
197use gwr_engine::events::repeated::Repeated;
198use gwr_engine::executor::Spawner;
199use gwr_engine::port::{InPort, OutPort, PortStateResult};
200use gwr_engine::sim_error;
201use gwr_engine::time::clock::{Clock, ClockTick};
202use gwr_engine::traits::{Event, Runnable, SimObject};
203use gwr_engine::types::{SimError, SimResult};
204use gwr_model_builder::{EntityDisplay, EntityGet};
205use gwr_track::entity::Entity;
206use gwr_track::tracker::aka::Aka;
207use gwr_track::{enter, exit};
208
209use crate::{connect_tx, port_rx, take_option};
210
211#[derive(EntityGet, EntityDisplay)]
212pub struct Delay<T>
213where
214    T: SimObject,
215{
216    entity: Rc<Entity>,
217    spawner: Spawner,
218    clock: Clock,
219    delay_ticks: RefCell<usize>,
220
221    rx: RefCell<Option<InPort<T>>>,
222    pending: Rc<RefCell<VecDeque<(T, ClockTick)>>>,
223    pending_changed: Repeated<()>,
224    output_changed: Repeated<()>,
225    tx: RefCell<Option<OutPort<T>>>,
226
227    error_on_output_stall: RefCell<bool>,
228}
229
230impl<T> Delay<T>
231where
232    T: SimObject,
233{
234    pub fn new_and_register_with_renames(
235        engine: &Engine,
236        clock: &Clock,
237        parent: &Rc<Entity>,
238        name: &str,
239        aka: Option<&Aka>,
240        delay_ticks: usize,
241    ) -> Result<Rc<Self>, SimError> {
242        let spawner = engine.spawner();
243        let entity = Rc::new(Entity::new(parent, name));
244        let tx = OutPort::new_with_renames(&entity, "tx", aka);
245        let rx = InPort::new_with_renames(engine, clock, &entity, "rx", aka);
246        let rc_self = Rc::new(Self {
247            entity,
248            spawner,
249            clock: clock.clone(),
250            delay_ticks: RefCell::new(delay_ticks),
251            rx: RefCell::new(Some(rx)),
252            pending: Rc::new(RefCell::new(VecDeque::new())),
253            pending_changed: Repeated::default(),
254            output_changed: Repeated::default(),
255            tx: RefCell::new(Some(tx)),
256            error_on_output_stall: RefCell::new(false),
257        });
258        engine.register(rc_self.clone());
259        Ok(rc_self)
260    }
261
262    pub fn new_and_register(
263        engine: &Engine,
264        clock: &Clock,
265        parent: &Rc<Entity>,
266        name: &str,
267        delay_ticks: usize,
268    ) -> Result<Rc<Self>, SimError> {
269        Self::new_and_register_with_renames(engine, clock, parent, name, None, delay_ticks)
270    }
271
272    pub fn set_error_on_output_stall(&self) {
273        *self.error_on_output_stall.borrow_mut() = true;
274    }
275
276    pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
277        connect_tx!(self.tx, connect ; port_state)
278    }
279
280    pub fn port_rx(&self) -> PortStateResult<T> {
281        port_rx!(self.rx, state)
282    }
283
284    pub fn set_delay(&self, delay_ticks: usize) -> SimResult {
285        if self.rx.borrow().is_none() {
286            return sim_error!(format!(
287                "{}: can't change the delay after the simulation has started",
288                self.entity
289            ));
290        }
291        *self.delay_ticks.borrow_mut() = delay_ticks;
292        Ok(())
293    }
294}
295
296#[async_trait(?Send)]
297impl<T> Runnable for Delay<T>
298where
299    T: SimObject,
300{
301    async fn run(&self) -> SimResult {
302        // Spawn the other end of the delay
303        let tx = take_option!(self.tx);
304
305        let entity = self.entity.clone();
306        let clock = self.clock.clone();
307        let pending = self.pending.clone();
308        let pending_changed = self.pending_changed.clone();
309        let output_changed = self.output_changed.clone();
310        let error_on_output_stall = *self.error_on_output_stall.borrow();
311        self.spawner.spawn(async move {
312            run_tx(
313                entity,
314                tx,
315                &clock,
316                pending,
317                pending_changed,
318                output_changed,
319                error_on_output_stall,
320            )
321            .await
322        });
323
324        let rx = take_option!(self.rx);
325        let delay_ticks = *self.delay_ticks.borrow();
326        loop {
327            let value = rx.get()?.await;
328            let value_id = value.id();
329            enter!(self.entity ; value_id);
330
331            let mut tick = self.clock.tick_now();
332            tick.set_tick(tick.tick() + delay_ticks as u64);
333
334            self.pending.borrow_mut().push_back((value, tick));
335            self.pending_changed.notify()?;
336
337            if delay_ticks > 0 && !*self.error_on_output_stall.borrow() {
338                // Enforce back-pressure by waiting until there is room in the pending queue
339                while self.pending.borrow().len() >= delay_ticks {
340                    self.output_changed.listen().await;
341                }
342            }
343        }
344    }
345}
346
347async fn run_tx<T>(
348    entity: Rc<Entity>,
349    tx: OutPort<T>,
350    clock: &Clock,
351    pending: Rc<RefCell<VecDeque<(T, ClockTick)>>>,
352    pending_changed: Repeated<()>,
353    output_changed: Repeated<()>,
354    error_on_output_stall: bool,
355) -> SimResult
356where
357    T: SimObject,
358{
359    loop {
360        let next = pending.borrow_mut().pop_front();
361
362        match next {
363            Some((value, tick)) => {
364                let tick_now = clock.tick_now();
365                match tick.cmp(&tick_now) {
366                    Ordering::Greater => {
367                        clock.wait_ticks(tick.tick() - tick_now.tick()).await;
368                    }
369                    Ordering::Less => {
370                        if error_on_output_stall {
371                            return sim_error!(format!("{entity} delay output stalled"));
372                        }
373                    }
374                    Ordering::Equal => {
375                        // Do nothing - no need to pause
376                    }
377                }
378
379                exit!(entity ; value.id());
380                tx.put(value)?.await;
381                output_changed.notify()?;
382            }
383            None => {
384                pending_changed.listen().await;
385            }
386        }
387    }
388}