gwr_engine/port/
monitor.rs1use std::cell::RefCell;
9use std::rc::Rc;
10
11use async_trait::async_trait;
12use byte_unit::{Byte, Unit};
13use gwr_track::entity::{Entity, EntityMonitor};
14
15use crate::engine::Engine;
16use crate::time::clock::Clock;
17use crate::traits::{Runnable, SimObject};
18use crate::types::SimResult;
19
20pub struct Monitor {
21 entity: EntityMonitor,
22 clock: Clock,
23 window_size_ticks: u64,
24 bytes_in_window: RefCell<usize>,
25 bytes_total: RefCell<usize>,
26 last_time_ns: RefCell<f64>,
27 bw_unit: Unit,
28}
29
30impl Monitor {
31 #[must_use]
32 pub fn new_and_register(
33 engine: &Engine,
34 entity: &Rc<Entity>,
35 clock: &Clock,
36 window_size_ticks: u64,
37 ) -> Rc<Self> {
38 let bw_unit = Unit::GiB;
39 let bw_entity = EntityMonitor::new(entity, &format!("bw_{bw_unit}/s"));
40
41 let rc_self = Rc::new(Self {
42 entity: bw_entity,
43 clock: clock.clone(),
44 window_size_ticks,
45 bytes_in_window: RefCell::new(0),
46 bytes_total: RefCell::new(0),
47 last_time_ns: RefCell::new(clock.time_now_ns()),
48 bw_unit,
49 });
50
51 engine.register(rc_self.clone());
52 rc_self
53 }
54
55 pub fn sample<T>(&self, object: &T)
56 where
57 T: SimObject,
58 {
59 let object_bytes = object.total_bytes();
60 *self.bytes_in_window.borrow_mut() += object_bytes;
61 }
62
63 #[cfg(test)]
64 pub(crate) fn bytes_in_window(&self) -> usize {
65 *self.bytes_in_window.borrow()
66 }
67
68 #[cfg(test)]
69 pub(crate) fn bytes_total(&self) -> usize {
70 *self.bytes_total.borrow()
71 }
72
73 #[cfg(test)]
74 pub(crate) fn last_time_ns(&self) -> f64 {
75 *self.last_time_ns.borrow()
76 }
77}
78
79#[async_trait(?Send)]
80impl Runnable for Monitor {
81 async fn run(&self) -> SimResult {
82 loop {
84 self.clock.wait_ticks_or_exit(self.window_size_ticks).await;
85 let bytes_in_window = *self.bytes_in_window.borrow();
86 *self.bytes_in_window.borrow_mut() = 0;
87 *self.bytes_total.borrow_mut() += bytes_in_window;
88
89 let time_now_ns = self.clock.time_now_ns();
90 let window_duration_s =
91 (time_now_ns - *self.last_time_ns.borrow()) / (1000.0 * 1000.0 * 1000.0);
92
93 let per_second = Byte::from_f64(bytes_in_window as f64 / window_duration_s).unwrap();
94 let gib_per_second = per_second.get_adjusted_unit(self.bw_unit);
95
96 self.entity.track_value(gib_per_second.get_value());
97
98 *self.last_time_ns.borrow_mut() = time_now_ns;
99 }
100 }
101}
102
#[cfg(test)]
mod tests {
    use std::mem::size_of;
    use std::rc::Rc;

    use gwr_track::entity::Entity;
    use gwr_track::tracker::dev_null_tracker;

    use super::*;

    /// A freshly built monitor starts empty, counts sampled bytes in the open
    /// window, and folds them into the total once a window has elapsed.
    #[test]
    fn new_and_register_initializes_monitor_and_sample_counts_bytes() {
        let tracker = dev_null_tracker();
        let mut engine = Engine::new(&tracker);
        let clock = engine.default_clock();
        let top = engine.top().clone();
        let port_entity = Rc::new(Entity::new(&top, "port"));

        let monitor = Monitor::new_and_register(&engine, &port_entity, &clock, 4);

        // Initial state: nothing sampled, window starts at time zero.
        assert_eq!(monitor.window_size_ticks, 4);
        assert_eq!(monitor.bytes_in_window(), 0);
        assert_eq!(monitor.bytes_total(), 0);
        assert_eq!(monitor.last_time_ns(), 0.0);

        // One sample lands in the open window.
        let sample_bytes = size_of::<i32>();
        monitor.sample(&123_i32);
        assert_eq!(monitor.bytes_in_window(), sample_bytes);

        // Keep the simulation running for one full window.
        let ticker = clock.clone();
        engine.spawn(async move {
            ticker.wait_ticks(4).await;
            Ok(())
        });

        engine.run().unwrap();

        // The window was closed: its bytes moved to the total.
        assert_eq!(monitor.bytes_in_window(), 0);
        assert_eq!(monitor.bytes_total(), sample_bytes);
    }
}