gwr_engine/
executor.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::pin::Pin;
6use std::rc::Rc;
7use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
8
9use gwr_track::entity::Entity;
10
11use crate::time::clock::Clock;
12use crate::time::simtime::SimTime;
13use crate::types::SimResult;
14
15fn no_op(_: *const ()) {}
16
17fn task_raw_waker(task: Rc<Task>) -> RawWaker {
18    let vtable = &RawWakerVTable::new(clone_raw_waker, wake_task, no_op, no_op);
19    let ptr = Rc::into_raw(task) as *const ();
20    RawWaker::new(ptr, vtable)
21}
22
23fn waker_for_task(task: Rc<Task>) -> Waker {
24    unsafe { Waker::from_raw(task_raw_waker(task)) }
25}
26
27unsafe fn clone_raw_waker(data: *const ()) -> RawWaker {
28    unsafe {
29        // Tasks are always wrapped in a reference counter to allow them to be shared
30        // read-only.
31        let rc_task = Rc::from_raw(data as *const Task);
32        let clone = rc_task.clone();
33        let vtable = &RawWakerVTable::new(clone_raw_waker, wake_task, no_op, no_op);
34        let ptr = Rc::into_raw(clone) as *const ();
35        RawWaker::new(ptr, vtable)
36    }
37}
38
39unsafe fn wake_task(data: *const ()) {
40    unsafe {
41        // Tasks are always wrapped in a reference counter to allow them to be shared
42        // read-only.
43        let rc_task = Rc::from_raw(data as *const Task);
44        let cloned = rc_task.clone();
45        rc_task.executor_state.new_tasks.borrow_mut().push(cloned);
46    }
47}
48
/// A spawned future plus a handle to the executor state it is scheduled on.
///
/// Tasks are shared as `Rc<Task>` between the executor queues and wakers.
struct Task {
    /// The future being driven. The `RefCell` lets `poll` obtain mutable
    /// access through a shared `&self`.
    future: RefCell<Pin<Box<dyn Future<Output = SimResult>>>>,
    /// Shared executor state; waking the task pushes it onto its `new_tasks`.
    executor_state: Rc<ExecutorState>,
}
53
54impl Task {
55    pub fn new(
56        future: impl Future<Output = SimResult> + 'static,
57        executor_state: Rc<ExecutorState>,
58    ) -> Task {
59        Task {
60            future: RefCell::new(Box::pin(future)),
61            executor_state,
62        }
63    }
64
65    fn poll(&self, context: &mut Context) -> Poll<SimResult> {
66        self.future.borrow_mut().as_mut().poll(context)
67    }
68}
69
/// State shared (through `Rc`) by the `Executor`, the `Spawner` and each `Task`.
struct ExecutorState {
    /// Tasks polled by the current `Executor::step`; refilled from `new_tasks`
    /// at the start of each step and drained while polling.
    task_queue: RefCell<Vec<Rc<Task>>>,
    /// Tasks spawned or woken since the last step, waiting to be polled.
    new_tasks: RefCell<Vec<Rc<Task>>>,
    /// Simulation time: provides clocks and the wakers to fire when time
    /// advances.
    time: RefCell<SimTime>,
}
75
76impl ExecutorState {
77    pub fn new(top: &Rc<Entity>) -> Self {
78        Self {
79            task_queue: RefCell::new(Vec::new()),
80            new_tasks: RefCell::new(Vec::new()),
81            time: RefCell::new(SimTime::new(top)),
82        }
83    }
84}
85
/// Single-threaded executor
///
/// This is a thin-wrapper (using [`Rc`]) around the real executor, so that this
/// struct can be cloned and passed around.
///
/// See the [module documentation] for more details.
///
/// [module documentation]: index.html
#[derive(Clone)]
pub struct Executor {
    /// Entity representing the executor; `new_executor_and_spawner` creates it
    /// from `top` with the name `"executor"`.
    pub entity: Rc<Entity>,
    /// Task queues and simulation time, shared with the `Spawner` and with
    /// every task.
    state: Rc<ExecutorState>,
}
99
100impl Executor {
101    pub fn run(&self, finished: &Rc<RefCell<bool>>) -> SimResult {
102        loop {
103            self.step(finished)?;
104            if *finished.borrow() {
105                break;
106            }
107
108            if self.state.new_tasks.borrow().is_empty() {
109                if self.state.time.borrow().can_exit() {
110                    break;
111                }
112
113                if let Some(wakers) = self.state.time.borrow_mut().advance_time() {
114                    // No events left, advance time
115                    for task_waker in wakers.into_iter() {
116                        task_waker.waker.wake();
117                    }
118                } else {
119                    break;
120                }
121            }
122        }
123        Ok(())
124    }
125
126    pub fn step(&self, finished: &Rc<RefCell<bool>>) -> SimResult {
127        // Append new tasks created since the last step into the task queue
128        let mut task_queue = self.state.task_queue.borrow_mut();
129        task_queue.append(&mut self.state.new_tasks.borrow_mut());
130
131        // Loop over all tasks, polling them. If a task is not ready, add it to the
132        // pending tasks.
133        for task in task_queue.drain(..) {
134            if *finished.borrow() {
135                break;
136            }
137
138            // Dummy waker and context (not used as we poll all tasks)
139            let waker = waker_for_task(task.clone());
140            let mut context = Context::from_waker(&waker);
141
142            match task.poll(&mut context) {
143                Poll::Ready(Err(e)) => {
144                    // Error - return early
145                    return Err(e);
146                }
147                Poll::Ready(Ok(())) => {
148                    // Otherwise, drop task as it is complete
149                }
150                Poll::Pending => {
151                    // Task will have parked itself waiting somewhere
152                }
153            }
154        }
155        Ok(())
156    }
157
158    #[must_use]
159    pub fn get_clock(&self, freq_mhz: f64) -> Clock {
160        self.state.time.borrow_mut().get_clock(freq_mhz)
161    }
162
163    #[must_use]
164    pub fn time_now_ns(&self) -> f64 {
165        self.state.time.borrow().time_now_ns()
166    }
167}
168
/// `Spawner` spawns new futures into the executor.
///
/// Cloning a `Spawner` yields another handle onto the same executor state.
#[derive(Clone)]
pub struct Spawner {
    /// Executor state shared with the `Executor`; spawned tasks are pushed
    /// onto its `new_tasks` list.
    state: Rc<ExecutorState>,
}
174
175impl Spawner {
176    pub fn spawn(&self, future: impl Future<Output = SimResult> + 'static) {
177        self.state
178            .new_tasks
179            .borrow_mut()
180            .push(Rc::new(Task::new(future, self.state.clone())));
181    }
182}
183
184#[must_use]
185pub fn new_executor_and_spawner(top: &Rc<Entity>) -> (Executor, Spawner) {
186    let state = Rc::new(ExecutorState::new(top));
187    let entity = Rc::new(Entity::new(top, "executor"));
188    (
189        Executor {
190            entity,
191            state: state.clone(),
192        },
193        Spawner { state },
194    )
195}