gwr_engine/port/
monitor.rs

1// Copyright (c) 2024 Graphcore Ltd. All rights reserved.
2
3//! Monitor for port
4//!
5//! This port monitor is used to track data travelling through the
6//! port and report bandwidth.
7
8use std::cell::RefCell;
9use std::rc::Rc;
10
11use async_trait::async_trait;
12use byte_unit::{Byte, Unit};
13use gwr_track::entity::Entity;
14use gwr_track::tracker::types::ReqType;
15use gwr_track::{create, value};
16
17use crate::engine::Engine;
18use crate::time::clock::Clock;
19use crate::traits::{Runnable, SimObject};
20use crate::types::SimResult;
21
22pub struct Monitor {
23    pub bw_entity: Rc<Entity>,
24    clock: Clock,
25    window_size_ticks: u64,
26    bytes_in_window: RefCell<usize>,
27    bytes_total: RefCell<usize>,
28    last_time_ns: RefCell<f64>,
29    bw_unit: Unit,
30}
31
32impl Monitor {
33    #[must_use]
34    pub fn new_and_register(
35        engine: &Engine,
36        entity: &Rc<Entity>,
37        clock: &Clock,
38        window_size_ticks: u64,
39    ) -> Rc<Self> {
40        let bw_unit = Unit::GiB;
41        let bw_entity = Entity::new_without_create(entity, &format!("bw_{bw_unit}/s"));
42
43        // Need to use custom create! in order to trackers know the data type.
44        create!(entity ; bw_entity, 0, ReqType::Value as i8);
45
46        let rc_self = Rc::new(Self {
47            bw_entity: Rc::new(bw_entity),
48            clock: clock.clone(),
49            window_size_ticks,
50            bytes_in_window: RefCell::new(0),
51            bytes_total: RefCell::new(0),
52            last_time_ns: RefCell::new(clock.time_now_ns()),
53            bw_unit,
54        });
55
56        engine.register(rc_self.clone());
57        rc_self
58    }
59
60    pub fn sample<T>(&self, object: &T)
61    where
62        T: SimObject,
63    {
64        let object_bytes = object.total_bytes();
65        *self.bytes_in_window.borrow_mut() += object_bytes;
66    }
67}
68
69#[async_trait(?Send)]
70impl Runnable for Monitor {
71    async fn run(&self) -> SimResult {
72        // Drive the output
73        loop {
74            self.clock.wait_ticks_or_exit(self.window_size_ticks).await;
75            let bytes_in_window = *self.bytes_in_window.borrow();
76            *self.bytes_in_window.borrow_mut() = 0;
77            *self.bytes_total.borrow_mut() += bytes_in_window;
78
79            let time_now_ns = self.clock.time_now_ns();
80            let window_duration_s =
81                (time_now_ns - *self.last_time_ns.borrow()) / (1000.0 * 1000.0 * 1000.0);
82
83            let per_second = Byte::from_f64(bytes_in_window as f64 / window_duration_s).unwrap();
84            let gib_per_second = per_second.get_adjusted_unit(self.bw_unit);
85
86            value!(self.bw_entity ; gib_per_second.get_value());
87
88            *self.last_time_ns.borrow_mut() = time_now_ns;
89        }
90    }
91}