1use std::cell::RefCell;
14use std::rc::Rc;
15
16use async_trait::async_trait;
17use gwr_engine::engine::Engine;
18use gwr_engine::port::{OutPort, PortStateResult};
19use gwr_engine::traits::{Runnable, SimObject};
20use gwr_engine::types::{SimError, SimResult};
21use gwr_model_builder::{EntityDisplay, EntityGet};
22use gwr_track::entity::Entity;
23use gwr_track::exit;
24use gwr_track::tracker::aka::Aka;
25
/// Builds `Some(Box::new(..))` around an iterator that yields `$value`
/// exactly `$repeat` times.
///
/// The expansion is `Some(Box::new(std::iter::repeat($value).take($repeat)))`,
/// so `$value` must be `Clone` and `$repeat` must be a `usize`.
///
/// All paths in the expansion are absolute (`::std::…`) so the macro keeps
/// working when it is invoked from a scope that shadows `std`, `Box` or
/// `Some`.
///
/// # Examples
///
/// ```ignore
/// let generator = option_box_repeat!(0x123 ; 10);
/// ```
#[macro_export]
macro_rules! option_box_repeat {
    ($value:expr ; $repeat:expr) => {
        ::std::option::Option::Some(::std::boxed::Box::new(::std::iter::Iterator::take(
            ::std::iter::repeat($value),
            $repeat,
        )))
    };
}
32use crate::types::DataGenerator;
33use crate::{connect_tx, take_option};
34
/// Chains two optional boxed iterators into a single `Some(Box::new(..))`.
///
/// Each argument is unwrapped, moved out of its `Box`, and the two inner
/// iterators are joined with `Iterator::chain`, so the values of `$value1`
/// are produced first, followed by the values of `$value2`.
///
/// Because the inner iterator is moved out of the `Box`, both boxes must
/// hold a sized iterator type.
///
/// All paths in the expansion are absolute (`::std::…`) so the macro keeps
/// working when it is invoked from a scope that shadows `std`, `Box` or
/// `Some`.
///
/// # Panics
///
/// Panics if either argument is `None`; the panic message states which one.
#[macro_export]
macro_rules! option_box_chain {
    ($value1:expr , $value2:expr) => {
        ::std::option::Option::Some(::std::boxed::Box::new(::std::iter::Iterator::chain(
            *($value1.expect("option_box_chain!: first generator is None")),
            *($value2.expect("option_box_chain!: second generator is None")),
        )))
    };
}
41
42#[derive(EntityGet, EntityDisplay)]
43pub struct Source<T>
44where
45 T: SimObject,
46{
47 entity: Rc<Entity>,
48 data_generator: RefCell<Option<DataGenerator<T>>>,
49 tx: RefCell<Option<OutPort<T>>>,
50}
51
52impl<T> Source<T>
53where
54 T: SimObject,
55{
56 pub fn new_and_register_with_renames(
57 engine: &Engine,
58 parent: &Rc<Entity>,
59 name: &str,
60 aka: Option<&Aka>,
61 data_generator: Option<DataGenerator<T>>,
62 ) -> Result<Rc<Self>, SimError> {
63 let entity = Rc::new(Entity::new(parent, name));
64 let tx = OutPort::new_with_renames(&entity, "tx", aka);
65 let rc_self = Rc::new(Self {
66 entity,
67 data_generator: RefCell::new(data_generator),
68 tx: RefCell::new(Some(tx)),
69 });
70 engine.register(rc_self.clone());
71 Ok(rc_self)
72 }
73
74 pub fn new_and_register(
75 engine: &Engine,
76 parent: &Rc<Entity>,
77 name: &str,
78 data_generator: Option<DataGenerator<T>>,
79 ) -> Result<Rc<Self>, SimError> {
80 Self::new_and_register_with_renames(engine, parent, name, None, data_generator)
81 }
82
83 #[must_use]
84 pub fn entity(&self) -> &Rc<Entity> {
85 &self.entity
86 }
87
88 pub fn set_generator(&self, data_generator: Option<DataGenerator<T>>) {
89 *self.data_generator.borrow_mut() = data_generator;
90 }
91
92 pub fn connect_port_tx(&self, port_state: PortStateResult<T>) -> SimResult {
93 connect_tx!(self.tx, connect ; port_state)
94 }
95}
96
97#[async_trait(?Send)]
98impl<T> Runnable for Source<T>
99where
100 T: SimObject,
101{
102 async fn run(&self) -> SimResult {
103 let mut data_generator = match self.data_generator.borrow_mut().take() {
104 Some(data_generator) => data_generator,
105 None => return Ok(()),
106 };
107
108 let tx = take_option!(self.tx);
109 loop {
110 let value = data_generator.next();
111 if let Some(value) = value {
112 exit!(self.entity ; value.id());
113 tx.put(value)?.await;
114 } else {
115 break;
116 }
117 }
118 Ok(())
119 }
120}