1use std::cell::RefCell;
14use std::rc::Rc;
15
16use async_trait::async_trait;
17use gwr_engine::engine::Engine;
18use gwr_engine::port::{OutPort, PortStateResult};
19use gwr_engine::traits::{Runnable, SimObject};
20use gwr_engine::types::{SimError, SimResult};
21use gwr_model_builder::{EntityDisplay, EntityGet};
22use gwr_track::entity::Entity;
23use gwr_track::tracker::aka::Aka;
24
/// Expands to `Some(Box::new(..))` wrapping an iterator that yields `$value`
/// exactly `$repeat` times.
///
/// `$value` must implement `Clone` (a requirement of `std::iter::repeat`) and
/// `$repeat` must be a `usize` (a requirement of `Iterator::take`). Both
/// expressions are evaluated once.
///
/// The expansion only uses absolute `::std` paths, so it keeps working when the
/// call site shadows `std`, `Some` or `Box`.
#[macro_export]
macro_rules! option_box_repeat {
    ($value:expr ; $repeat:expr) => {
        ::std::option::Option::Some(::std::boxed::Box::new(
            ::std::iter::repeat($value).take($repeat),
        ))
    };
}
31use crate::types::DataGenerator;
32use crate::{connect_tx, take_option};
33
/// Expands to `Some(Box::new(..))` wrapping an iterator that yields everything
/// from `$value1` followed by everything from `$value2`.
///
/// Both arguments must be `Option<Box<I>>` where `I` is an iterator with the
/// same item type. The boxes are chained directly (a `Box<I>` is itself an
/// `Iterator`), so this works for boxed concrete iterators and for boxed trait
/// objects such as `Box<dyn Iterator<Item = T>>` alike.
///
/// # Panics
///
/// Panics if either argument is `None`, because both are unwrapped.
#[macro_export]
macro_rules! option_box_chain {
    ($value1:expr , $value2:expr) => {
        ::std::option::Option::Some(::std::boxed::Box::new(
            $value1.unwrap().chain($value2.unwrap()),
        ))
    };
}
40
/// A runnable component that sends the values produced by an optional
/// generator out through a single output port named `"tx"`.
#[derive(EntityGet, EntityDisplay)]
pub struct Source<T>
where
    T: SimObject,
{
    /// Entity created for this source under the parent supplied at construction.
    entity: Rc<Entity>,
    /// Generator of values to send. It can be replaced with `set_generator`
    /// and is taken out of the cell (leaving `None`) when `run` starts.
    data_generator: RefCell<Option<DataGenerator<T>>>,
    /// Output port, populated with `Some` at construction.
    tx: RefCell<Option<OutPort<T>>>,
}
50
51impl<T> Source<T>
52where
53 T: SimObject,
54{
55 pub fn new_and_register_with_renames(
56 engine: &Engine,
57 parent: &Rc<Entity>,
58 name: &str,
59 aka: Option<&Aka>,
60 data_generator: Option<DataGenerator<T>>,
61 ) -> Result<Rc<Self>, SimError> {
62 let entity = Rc::new(Entity::new(parent, name));
63 let tx = OutPort::new_with_renames(&entity, "tx", aka);
64 let rc_self = Rc::new(Self {
65 entity,
66 data_generator: RefCell::new(data_generator),
67 tx: RefCell::new(Some(tx)),
68 });
69 engine.register(rc_self.clone());
70 Ok(rc_self)
71 }
72
73 pub fn new_and_register(
74 engine: &Engine,
75 parent: &Rc<Entity>,
76 name: &str,
77 data_generator: Option<DataGenerator<T>>,
78 ) -> Result<Rc<Self>, SimError> {
79 Self::new_and_register_with_renames(engine, parent, name, None, data_generator)
80 }
81
82 #[must_use]
83 pub fn entity(&self) -> &Rc<Entity> {
84 &self.entity
85 }
86
87 pub fn set_generator(&self, data_generator: Option<DataGenerator<T>>) {
88 *self.data_generator.borrow_mut() = data_generator;
89 }
90
91 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
92 connect_tx!(self.tx, connect ; port_state)
93 }
94}
95
96#[async_trait(?Send)]
97impl<T> Runnable for Source<T>
98where
99 T: SimObject,
100{
101 async fn run(&self) -> SimResult {
102 let mut data_generator = match self.data_generator.borrow_mut().take() {
103 Some(data_generator) => data_generator,
104 None => return Ok(()),
105 };
106
107 let tx = take_option!(self.tx);
108 loop {
109 let value = data_generator.next();
110 if let Some(value) = value {
111 self.entity.track_exit(value.id());
112 tx.put(value)?.await;
113 } else {
114 break;
115 }
116 }
117 Ok(())
118 }
119}