sim_fabric/
source_sink_builder.rs1use std::rc::Rc;
6
7use gwr_components::sink::Sink;
8use gwr_components::source::Source;
9use gwr_engine::engine::Engine;
10use gwr_engine::time::clock::Clock;
11use gwr_models::data_frame::DataFrame;
12use gwr_models::fabric::FabricConfig;
13use rand::SeedableRng;
14use rand::seq::SliceRandom;
15use rand_xoshiro::Xoshiro256PlusPlus;
16
17use crate::frame_gen::{FrameGen, TrafficPattern};
18
19pub type Sources = Vec<Rc<Source<DataFrame>>>;
21pub type Sinks = Vec<Rc<Sink<DataFrame>>>;
22
23#[expect(clippy::too_many_arguments)]
24#[must_use]
25pub fn build_source_sinks(
26 engine: &mut Engine,
27 clock: &Clock,
28 config: &Rc<FabricConfig>,
29 traffic_pattern: TrafficPattern,
30 overhead_size_bytes: usize,
31 payload_size_bytes: usize,
32 num_send_frames: usize,
33 seed: u64,
34 num_active_sources: usize,
35) -> (Sources, Sinks, usize) {
36 let top = engine.top();
37
38 let num_ports = config.num_ports();
39 let mut total_expected_frames = 0;
40 let mut sources = Vec::with_capacity(num_ports);
41
42 let mut rng = Xoshiro256PlusPlus::seed_from_u64(seed);
43
44 let mut all_port_indices: Vec<usize> = config.port_indices().clone();
46 all_port_indices.shuffle(&mut rng);
47 let active_port_indices: Vec<usize> = all_port_indices
48 .into_iter()
49 .take(num_active_sources)
50 .collect();
51
52 let mut dest_indices: Vec<usize> = config.port_indices().clone();
54 dest_indices.shuffle(&mut rng);
55
56 let first_dest = dest_indices[0];
57
58 for (i, dest_index) in dest_indices.drain(..).enumerate() {
59 let source_index = config.port_indices()[i];
60
61 let config = config.clone();
62 let initial_dest_index = if traffic_pattern == TrafficPattern::AllToOne {
63 first_dest
64 } else {
65 dest_index
66 };
67
68 let num_frames_from_source = if active_port_indices.contains(&source_index) {
69 match traffic_pattern {
70 TrafficPattern::AllToOne | TrafficPattern::AllToAllFixed => {
72 if source_index == initial_dest_index {
73 0
74 } else {
75 num_send_frames
76 }
77 }
78 _ => num_send_frames,
80 }
81 } else {
82 0
83 };
84
85 total_expected_frames += num_frames_from_source;
86
87 let data_generator: std::option::Option<Box<dyn Iterator<Item = DataFrame>>> =
88 if active_port_indices.contains(&source_index) {
89 Some(Box::new(FrameGen::new(
90 top,
91 config,
92 source_index,
93 initial_dest_index,
94 traffic_pattern,
95 overhead_size_bytes,
96 payload_size_bytes,
97 num_send_frames,
98 seed,
99 )))
100 } else {
101 None
102 };
103 sources.push(
104 Source::new_and_register(
105 engine,
106 top,
107 &format!("source{source_index}"),
108 data_generator,
109 )
110 .unwrap(),
111 );
112 }
113
114 let sinks: Sinks = config
115 .port_indices()
116 .iter()
117 .map(|i| Sink::new_and_register(engine, clock, top, &format!("sink_{i}")).unwrap())
118 .collect();
119
120 (sources, sinks, total_expected_frames)
121}