gwr_engine/
executor.rs

1// Copyright (c) 2023 Graphcore Ltd. All rights reserved.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::pin::Pin;
6use std::rc::Rc;
7use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
8
9use gwr_track::entity::Entity;
10
11use crate::time::clock::Clock;
12use crate::time::simtime::SimTime;
13use crate::types::SimResult;
14
15fn no_op(_: *const ()) {}
16
17fn task_raw_waker(task: Rc<Task>) -> RawWaker {
18    let vtable = &RawWakerVTable::new(clone_raw_waker, wake_task, no_op, no_op);
19    let ptr = Rc::into_raw(task) as *const ();
20    RawWaker::new(ptr, vtable)
21}
22
23fn waker_for_task(task: Rc<Task>) -> Waker {
24    unsafe { Waker::from_raw(task_raw_waker(task)) }
25}
26
27unsafe fn clone_raw_waker(data: *const ()) -> RawWaker {
28    unsafe {
29        // Tasks are always wrapped in a reference counter to allow them to be shared
30        // read-only.
31        let rc_task = Rc::from_raw(data as *const Task);
32        let clone = rc_task.clone();
33        let vtable = &RawWakerVTable::new(clone_raw_waker, wake_task, no_op, no_op);
34        let ptr = Rc::into_raw(clone) as *const ();
35        RawWaker::new(ptr, vtable)
36    }
37}
38
39unsafe fn wake_task(data: *const ()) {
40    unsafe {
41        // Tasks are always wrapped in a reference counter to allow them to be shared
42        // read-only.
43        let rc_task = Rc::from_raw(data as *const Task);
44        let cloned = rc_task.clone();
45        rc_task.executor_state.new_tasks.borrow_mut().push(cloned);
46    }
47}
48
49struct Task {
50    future: RefCell<Pin<Box<dyn Future<Output = SimResult>>>>,
51    executor_state: Rc<ExecutorState>,
52}
53
54impl Task {
55    pub fn new(
56        future: impl Future<Output = SimResult> + 'static,
57        executor_state: Rc<ExecutorState>,
58    ) -> Task {
59        Task {
60            future: RefCell::new(Box::pin(future)),
61            executor_state,
62        }
63    }
64
65    fn poll(&self, context: &mut Context) -> Poll<SimResult> {
66        self.future.borrow_mut().as_mut().poll(context)
67    }
68}
69
70struct ExecutorState {
71    task_queue: RefCell<Vec<Rc<Task>>>,
72    new_tasks: RefCell<Vec<Rc<Task>>>,
73    time: RefCell<SimTime>,
74}
75
76impl ExecutorState {
77    pub fn new(top: &Rc<Entity>) -> Self {
78        Self {
79            task_queue: RefCell::new(Vec::new()),
80            new_tasks: RefCell::new(Vec::new()),
81            time: RefCell::new(SimTime::new(top)),
82        }
83    }
84}
85
86/// Single-threaded executor
87///
88/// This is a thin-wrapper (using [`Rc`]) around the real executor, so that this
89/// struct can be cloned and passed around.
90///
91/// See the [module documentation] for more details.
92///
93/// [module documentation]: index.html
94#[derive(Clone)]
95pub struct Executor {
96    state: Rc<ExecutorState>,
97}
98
99impl Executor {
100    pub fn run(&self, finished: &Rc<RefCell<bool>>) -> SimResult {
101        loop {
102            self.step(finished)?;
103            if *finished.borrow() {
104                break;
105            }
106
107            if self.state.new_tasks.borrow().is_empty() {
108                if self.state.time.borrow().can_exit() {
109                    break;
110                }
111
112                if let Some(wakers) = self.state.time.borrow_mut().advance_time() {
113                    // No events left, advance time
114                    for task_waker in wakers.into_iter() {
115                        task_waker.waker.wake();
116                    }
117                } else {
118                    break;
119                }
120            }
121        }
122        Ok(())
123    }
124
125    pub fn step(&self, finished: &Rc<RefCell<bool>>) -> SimResult {
126        // Append new tasks created since the last step into the task queue
127        let mut task_queue = self.state.task_queue.borrow_mut();
128        task_queue.append(&mut self.state.new_tasks.borrow_mut());
129
130        // Loop over all tasks, polling them. If a task is not ready, add it to the
131        // pending tasks.
132        for task in task_queue.drain(..) {
133            if *finished.borrow() {
134                break;
135            }
136
137            // Dummy waker and context (not used as we poll all tasks)
138            let waker = waker_for_task(task.clone());
139            let mut context = Context::from_waker(&waker);
140
141            match task.poll(&mut context) {
142                Poll::Ready(Err(e)) => {
143                    // Error - return early
144                    return Err(e);
145                }
146                Poll::Ready(Ok(())) => {
147                    // Otherwise, drop task as it is complete
148                }
149                Poll::Pending => {
150                    // Task will have parked itself waiting somewhere
151                }
152            }
153        }
154        Ok(())
155    }
156
157    #[must_use]
158    pub fn get_clock(&self, freq_mhz: f64) -> Clock {
159        self.state.time.borrow_mut().get_clock(freq_mhz)
160    }
161
162    #[must_use]
163    pub fn time_now_ns(&self) -> f64 {
164        self.state.time.borrow().time_now_ns()
165    }
166}
167
168/// `Spawner` spawns new futures into the executor.
169#[derive(Clone)]
170pub struct Spawner {
171    state: Rc<ExecutorState>,
172}
173
174impl Spawner {
175    pub fn spawn(&self, future: impl Future<Output = SimResult> + 'static) {
176        self.state
177            .new_tasks
178            .borrow_mut()
179            .push(Rc::new(Task::new(future, self.state.clone())));
180    }
181}
182
183#[must_use]
184pub fn new_executor_and_spawner(top: &Rc<Entity>) -> (Executor, Spawner) {
185    let state = Rc::new(ExecutorState::new(top));
186    (
187        Executor {
188            state: state.clone(),
189        },
190        Spawner { state },
191    )
192}