gwr_engine/port/
monitor.rs

1// Copyright (c) 2024 Graphcore Ltd. All rights reserved.
2
3//! Monitor for port
4//!
5//! This port monitor is used to track data travelling through the
6//! port and report bandwidth.
7
8use std::cell::RefCell;
9use std::rc::Rc;
10
11use async_trait::async_trait;
12use byte_unit::{Byte, Unit};
13use gwr_track::entity::{Entity, EntityMonitor};
14
15use crate::engine::Engine;
16use crate::time::clock::Clock;
17use crate::traits::{Runnable, SimObject};
18use crate::types::SimResult;
19
/// Bandwidth monitor for a port.
///
/// Bytes are accumulated through [`Monitor::sample`]; the `Runnable`
/// implementation periodically converts the accumulated count into a
/// bytes-per-second rate and records it on `entity`.
pub struct Monitor {
    /// Tracking entity on which the per-window bandwidth value is recorded.
    entity: EntityMonitor,
    /// Clock used both to wait between reports and to read the current time.
    clock: Clock,
    /// Number of clock ticks waited between two consecutive reports.
    window_size_ticks: u64,
    /// Bytes sampled since the last report; reset to zero on every report.
    bytes_in_window: RefCell<usize>,
    /// Sum of the byte counts of all windows reported so far.
    bytes_total: RefCell<usize>,
    /// Clock time (ns) at construction or at the most recent report,
    /// whichever is later; used as the start of the current window.
    last_time_ns: RefCell<f64>,
    /// Unit the per-second rate is adjusted to before being recorded.
    bw_unit: Unit,
}
29
30impl Monitor {
31    #[must_use]
32    pub fn new_and_register(
33        engine: &Engine,
34        entity: &Rc<Entity>,
35        clock: &Clock,
36        window_size_ticks: u64,
37    ) -> Rc<Self> {
38        let bw_unit = Unit::GiB;
39        let bw_entity = EntityMonitor::new(entity, &format!("bw_{bw_unit}/s"));
40
41        let rc_self = Rc::new(Self {
42            entity: bw_entity,
43            clock: clock.clone(),
44            window_size_ticks,
45            bytes_in_window: RefCell::new(0),
46            bytes_total: RefCell::new(0),
47            last_time_ns: RefCell::new(clock.time_now_ns()),
48            bw_unit,
49        });
50
51        engine.register(rc_self.clone());
52        rc_self
53    }
54
55    pub fn sample<T>(&self, object: &T)
56    where
57        T: SimObject,
58    {
59        let object_bytes = object.total_bytes();
60        *self.bytes_in_window.borrow_mut() += object_bytes;
61    }
62}
63
64#[async_trait(?Send)]
65impl Runnable for Monitor {
66    async fn run(&self) -> SimResult {
67        // Drive the output
68        loop {
69            self.clock.wait_ticks_or_exit(self.window_size_ticks).await;
70            let bytes_in_window = *self.bytes_in_window.borrow();
71            *self.bytes_in_window.borrow_mut() = 0;
72            *self.bytes_total.borrow_mut() += bytes_in_window;
73
74            let time_now_ns = self.clock.time_now_ns();
75            let window_duration_s =
76                (time_now_ns - *self.last_time_ns.borrow()) / (1000.0 * 1000.0 * 1000.0);
77
78            let per_second = Byte::from_f64(bytes_in_window as f64 / window_duration_s).unwrap();
79            let gib_per_second = per_second.get_adjusted_unit(self.bw_unit);
80
81            self.entity.track_value(gib_per_second.get_value());
82
83            *self.last_time_ns.borrow_mut() = time_now_ns;
84        }
85    }
86}