sim_fabric/
source_sink_builder.rs1use std::rc::Rc;
6
7use gwr_components::sink::Sink;
8use gwr_components::source::Source;
9use gwr_engine::engine::Engine;
10use gwr_models::data_frame::DataFrame;
11use gwr_models::fabric::FabricConfig;
12use rand::SeedableRng;
13use rand::seq::SliceRandom;
14use rand_xoshiro::Xoshiro256PlusPlus;
15
16use crate::frame_gen::{FrameGen, TrafficPattern};
17
18pub type Sources = Vec<Rc<Source<DataFrame>>>;
20pub type Sinks = Vec<Rc<Sink<DataFrame>>>;
21
22#[expect(clippy::too_many_arguments)]
23#[must_use]
24pub fn build_source_sinks(
25 engine: &mut Engine,
26 config: &Rc<FabricConfig>,
27 traffic_pattern: TrafficPattern,
28 overhead_size_bytes: usize,
29 payload_size_bytes: usize,
30 num_send_frames: usize,
31 seed: u64,
32 num_active_sources: usize,
33) -> (Sources, Sinks, usize) {
34 let top = engine.top();
35
36 let num_ports = config.num_ports();
37 let mut total_expected_frames = 0;
38 let mut sources = Vec::with_capacity(num_ports);
39
40 let mut rng = Xoshiro256PlusPlus::seed_from_u64(seed);
41
42 let mut all_port_indices: Vec<usize> = config.port_indices().clone();
44 all_port_indices.shuffle(&mut rng);
45 let active_port_indices: Vec<usize> = all_port_indices
46 .into_iter()
47 .take(num_active_sources)
48 .collect();
49
50 let mut dest_indices: Vec<usize> = config.port_indices().clone();
52 dest_indices.shuffle(&mut rng);
53
54 let first_dest = dest_indices[0];
55
56 for (i, dest_index) in dest_indices.drain(..).enumerate() {
57 let source_index = config.port_indices()[i];
58
59 let config = config.clone();
60 let initial_dest_index = if traffic_pattern == TrafficPattern::AllToOne {
61 first_dest
62 } else {
63 dest_index
64 };
65
66 let num_frames_from_source = if active_port_indices.contains(&source_index) {
67 match traffic_pattern {
68 TrafficPattern::AllToOne | TrafficPattern::AllToAllFixed => {
70 if source_index == initial_dest_index {
71 0
72 } else {
73 num_send_frames
74 }
75 }
76 _ => num_send_frames,
78 }
79 } else {
80 0
81 };
82
83 total_expected_frames += num_frames_from_source;
84
85 let data_generator: std::option::Option<Box<dyn Iterator<Item = DataFrame>>> =
86 if active_port_indices.contains(&source_index) {
87 Some(Box::new(FrameGen::new(
88 top,
89 config,
90 source_index,
91 initial_dest_index,
92 traffic_pattern,
93 overhead_size_bytes,
94 payload_size_bytes,
95 num_send_frames,
96 seed,
97 )))
98 } else {
99 None
100 };
101 sources.push(
102 Source::new_and_register(
103 engine,
104 top,
105 format!("source{source_index}").as_str(),
106 data_generator,
107 )
108 .unwrap(),
109 );
110 }
111
112 let sinks: Sinks = config
113 .port_indices()
114 .iter()
115 .map(|i| Sink::new_and_register(engine, top, format!("sink{i}").as_str()).unwrap())
116 .collect();
117
118 (sources, sinks, total_expected_frames)
119}