Skip to content

Commit 53bbccb

Browse files
committed
rust: add a workqueue-based executor
This allows async Rust to run tasks on dedicated or shared thread pools that are managed by existing C kernel infrastructure. Signed-off-by: Wedson Almeida Filho <wedsonaf@google.com>
1 parent 6a4300b commit 53bbccb

File tree

2 files changed

+293
-0
lines changed

2 files changed

+293
-0
lines changed

rust/kernel/kasync/executor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use core::{
1212
task::{RawWaker, RawWakerVTable, Waker},
1313
};
1414

15+
pub mod workqueue;
16+
1517
/// Spawns a new task to run in the given executor.
1618
///
1719
/// It also automatically defines a new lockdep lock class for executors (e.g., workqueue) that
Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
// SPDX-License-Identifier: GPL-2.0
2+
3+
//! Kernel support for executing futures in C workqueues (`struct workqueue_struct`).
4+
5+
use super::{AutoStopHandle, RefWake};
6+
use crate::{
7+
error::code::*,
8+
mutex_init,
9+
revocable::AsyncRevocable,
10+
sync::{LockClassKey, Mutex, Ref, RefBorrow, UniqueRef},
11+
unsafe_list,
12+
workqueue::{BoxedQueue, Queue, Work, WorkAdapter},
13+
Either, Left, Result, Right,
14+
};
15+
use core::{cell::UnsafeCell, future::Future, marker::PhantomPinned, pin::Pin, task::Context};
16+
17+
trait RevocableTask {
18+
fn revoke(&self);
19+
fn flush(self: Ref<Self>);
20+
fn to_links(&self) -> &unsafe_list::Links<dyn RevocableTask>;
21+
}
22+
23+
// SAFETY: `Task` has a single `links` field and only one adapter.
24+
unsafe impl unsafe_list::Adapter for dyn RevocableTask {
25+
type EntryType = dyn RevocableTask;
26+
fn to_links(obj: &dyn RevocableTask) -> &unsafe_list::Links<dyn RevocableTask> {
27+
obj.to_links()
28+
}
29+
}
30+
31+
struct Task<T: 'static + Send + Future> {
32+
links: unsafe_list::Links<dyn RevocableTask>,
33+
executor: Ref<Executor>,
34+
work: Work,
35+
future: AsyncRevocable<UnsafeCell<T>>,
36+
}
37+
38+
// SAFETY: The `future` field is only used by one thread at a time (in the `poll` method, which is
39+
// called by the work queue, who guarantees no reentrancy), so a task is `Sync` as long as the
40+
// future is `Send`.
41+
unsafe impl<T: 'static + Send + Future> Sync for Task<T> {}
42+
43+
// SAFETY: If the future `T` is `Send`, so is the task.
44+
unsafe impl<T: 'static + Send + Future> Send for Task<T> {}
45+
46+
impl<T: 'static + Send + Future> Task<T> {
47+
fn try_new(
48+
executor: Ref<Executor>,
49+
key: &'static LockClassKey,
50+
future: T,
51+
) -> Result<Ref<Self>> {
52+
let task = UniqueRef::try_new(Self {
53+
executor: executor.clone(),
54+
links: unsafe_list::Links::new(),
55+
// SAFETY: `work` is initialised below.
56+
work: unsafe { Work::new() },
57+
future: AsyncRevocable::new(UnsafeCell::new(future)),
58+
})?;
59+
60+
Work::init(&task, key);
61+
62+
let task = Ref::from(task);
63+
64+
// Add task to list.
65+
{
66+
let mut guard = executor.inner.lock();
67+
if guard.stopped {
68+
return Err(EINVAL);
69+
}
70+
71+
// Convert one reference into a pointer so that we hold on to a ref count while the
72+
// task is in the list.
73+
Ref::into_raw(task.clone());
74+
75+
// SAFETY: The task was just created, so it is not in any other lists. It remains alive
76+
// because we incremented the refcount to account for it being in the list. It never
77+
// moves because it's pinned behind a `Ref`.
78+
unsafe { guard.tasks.push_back(&*task) };
79+
}
80+
81+
Ok(task)
82+
}
83+
}
84+
85+
unsafe impl<T: 'static + Send + Future> WorkAdapter for Task<T> {
86+
type Target = Self;
87+
const FIELD_OFFSET: isize = crate::offset_of!(Self, work);
88+
fn run(task: Ref<Task<T>>) {
89+
let waker = super::ref_waker(task.clone());
90+
let mut ctx = Context::from_waker(&waker);
91+
92+
let guard = if let Some(g) = task.future.try_access() {
93+
g
94+
} else {
95+
return;
96+
};
97+
98+
// SAFETY: `future` is pinned when the task is. The task is pinned because it's behind a
99+
// `Ref`, which is always pinned.
100+
//
101+
// Work queues guarantee no reentrancy and this is the only place where the future is
102+
// dereferenced, so it's ok to do it mutably.
103+
let future = unsafe { Pin::new_unchecked(&mut *guard.get()) };
104+
if future.poll(&mut ctx).is_ready() {
105+
drop(guard);
106+
task.revoke();
107+
}
108+
}
109+
}
110+
111+
impl<T: 'static + Send + Future> super::Task for Task<T> {
112+
fn sync_stop(self: Ref<Self>) {
113+
self.revoke();
114+
self.flush();
115+
}
116+
}
117+
118+
impl<T: 'static + Send + Future> RevocableTask for Task<T> {
119+
fn revoke(&self) {
120+
if !self.future.revoke() {
121+
// Nothing to do if the task was already revoked.
122+
return;
123+
}
124+
125+
// SAFETY: The object is inserted into the list on creation and only removed when the
126+
// future is first revoked. (Subsequent revocations don't result in additional attempts
127+
// to remove per the check above.)
128+
unsafe { self.executor.inner.lock().tasks.remove(self) };
129+
130+
// Decrement the refcount now that the task is no longer in the list.
131+
//
132+
// SAFETY: `into_raw` was called from `try_new` when the task was added to the list.
133+
unsafe { Ref::from_raw(self) };
134+
}
135+
136+
fn flush(self: Ref<Self>) {
137+
self.work.cancel();
138+
}
139+
140+
fn to_links(&self) -> &unsafe_list::Links<dyn RevocableTask> {
141+
&self.links
142+
}
143+
}
144+
145+
impl<T: 'static + Send + Future> RefWake for Task<T> {
146+
fn wake(self: Ref<Self>) {
147+
if self.future.is_revoked() {
148+
return;
149+
}
150+
151+
match &self.executor.queue {
152+
Left(q) => &**q,
153+
Right(q) => *q,
154+
}
155+
.enqueue(self.clone());
156+
}
157+
158+
fn wake_by_ref(self: RefBorrow<'_, Self>) {
159+
Ref::from(self).wake();
160+
}
161+
}
162+
163+
struct ExecutorInner {
164+
stopped: bool,
165+
tasks: unsafe_list::List<dyn RevocableTask>,
166+
}
167+
168+
/// An executor backed by a work queue.
169+
///
170+
/// # Examples
171+
///
172+
/// The following example runs two tasks on the shared system workqueue.
173+
///
174+
/// ```
175+
/// # use kernel::prelude::*;
176+
/// use kernel::kasync::executor::workqueue::Executor;
177+
/// use kernel::workqueue;
178+
/// use kernel::spawn_task;
179+
///
180+
/// fn example_shared_workqueue() -> Result {
181+
/// let mut handle = Executor::try_new(workqueue::system())?;
182+
/// spawn_task!(handle.executor(), async {
183+
/// pr_info!("First workqueue task\n");
184+
/// })?;
185+
/// spawn_task!(handle.executor(), async {
186+
/// pr_info!("Second workqueue task\n");
187+
/// })?;
188+
/// handle.detach();
189+
/// Ok(())
190+
/// }
191+
///
192+
/// # example_shared_workqueue().unwrap();
193+
/// ```
194+
pub struct Executor {
195+
queue: Either<BoxedQueue, &'static Queue>,
196+
inner: Mutex<ExecutorInner>,
197+
_pin: PhantomPinned,
198+
}
199+
200+
// SAFETY: The executor is backed by a kernel `struct workqueue_struct`, which works from any
201+
// thread.
202+
unsafe impl Send for Executor {}
203+
204+
// SAFETY: The executor is backed by a kernel `struct workqueue_struct`, which can be used
205+
// concurrently by multiple threads.
206+
unsafe impl Sync for Executor {}
207+
208+
impl Executor {
209+
/// Creates a new workqueue-based executor using a static work queue.
210+
pub fn try_new(wq: &'static Queue) -> Result<AutoStopHandle<Self>> {
211+
Self::new_internal(Right(wq))
212+
}
213+
214+
/// Creates a new workqueue-based executor using an owned (boxed) work queue.
215+
pub fn try_new_owned(wq: BoxedQueue) -> Result<AutoStopHandle<Self>> {
216+
Self::new_internal(Left(wq))
217+
}
218+
219+
/// Creates a new workqueue-based executor.
220+
///
221+
/// It uses the given work queue to run its tasks.
222+
fn new_internal(queue: Either<BoxedQueue, &'static Queue>) -> Result<AutoStopHandle<Self>> {
223+
let mut e = Pin::from(UniqueRef::try_new(Self {
224+
queue,
225+
_pin: PhantomPinned,
226+
// SAFETY: `mutex_init` is called below.
227+
inner: unsafe {
228+
Mutex::new(ExecutorInner {
229+
stopped: false,
230+
tasks: unsafe_list::List::new(),
231+
})
232+
},
233+
})?);
234+
// SAFETY: `tasks` is pinned when the executor is.
235+
let pinned = unsafe { e.as_mut().map_unchecked_mut(|e| &mut e.inner) };
236+
mutex_init!(pinned, "Executor::inner");
237+
238+
Ok(AutoStopHandle::new(e.into()))
239+
}
240+
}
241+
242+
impl super::Executor for Executor {
243+
fn spawn(
244+
self: RefBorrow<'_, Self>,
245+
key: &'static LockClassKey,
246+
future: impl Future + 'static + Send,
247+
) -> Result<Ref<dyn super::Task>> {
248+
let task = Task::try_new(self.into(), key, future)?;
249+
task.clone().wake();
250+
Ok(task)
251+
}
252+
253+
fn stop(&self) {
254+
// Set the `stopped` flag.
255+
self.inner.lock().stopped = true;
256+
257+
// Go through all tasks and revoke & flush them.
258+
//
259+
// N.B. If we decide to allow "asynchronous" stops, we need to ensure that tasks that have
260+
// been revoked but not flushed yet remain in the list so that we can flush them here.
261+
// Otherwise we may have a race where we may have a running task (was revoked while
262+
// running) that isn't the list anymore, so we think we've synchronously stopped all tasks
263+
// when we haven't really -- unloading a module in this situation leads to memory safety
264+
// issues (running unloaded code).
265+
loop {
266+
let guard = self.inner.lock();
267+
268+
let front = if let Some(t) = guard.tasks.front() {
269+
t
270+
} else {
271+
break;
272+
};
273+
274+
// Get a new reference to the task.
275+
//
276+
// SAFETY: We know all entries in the list are of type `Ref<dyn RevocableTask>` and
277+
// that a reference exists while the entry is in the list, and since we are holding the
278+
// list lock, we know it cannot go away. The `into_raw` call below ensures that we
279+
// don't decrement the refcount accidentally.
280+
let tasktmp = unsafe { Ref::<dyn RevocableTask>::from_raw(front.as_ptr()) };
281+
let task = tasktmp.clone();
282+
Ref::into_raw(tasktmp);
283+
284+
// Release the mutex before revoking the task.
285+
drop(guard);
286+
287+
task.revoke();
288+
task.flush();
289+
}
290+
}
291+
}

0 commit comments

Comments
 (0)