gwr_engine/port/
monitor.rs

1// Copyright (c) 2024 Graphcore Ltd. All rights reserved.
2
3//! Monitor for port
4//!
5//! This port monitor is used to track data travelling through the
6//! port and report bandwidth.
7
8use std::cell::RefCell;
9use std::rc::Rc;
10
11use async_trait::async_trait;
12use byte_unit::{Byte, Unit};
13use gwr_track::entity::{Entity, EntityMonitor};
14
15use crate::engine::Engine;
16use crate::time::clock::Clock;
17use crate::traits::{Runnable, SimObject};
18use crate::types::SimResult;
19
20pub struct Monitor {
21    entity: EntityMonitor,
22    clock: Clock,
23    window_size_ticks: u64,
24    bytes_in_window: RefCell<usize>,
25    bytes_total: RefCell<usize>,
26    last_time_ns: RefCell<f64>,
27    bw_unit: Unit,
28}
29
30impl Monitor {
31    #[must_use]
32    pub fn new_and_register(
33        engine: &Engine,
34        entity: &Rc<Entity>,
35        clock: &Clock,
36        window_size_ticks: u64,
37    ) -> Rc<Self> {
38        let bw_unit = Unit::GiB;
39        let bw_entity = EntityMonitor::new(entity, &format!("bw_{bw_unit}/s"));
40
41        let rc_self = Rc::new(Self {
42            entity: bw_entity,
43            clock: clock.clone(),
44            window_size_ticks,
45            bytes_in_window: RefCell::new(0),
46            bytes_total: RefCell::new(0),
47            last_time_ns: RefCell::new(clock.time_now_ns()),
48            bw_unit,
49        });
50
51        engine.register(rc_self.clone());
52        rc_self
53    }
54
55    pub fn sample<T>(&self, object: &T)
56    where
57        T: SimObject,
58    {
59        let object_bytes = object.total_bytes();
60        *self.bytes_in_window.borrow_mut() += object_bytes;
61    }
62
63    #[cfg(test)]
64    pub(crate) fn bytes_in_window(&self) -> usize {
65        *self.bytes_in_window.borrow()
66    }
67
68    #[cfg(test)]
69    pub(crate) fn bytes_total(&self) -> usize {
70        *self.bytes_total.borrow()
71    }
72
73    #[cfg(test)]
74    pub(crate) fn last_time_ns(&self) -> f64 {
75        *self.last_time_ns.borrow()
76    }
77}
78
79#[async_trait(?Send)]
80impl Runnable for Monitor {
81    async fn run(&self) -> SimResult {
82        // Drive the output
83        loop {
84            self.clock.wait_ticks_or_exit(self.window_size_ticks).await;
85            let bytes_in_window = *self.bytes_in_window.borrow();
86            *self.bytes_in_window.borrow_mut() = 0;
87            *self.bytes_total.borrow_mut() += bytes_in_window;
88
89            let time_now_ns = self.clock.time_now_ns();
90            let window_duration_s =
91                (time_now_ns - *self.last_time_ns.borrow()) / (1000.0 * 1000.0 * 1000.0);
92
93            let per_second = Byte::from_f64(bytes_in_window as f64 / window_duration_s).unwrap();
94            let gib_per_second = per_second.get_adjusted_unit(self.bw_unit);
95
96            self.entity.track_value(gib_per_second.get_value());
97
98            *self.last_time_ns.borrow_mut() = time_now_ns;
99        }
100    }
101}
102
#[cfg(test)]
mod tests {
    use std::mem::size_of;
    use std::rc::Rc;

    use gwr_track::entity::Entity;
    use gwr_track::tracker::dev_null_tracker;

    use super::*;

    /// A fresh monitor starts empty, `sample` fills the open window, and one
    /// full window of simulation moves those bytes into the total.
    #[test]
    fn new_and_register_initializes_monitor_and_sample_counts_bytes() {
        let tracker = dev_null_tracker();
        let mut engine = Engine::new(&tracker);
        let clock = engine.default_clock();
        let top = engine.top().clone();
        let port = Rc::new(Entity::new(&top, "port"));

        let monitor = Monitor::new_and_register(&engine, &port, &clock, 4);

        // Freshly constructed state.
        assert_eq!(monitor.window_size_ticks, 4);
        assert_eq!(monitor.bytes_in_window(), 0);
        assert_eq!(monitor.bytes_total(), 0);
        assert_eq!(monitor.last_time_ns(), 0.0);

        // A sampled object is counted in the open window.
        monitor.sample(&123_i32);
        assert_eq!(monitor.bytes_in_window(), size_of::<i32>());

        // Keep the simulation alive for exactly one monitor window.
        let waiter_clock = clock.clone();
        engine.spawn(async move {
            waiter_clock.wait_ticks(4).await;
            Ok(())
        });
        engine.run().unwrap();

        // The window has been closed and folded into the running total.
        assert_eq!(monitor.bytes_in_window(), 0);
        assert_eq!(monitor.bytes_total(), size_of::<i32>());
    }
}