1. What is Embassy
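Embassy is built on Rust's async/.await primitives. At its core it is a Rust async runtime: by implementing the interfaces Rust provides and adding a hardware abstraction layer (HAL), it brings cooperative multitasking to embedded devices.
According to Dion's comparison of FreeRTOS and Embassy, Embassy performs well on interrupt time, interrupt latency, and program size.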
2. Rust's exploration of zero-cost async/.await
Because Embassy builds on the async abstractions that Rust provides, we first explain those abstractions. This part draws on the Async/Await post of the OS Blog.
2.1 The Future object
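A Future represents a value whose computation will eventually complete. A Future only describes a computation; it does not start running by itself. A runtime has to drive it by calling Future::poll repeatedly.
Its interface is defined as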
pub trait Future {
type Output;
// Required method
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
poll is the core function; we can read it as "try to make some progress". Each call to Future::poll returns a Poll, which is defined as
pub enum Poll<T> {
Ready(T),
Pending,
}
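When the result is Poll::Ready, the value it carries is the computed result and the Future is complete. When the result is Poll::Pending, the computation has not finished yet and Future::poll must be called again. When to call it again is decided by the runtime, such as Embassy.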
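To make the poll contract concrete, here is a minimal sketch that drives a hand-written future with a busy loop. Countdown, NoopWake, and poll_to_completion are names invented for this illustration; they are not part of Rust's standard library or of Embassy.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that returns `Pending` `remaining` times before completing.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // Ask to be polled again; a real executor relies on this signal.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that does nothing, which is enough for a loop that polls unconditionally.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` until it is ready and returns the output plus the number of `poll` calls.
fn poll_to_completion<F: Future>(fut: F) -> (F::Output, u32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}
```

Calling poll_to_completion(Countdown { remaining: 3 }) takes four poll calls: three that return Pending and one that returns Ready. A real runtime would sleep between polls instead of spinning.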
Look at the signature of Future::poll: the first parameter is Pin<&mut Self> and the second is &mut Context, the execution context. We analyze them one by one below.
self: Pin<&mut Self> is a self receiver. It uses arbitrary self types, which generalize self to "an object from which self can be reached through a chain of dereferences", for example
fn do_something(self: Rc<Self>);
fn do_something(self: Arc<Self>);
fn do_something(self: Box<Self>);
fn do_something(self: Pin<&mut Self>);
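A small sketch of receivers other than &self, using only smart pointers that stable Rust accepts. The Sensor type and its methods are hypothetical and exist only for this example.

```rust
use std::rc::Rc;

struct Sensor {
    id: u32,
}

impl Sensor {
    /// Plain borrowed receiver, shorthand for `self: &Self`.
    fn id(&self) -> u32 {
        self.id
    }

    /// The receiver is an owning heap pointer; the call consumes the `Box`.
    fn into_id(self: Box<Self>) -> u32 {
        self.id
    }

    /// The receiver is a shared pointer, so the method can inspect the reference count.
    fn owners(self: Rc<Self>) -> usize {
        Rc::strong_count(&self)
    }
}
```

Method-call syntax works as usual: boxed.into_id() moves the Box into the method, and shared.owners() moves one Rc handle into it.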
2.2 Move-insensitive (address-insensitive) types
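For a more detailed explanation, see the official Rust documentation.
Notice the Pin<Ptr> type. It guarantees that the value behind the pointer is not moved. In async Rust many state types hold a reference to one of their own fields or to themselves (a self-reference), and the compiler cannot tell at compile time whether a type is self-referential. Pin<Ptr> therefore guarantees that its pointee either never moves again or implements Unpin (i.e. is insensitive to moves).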
Here is an example of a move-sensitive type.
Consider a type that holds an array and a pointer to the first element of that array:
struct AddressSensitiveType {
data: [u8; 1024],
first_element: *const u8,
}
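Suppose we initialize it on the stack (as a local variable) and then move it to the heap with Box. Recall that a move is only a bitwise copy, so first_element still points to the now-invalid data on the stack rather than to the data in its new heap location, as the figures show: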
[Figure: before the move]
[Figure: after the move]
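The effect can be observed directly. The sketch below builds the value on the stack, moves it into a Box, and compares the stored pointer with the new address of data. AddressSensitive and pointer_survives_move are illustrative names.

```rust
struct AddressSensitive {
    data: [u8; 1024],
    first_element: *const u8,
}

/// Builds the value on the stack, moves it to the heap, and reports whether
/// `first_element` still points at `data` afterwards.
fn pointer_survives_move() -> bool {
    let mut on_stack = AddressSensitive {
        data: [0; 1024],
        first_element: std::ptr::null(),
    };
    on_stack.first_element = on_stack.data.as_ptr();

    // The move copies the bytes, including the old stack address kept in `first_element`.
    let on_heap = Box::new(on_stack);

    // Comparing raw pointers is safe; dereferencing the stale one would not be.
    on_heap.first_element == on_heap.data.as_ptr()
}
```

The function returns false: the pointer still holds the stack address, while data now lives in the heap allocation.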
Note that Unpin is an auto trait. In the compiler's view, if any field of a type is !Unpin, the whole type is !Unpin (i.e. it is sensitive to moves). We can mark a type this way with a zero-sized marker field that is !Unpin:
struct AddressSensitiveType {
data: [u8; 1024],
first_element: *const u8,
_pin: PhantomPinned
}
Pin<Ptr> itself is simple: it implements Deref for any Ptr: Deref, but DerefMut only when Ptr::Target implements Unpin.
impl<Ptr: DerefMut<Target: Unpin>> DerefMut for Pin<Ptr> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut *self.ptr
}
}
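We can still obtain a mutable reference through unsafe methods, but then we must guarantee that we neither modify the address-sensitive contents nor move the value.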
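As a sketch of how pinning fixes the earlier problem, the value can be placed on the heap first and its self-pointer set afterwards through the unsafe accessor. SelfRef, new_pinned, is_consistent, and move_handle are names made up for this example.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

struct SelfRef {
    data: [u8; 16],
    first_element: *const u8,
    _pin: PhantomPinned,
}

impl SelfRef {
    /// Allocates on the heap first, then records the final address of `data`.
    fn new_pinned() -> Pin<Box<SelfRef>> {
        let mut boxed = Box::pin(SelfRef {
            data: [0; 16],
            first_element: std::ptr::null(),
            _pin: PhantomPinned,
        });
        let addr = boxed.data.as_ptr();
        // SAFETY: we only overwrite one field; the pinned value is never moved.
        unsafe { boxed.as_mut().get_unchecked_mut().first_element = addr };
        boxed
    }

    /// True while the stored pointer still matches the address of `data`.
    fn is_consistent(&self) -> bool {
        self.first_element == self.data.as_ptr()
    }
}

/// Moving the `Pin<Box<_>>` handle moves only the pointer, not the pinned value.
fn move_handle(p: Pin<Box<SelfRef>>) -> Pin<Box<SelfRef>> {
    p
}
```

Because SelfRef is !Unpin, safe code cannot get a &mut SelfRef out of the Pin and therefore cannot move the value out of its heap slot.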
2.3 The Future execution context
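The execution context currently only carries a Waker. The Waker is used to wake the runtime so that it does not have to busy-wait and waste resources. A Waker is created by the Executor, which is how we know which Executor to wake (several executors may run in parallel, which requires a thread abstraction). Waker implements Clone, Send, and Sync so that it can be passed between threads.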
When we return Poll::Pending, we must make sure that some Waker will wake the executor once this Future is ready to be polled again. Otherwise it will never run again.
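Note that the execution context/Waker must be refreshed on every call to Future::poll, because a Future can move between Executors and a later wake-up must reach the most recent Executor rather than the one it left (we do not go into further detail here).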
pub struct Context<'a> {
waker: &'a Waker,
// Ensure we future-proof against variance changes by forcing
// the lifetime to be invariant (argument-position lifetimes
// are contravariant while return-position lifetimes are
// covariant).
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
// Ensure `Context` is `!Send` and `!Sync` in order to allow
// for future `!Send` and / or `!Sync` fields.
_marker2: PhantomData<*mut ()>,
}
Waker is a wrapper around RawWaker. RawWaker is implemented with a virtual table (vtable) that stores one function per operation. Waking a Waker is delegated to the functions stored in its RawWaker.
pub struct Waker {
waker: RawWaker,
}
impl Waker {
pub fn wake(self) {
// The actual wakeup call is delegated through a virtual function call
// to the implementation which is defined by the executor.
let wake = self.waker.vtable.wake;
let data = self.waker.data;
// Don't call `drop` -- the waker will be consumed by `wake`.
crate::mem::forget(self);
// SAFETY: This is safe because `Waker::from_raw` is the only way
// to initialize `wake` and `data` requiring the user to acknowledge
// that the contract of `RawWaker` is upheld.
unsafe { (wake)(data) };
}
pub fn wake_by_ref(&self) {
// The actual wakeup call is delegated through a virtual function call
// to the implementation which is defined by the executor.
// SAFETY: see `wake`
unsafe { (self.waker.vtable.wake_by_ref)(self.waker.data) }
}
// ...
}
For RawWaker we simply give the source code.
pub struct RawWaker {
/// A data pointer, which can be used to store arbitrary data as required
/// by the executor. This could be e.g. a type-erased pointer to an `Arc`
/// that is associated with the task.
/// The value of this field gets passed to all functions that are part of
/// the vtable as the first parameter.
data: *const (),
/// Virtual function pointer table that customizes the behavior of this waker.
vtable: &'static RawWakerVTable,
}
pub struct RawWakerVTable {
/// This function will be called when the [`RawWaker`] gets cloned, e.g. when
/// the [`Waker`] in which the [`RawWaker`] is stored gets cloned.
///
/// The implementation of this function must retain all resources that are
/// required for this additional instance of a [`RawWaker`] and associated
/// task. Calling `wake` on the resulting [`RawWaker`] should result in a wakeup
/// of the same task that would have been awoken by the original [`RawWaker`].
clone: unsafe fn(*const ()) -> RawWaker,
/// This function will be called when `wake` is called on the [`Waker`].
/// It must wake up the task associated with this [`RawWaker`].
///
/// The implementation of this function must make sure to release any
/// resources that are associated with this instance of a [`RawWaker`] and
/// associated task.
wake: unsafe fn(*const ()),
/// This function will be called when `wake_by_ref` is called on the [`Waker`].
/// It must wake up the task associated with this [`RawWaker`].
///
/// This function is similar to `wake`, but must not consume the provided data
/// pointer.
wake_by_ref: unsafe fn(*const ()),
/// This function gets called when a [`Waker`] gets dropped.
///
/// The implementation of this function must make sure to release any
/// resources that are associated with this instance of a [`RawWaker`] and
/// associated task.
drop: unsafe fn(*const ()),
}
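To show how the four vtable entries fit together, the following sketch builds a Waker by hand whose only effect is to increment a counter. The function names and the counter are assumptions made for this illustration; an executor would store a task pointer in data instead.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{RawWaker, RawWakerVTable, Waker};

unsafe fn clone_raw(data: *const ()) -> RawWaker {
    // Nothing is reference counted here, so cloning just copies the pointer.
    RawWaker::new(data, &VTABLE)
}

unsafe fn wake_raw(data: *const ()) {
    // SAFETY: same contract as `wake_by_ref_raw`.
    unsafe { wake_by_ref_raw(data) }
}

unsafe fn wake_by_ref_raw(data: *const ()) {
    // SAFETY: `data` always comes from the `&'static AtomicUsize` passed to `counting_waker`.
    let counter = unsafe { &*(data as *const AtomicUsize) };
    counter.fetch_add(1, Ordering::SeqCst);
}

unsafe fn drop_raw(_data: *const ()) {
    // The counter is static, so there is nothing to release.
}

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw);

/// Builds a `Waker` whose wake-up increments `counter`.
fn counting_waker(counter: &'static AtomicUsize) -> Waker {
    let data = counter as *const AtomicUsize as *const ();
    // SAFETY: the vtable functions uphold the `RawWaker` contract for this data pointer.
    unsafe { Waker::from_raw(RawWaker::new(data, &VTABLE)) }
}
```

Both waker.wake_by_ref() and waker.clone().wake() end up in wake_by_ref_raw, which is the delegation the text describes.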
2.4 A Future is a state machine
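We save no registers in the execution context, and tasks have no stacks of their own. This greatly reduces the time needed to create tasks and to switch between them, but how do we preserve a task's execution state so that it resumes exactly where it left off? This requires support from the Rust compiler.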
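First we look at how Futures can be nested to express different execution logic.
To the executor, every Future is something it can schedule. How do we express "run A, then run B"? With yet another Future, which we call AndThen. The following is a simplified sketch that leaves out many details.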
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

// Simplification: everything is required to be `Unpin`, so no pin projection is needed.
enum AndThenState<F> {
    WaitingOnFirstFut,
    // The first result is kept to mirror the state; a fuller version would
    // pass it to a closure that builds the second future.
    WaitingOnSecondFut { first_fut_result: F },
    Done,
}
struct AndThen<A: Future, B: Future> {
    first_fut: A,
    second_fut: B,
    state: AndThenState<A::Output>,
}
impl<A, B> Future for AndThen<A, B>
where
    A: Future + Unpin,
    B: Future + Unpin,
    A::Output: Unpin,
{
    type Output = B::Output;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        loop {
            match this.state {
                AndThenState::WaitingOnFirstFut => match Pin::new(&mut this.first_fut).poll(cx) {
                    // The inner future has registered the waker from `cx`.
                    Poll::Pending => return Poll::Pending,
                    // Switch state and poll the second future right away.
                    Poll::Ready(result) => {
                        this.state = AndThenState::WaitingOnSecondFut { first_fut_result: result };
                    }
                },
                AndThenState::WaitingOnSecondFut { .. } => match Pin::new(&mut this.second_fut).poll(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(result) => {
                        this.state = AndThenState::Done;
                        return Poll::Ready(result);
                    }
                },
                AndThenState::Done => panic!("Already Done!"),
            }
        }
    }
}
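What we have built is a state machine in which each state stores what it needs to run. For the Rust compiler, every .await is a state boundary: the current state is saved there and control returns to the executor. The following example is from the OS Blog.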
async fn example(min_len: usize) -> String {
let content = async_read_file("foo.txt").await;
if content.len() < min_len {
content + &async_read_file("bar.txt").await
} else {
content
}
}
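The compiler generates a finite state machine that saves each of these states. How the variables a state depends on are found, and how variables a state does not need are pruned, is decided by the compiler and is beyond the scope of this article.
The state transition diagram is as follows.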
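We can derive the following four states. Note that WaitingOnBarTxtState does not need min_len, so it is not stored in that state. It does store content, because content + &async_read_file("bar.txt").await uses it.
These are the states the compiler might generate.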
// The compiler-generated state structs:
struct StartState {
min_len: usize,
}
struct WaitingOnFooTxtState {
min_len: usize,
foo_txt_future: impl Future<Output = String>,
}
struct WaitingOnBarTxtState {
content: String,
bar_txt_future: impl Future<Output = String>,
}
struct EndState {}
From these we can build the state machine.
enum ExampleStateMachine {
Start(StartState),
WaitingOnFooTxt(WaitingOnFooTxtState),
WaitingOnBarTxt(WaitingOnBarTxtState),
End(EndState),
}
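We need to split the original code into segments, one per state, and after running a segment switch to the next state and save whatever that state depends on. Because the compiler lowers conditionals to jumps (goto), the code maps well onto a state machine.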
impl Future for ExampleStateMachine {
type Output = String; // return type of `example`
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match self { // TODO: handle pinning
ExampleStateMachine::Start(state) => {
// from body of `example`
let foo_txt_future = async_read_file("foo.txt");
// `.await` operation
let state = WaitingOnFooTxtState {
min_len: state.min_len,
foo_txt_future,
};
*self = ExampleStateMachine::WaitingOnFooTxt(state);
}
ExampleStateMachine::WaitingOnFooTxt(state) => {
match state.foo_txt_future.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(content) => {
// from body of `example`
if content.len() < state.min_len {
let bar_txt_future = async_read_file("bar.txt");
// `.await` operation
let state = WaitingOnBarTxtState {
content,
bar_txt_future,
};
*self = ExampleStateMachine::WaitingOnBarTxt(state);
} else {
*self = ExampleStateMachine::End(EndState);
return Poll::Ready(content);
}
}
}
}
ExampleStateMachine::WaitingOnBarTxt(state) => {
match state.bar_txt_future.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(bar_txt) => {
*self = ExampleStateMachine::End(EndState);
// from body of `example`
return Poll::Ready(state.content + &bar_txt);
}
}
}
ExampleStateMachine::End(_) => {
panic!("poll called after Poll::Ready was returned");
}
}
}
}
}
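One way to see that local variables really live inside the generated state machine is to measure the future's size. This is a sketch under the assumption that a variable used after an await point is stored in the future; the exact layout is a compiler detail and not guaranteed. tick, keeps_buffer, and keeps_buffer_size are names invented for the example.

```rust
use std::mem::size_of_val;

/// Trivial future used only as an await point.
async fn tick() {}

/// `buf` is read after the `.await`, so it has to survive inside the state machine.
async fn keeps_buffer(seed: u8) -> u8 {
    let buf = [seed; 1024];
    tick().await;
    buf[usize::from(seed)]
}

/// Size in bytes of the state machine for `keeps_buffer`, measured without polling it.
fn keeps_buffer_size() -> usize {
    size_of_val(&keeps_buffer(0))
}
```

With current compilers the reported size is at least the 1024 bytes of buf, since no separate stack exists where the buffer could stay while the task is suspended.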
2.5 Executor
An executor runs multiple Future computations. The execution of each Future is treated as a Task, and the executor schedules these tasks.
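Keep in mind that the executor is non-preemptive. Each task must voluntarily hand control back to the executor (by returning Poll::Pending) before the executor can do its job. If a Future blocks or loops forever, no other task can make progress (assuming a single executor). So when making system calls (if there is an operating system), avoid blocking calls.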
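The smallest possible executor runs a single future and sleeps whenever it returns Pending. The sketch below uses a std thread as a stand-in for the sleeping CPU; ThreadWaker and block_on are illustrative names, not Embassy APIs.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives one future to completion, sleeping between polls instead of spinning.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Nothing to do until some waker calls `unpark`.
            Poll::Pending => thread::park(),
        }
    }
}
```

If the future blocked inside poll instead of returning Pending, this loop would never regain control, which is exactly the hazard described above.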
Cooperative multitasking
- Each future that is added to the executor is basically a cooperative task.
- Instead of using an explicit yield operation, futures give up control of the CPU core by returning Poll::Pending (or Poll::Ready at the end).
- There is nothing that forces futures to give up the CPU. If they want, they can never return from poll, e.g., by spinning endlessly in a loop.
- Since each future can block the execution of the other futures in the executor, we need to trust them to not be malicious.
- Futures internally store all the state they need to continue execution on the next poll call. With async/await, the compiler automatically detects all variables that are needed and stores them inside the generated state machine.
- Only the minimum state required for continuation is saved.
- Since the poll method gives up the call stack when it returns, the same stack can be used for polling other futures.
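The points above can be sketched with a tiny round-robin executor. It ignores wakers and simply re-polls pending tasks, so it is only a model of cooperative scheduling, not of Embassy's executor. YieldNow, run_all, and interleaving are hypothetical names.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Returns `Pending` once so that other tasks get a turn, then completes.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

type Task = Pin<Box<dyn Future<Output = ()>>>;

/// Polls tasks round-robin; a pending task goes to the back of the queue.
fn run_all(mut tasks: VecDeque<Task>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    while let Some(mut task) = tasks.pop_front() {
        if task.as_mut().poll(&mut cx).is_pending() {
            tasks.push_back(task);
        }
    }
}

/// Two tasks that each log, yield, and log again; returns the combined log.
fn interleaving() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let (a, b) = (log.clone(), log.clone());
    let mut tasks: VecDeque<Task> = VecDeque::new();
    tasks.push_back(Box::pin(async move {
        a.borrow_mut().push("a1");
        YieldNow(false).await;
        a.borrow_mut().push("a2");
    }));
    tasks.push_back(Box::pin(async move {
        b.borrow_mut().push("b1");
        YieldNow(false).await;
        b.borrow_mut().push("b2");
    }));
    run_all(tasks);
    let result = log.borrow().clone();
    result
}
```

The log comes back as a1, b1, a2, b2: each task runs until its await point returns Pending, and both share the one call stack of run_all.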
3. Embassy's runtime implementation
The explanation below draws on the Embassy documentation and the Embassy source code.
We now look at how Embassy abstracts tasks (beyond the Rust async API).
3.1 TaskStorage: wraps the task state machine and its scheduling data
The embassy_executor::task macro statically allocates an array of TaskStorage. The array initially holds uninitialized tasks (STATE_SPAWNED is not set in TaskHeader::state).
pub struct TaskPool<F: Future + 'static, const N: usize> {
pool: [TaskStorage<F>; N],
}
Each TaskStorage holds a TaskHeader and a Future.
pub struct TaskStorage<F: Future + 'static> {
raw: TaskHeader,
future: UninitCell<F>, // Valid if STATE_SPAWNED
}
/// An uninitialized [`TaskStorage`].
pub struct AvailableTask<F: Future + 'static> {
task: &'static TaskStorage<F>,
}
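The idea of a fixed pool whose slots are claimed atomically can be sketched as follows. This is a simplified model with invented names (Slot, Pool, claim, release, spawn); Embassy's real TaskStorage also holds the future and a richer state word.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical, simplified slot: a real `TaskStorage` also holds the future.
struct Slot {
    spawned: AtomicBool,
}

impl Slot {
    fn new() -> Self {
        Slot { spawned: AtomicBool::new(false) }
    }

    /// Atomically flips `spawned` from false to true; only one caller can win.
    fn claim(&self) -> Option<&Slot> {
        self.spawned
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .ok()
            .map(|_| self)
    }

    /// Marks the slot as free again, as happens when a task finishes.
    fn release(&self) {
        self.spawned.store(false, Ordering::Release);
    }
}

/// Fixed-size pool, mirroring the shape of `TaskPool<F, N>`.
struct Pool<const N: usize> {
    slots: [Slot; N],
}

impl<const N: usize> Pool<N> {
    fn new() -> Self {
        Pool { slots: std::array::from_fn(|_| Slot::new()) }
    }

    /// Returns the first free slot, or `None` when all `N` are in use.
    fn spawn(&self) -> Option<&Slot> {
        self.slots.iter().find_map(Slot::claim)
    }
}
```

With Pool<2>, the third spawn fails until one of the claimed slots is released, which matches the fixed pool size chosen at compile time.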
3.2 TaskHeader: wraps task scheduling
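TaskHeader stores the scheduling state of a task and the function that runs it:
- state is the scheduling state. The flags are spawned, run_queued, and timer_queued, and they can be combined.
- run_queue_item stores the next task in the run queue, making the header a node of that queue: RunQueue → TaskHeader (pointer) → TaskRef (pointer, TaskHeader::run_queue_item) → TaskHeader (pointer) → …
- executor points to the executor.
- poll_fn is the function that runs the task. It is called and runs to completion each time (the logic can be a single step of a state machine, which is explained later).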
/// Raw task header for use in task pointers.
pub(crate) struct TaskHeader {
pub(crate) state: State,
pub(crate) run_queue_item: RunQueueItem,
pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
#[cfg(feature = "integrated-timers")]
pub(crate) expires_at: SyncUnsafeCell<u64>,
#[cfg(feature = "integrated-timers")]
pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
}
TaskRef wraps a reference to a TaskHeader: it stores a non-null pointer to the header.
/// This is essentially a `&'static TaskStorage<F>` where the type of the future has been erased.
#[derive(Clone, Copy)]
pub struct TaskRef {
ptr: NonNull<TaskHeader>,
}
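The combinable state flags can be modeled with one atomic word. The bit values and the exact transition rules below are assumptions chosen for illustration; only the method names run_enqueue, run_dequeue, and timer_enqueue are taken from the code quoted in this article.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Bit values are illustrative; only the idea of independent flags matters here.
const STATE_SPAWNED: u32 = 1 << 0;
const STATE_RUN_QUEUED: u32 = 1 << 1;
const STATE_TIMER_QUEUED: u32 = 1 << 2;

struct State(AtomicU32);

impl State {
    fn new_spawned() -> Self {
        State(AtomicU32::new(STATE_SPAWNED))
    }

    /// Sets RUN_QUEUED. Returns true only if this call made the transition,
    /// i.e. the task was spawned and not yet in the run queue.
    fn run_enqueue(&self) -> bool {
        self.0
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |s| {
                if (s & STATE_SPAWNED) == 0 || (s & STATE_RUN_QUEUED) != 0 {
                    None
                } else {
                    Some(s | STATE_RUN_QUEUED)
                }
            })
            .is_ok()
    }

    /// Clears RUN_QUEUED. Returns true if the task is still spawned and should be polled.
    fn run_dequeue(&self) -> bool {
        let prev = self.0.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
        (prev & STATE_SPAWNED) != 0
    }

    /// Sets TIMER_QUEUED. Returns true if the flag was not set before.
    fn timer_enqueue(&self) -> bool {
        let prev = self.0.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
        (prev & STATE_TIMER_QUEUED) == 0
    }
}
```

Returning a bool from run_enqueue lets the caller enqueue the task exactly once, even if it is woken several times before the executor gets to it.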
3.3 Executors
- arch Executor
Each supported CPU architecture has its own Executor, usually woken by interrupts.
// risc-v 32 executor
pub struct Executor {
    inner: raw::Executor,
    not_send: PhantomData<*mut ()>,
}
- raw Executor
#[repr(transparent)]
pub struct Executor {
    pub(crate) inner: SyncExecutor,
    _not_sync: PhantomData<*mut ()>,
}
- SyncExecutor
pub(crate) struct SyncExecutor {
    run_queue: RunQueue,
    pender: Pender,
    #[cfg(feature = "integrated-timers")]
    pub(crate) timer_queue: timer_queue::TimerQueue,
    #[cfg(feature = "integrated-timers")]
    alarm: AlarmHandle,
}
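All of the poll logic is implemented in SyncExecutor, and it is simple: dequeue every task in the run queue (marking it as dequeued in its TaskHeader at that point, i.e. clearing the run_queued bit) and run it.
Note that a task is not put back into the RunQueue automatically. With timers enabled, expired tasks are re-enqueued: SyncExecutor::poll calls timer_queue.dequeue_expired, which invokes wake_task_no_pend for each expired task and so places it in run_queue before run_queue.dequeue_all runs.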
pub fn wake_task_no_pend(task: TaskRef) {
let header = task.header();
if header.state.run_enqueue() {
// We have just marked the task as scheduled, so enqueue it.
unsafe {
let executor = header.executor.get().unwrap_unchecked();
executor.run_queue.enqueue(task);
}
}
}
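After a task has been polled, it is added to the timer queue SyncExecutor::timer_queue if its expires_at has been set (by then it has already left the run queue).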
pub(crate) unsafe fn update(&self, p: TaskRef) {
let task = p.header();
if task.expires_at.get() != u64::MAX {
if task.state.timer_enqueue() {
task.timer_queue_item.next.set(self.head.get());
self.head.set(Some(p));
}
}
}
The poll logic is shown below.
impl SyncExecutor {
/// # Safety
///
/// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
pub(crate) unsafe fn poll(&'static self) {
#[cfg(feature = "integrated-timers")]
embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());
#[allow(clippy::never_loop)]
loop {
#[cfg(feature = "integrated-timers")]
self.timer_queue
.dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);
self.run_queue.dequeue_all(|p| {
let task = p.header();
#[cfg(feature = "integrated-timers")]
task.expires_at.set(u64::MAX);
if !task.state.run_dequeue() {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
// - Task poll finishes, returning done=true
// - RUNNING bit is cleared, but the task is already in the queue.
return;
}
#[cfg(feature = "rtos-trace")]
trace::task_exec_begin(p.as_ptr() as u32);
// Run the task
task.poll_fn.get().unwrap_unchecked()(p);
#[cfg(feature = "rtos-trace")]
trace::task_exec_end();
// Enqueue or update into timer_queue
#[cfg(feature = "integrated-timers")]
self.timer_queue.update(p);
});
#[cfg(feature = "integrated-timers")]
{
// If this is already in the past, set_alarm might return false
// In that case do another poll loop iteration.
let next_expiration = self.timer_queue.next_expiration();
if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
break;
}
}
#[cfg(not(feature = "integrated-timers"))]
{
break;
}
}
#[cfg(feature = "rtos-trace")]
trace::system_idle();
}
}
RunQueue::dequeue_all is a safe abstraction over a non-blocking linked list; its algorithm is not discussed here.
impl RunQueue {
/// Empty the queue, then call `on_task` for each task that was in the queue.
/// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
/// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
// Atomically empty the queue.
let ptr = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
// safety: the pointer is either null or valid
let mut next = unsafe { NonNull::new(ptr).map(|ptr| TaskRef::from_ptr(ptr.as_ptr())) };
// Iterate the linked list of tasks that were previously in the queue.
while let Some(task) = next {
// If the task re-enqueues itself, the `next` pointer will get overwritten.
// Therefore, first read the next pointer, and only then process the task.
// safety: there are no concurrent accesses to `next`
next = unsafe { task.header().run_queue_item.next.get() };
on_task(task);
}
}
}
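For readers who want a feel for the data structure, here is a minimal sketch of an intrusive lock-free stack with the same two operations. It uses std atomics and leaked heap nodes instead of Embassy's static task headers; Node, enqueue, and drain_order are names invented for the example.

```rust
use std::cell::RefCell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

/// Hypothetical stand-in for a task header: an id plus the intrusive `next` link.
struct Node {
    id: u32,
    next: AtomicPtr<Node>,
}

struct RunQueue {
    head: AtomicPtr<Node>,
}

impl RunQueue {
    fn new() -> Self {
        RunQueue { head: AtomicPtr::new(ptr::null_mut()) }
    }

    /// Pushes `node` at the head with a compare-and-swap retry loop.
    fn enqueue(&self, node: &'static Node) {
        let node_ptr = node as *const Node as *mut Node;
        let mut head = self.head.load(Ordering::Acquire);
        loop {
            node.next.store(head, Ordering::Relaxed);
            match self.head.compare_exchange_weak(head, node_ptr, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => return,
                Err(current) => head = current,
            }
        }
    }

    /// Detaches the whole list in one atomic swap, then walks it.
    fn dequeue_all(&self, on_task: impl Fn(&'static Node)) {
        let mut next = self.head.swap(ptr::null_mut(), Ordering::AcqRel);
        // SAFETY: every non-null pointer in the list came from a `&'static Node`.
        while let Some(node) = unsafe { next.as_ref() } {
            // Read the link before the callback, which may re-enqueue the node.
            next = node.next.load(Ordering::Relaxed);
            on_task(node);
        }
    }
}

/// Enqueues ids 1, 2, 3 and returns the order in which they come back out.
fn drain_order() -> Vec<u32> {
    let queue = RunQueue::new();
    for id in 1..=3 {
        // Leaked on purpose so the node is `'static`, like a statically allocated task.
        let node: &'static Node = Box::leak(Box::new(Node { id, next: AtomicPtr::new(ptr::null_mut()) }));
        queue.enqueue(node);
    }
    let seen = RefCell::new(Vec::new());
    queue.dequeue_all(|node| seen.borrow_mut().push(node.id));
    seen.into_inner()
}
```

Because nodes are pushed at the head, this sketch drains in last-in, first-out order (3, 2, 1). Tasks enqueued during the callback stay in the queue for the next dequeue_all call, as the comment in the original code notes.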
3.4 Pender
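A Pender signals that, while the executor was polling the runnable tasks, some other task became ready to make progress, for example because an I/O device has filled its buffer in the meantime.
In that case the executor does not wait (for an interrupt, say) after finishing one poll pass; it runs another pass immediately, because a task is already ready to continue.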
Pender is architecture-specific: each arch provides its own implementation of __pender. It is declared as follows:
impl Pender {
pub(crate) fn pend(self) {
extern "Rust" {
fn __pender(context: *mut ());
}
unsafe { __pender(self.0) };
}
}
Let us first see when the Pender is called.
With timers enabled, it is called each time an alarm expires. Every SyncExecutor::poll calls set_alarm_callback, passing SyncExecutor::alarm_callback as the callback and a pointer to the current SyncExecutor as its context.
impl SyncExecutor {
pub(crate) unsafe fn poll(&'static self) {
#[cfg(feature = "integrated-timers")]
embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());
// polling logic ...
}
}
impl SyncExecutor {
#[cfg(feature = "integrated-timers")]
fn alarm_callback(ctx: *mut ()) {
let this: &Self = unsafe { &*(ctx as *const Self) };
this.pender.pend();
}
}
Here is the RISC-V 32 implementation.
/// global atomic used to keep track of whether there is work to do since sev() is not available on RISCV
static SIGNAL_WORK_THREAD_MODE: AtomicBool = AtomicBool::new(false);
#[export_name = "__pender"]
fn __pender(_context: *mut ()) {
SIGNAL_WORK_THREAD_MODE.store(true, Ordering::SeqCst);
}
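I do not yet know how Embassy implements timer interrupts on RISC-V. Possibly this code runs inside the interrupt handler, and the interrupt is then raised in U-mode.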
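Below is the run loop of the RISC-V 32 Executor. It waits with the wfi (wait for interrupt) instruction and resumes once an interrupt occurs. This clarifies the meaning of SIGNAL_WORK_THREAD_MODE and Pender: if an interrupt fires during poll, the flag records it, and poll runs again once the current pass is finished; if no interrupt fired, the core enters a low-power wait.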
pub struct Executor {
inner: raw::Executor,
not_send: PhantomData<*mut ()>,
}
impl Executor {
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
init(self.inner.spawner());
loop {
unsafe {
self.inner.poll();
// we do not care about race conditions between the load and store operations, interrupts
// will only set this value to true.
critical_section::with(|_| {
// if there is work to do, loop back to polling
// TODO can we relax this?
if SIGNAL_WORK_THREAD_MODE.load(Ordering::SeqCst) {
SIGNAL_WORK_THREAD_MODE.store(false, Ordering::SeqCst);
}
// if not, wait for interrupt
else {
core::arch::asm!("wfi");
}
});
// if an interrupt occurred while waiting, it will be serviced here
}
}
}
}
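The interplay between the flag and the run loop can be modeled without any hardware. In this sketch the point where the function returns stands in for the wfi instruction, and the critical section of the real code is left out. PendFlag and run_until_idle are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for the architecture-specific "work pending" signal.
struct PendFlag(AtomicBool);

impl PendFlag {
    const fn new() -> Self {
        PendFlag(AtomicBool::new(false))
    }

    /// What `__pender` does: record that another poll pass is needed.
    fn pend(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    /// Clears the flag and reports whether it was set.
    fn take(&self) -> bool {
        self.0.swap(false, Ordering::SeqCst)
    }
}

/// Runs poll passes until no work is pending and returns how many passes ran.
/// A real executor would execute `wfi` at the point where this function returns.
fn run_until_idle(flag: &PendFlag, mut poll_once: impl FnMut(u32)) -> u32 {
    let mut passes = 0;
    loop {
        poll_once(passes);
        passes += 1;
        if !flag.take() {
            return passes;
        }
    }
}
```

If the flag is set during the first two passes and not during the third, the loop runs three passes before it would go to sleep.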
Q: How does the Future state machine show up in Embassy?
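Embassy stores the Future state machine we create inside a TaskStorage (which lives in a static array).
This decouples the state machine from scheduling. In Embassy the executor is only responsible for scheduling: which task to run, which task to wake, and so on. Each task is stored as a state machine, and it is advanced by calling the task's TaskHeader::poll_fn. This poll_fn plays the same role for every task, i.e. it is the function that steps the state machine, much like the Rust state machine code we saw earlier.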
pub struct TaskStorage<F: Future + 'static> {
raw: TaskHeader,
future: UninitCell<F>, // Valid if STATE_SPAWNED
}
When the Embassy executor runs a task, it calls task.poll_fn.get().unwrap_unchecked()(p); we analyze this call next.
Here p is a TaskRef pointing to a TaskHeader, and task is that header.
pub(crate) struct TaskHeader {
pub(crate) state: State,
pub(crate) run_queue_item: RunQueueItem,
pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
#[cfg(feature = "integrated-timers")]
pub(crate) expires_at: SyncUnsafeCell<u64>,
#[cfg(feature = "integrated-timers")]
pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
}
The initialization code for a task is reached through TaskPool; the reason is explained later. For now, note the following function, which sets the logic stored in TaskHeader::poll_fn.
fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
unsafe {
self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
self.task.future.write_in_place(future);
let task = TaskRef::new(self.task);
SpawnToken::new(task)
}
}
Now look at TaskStorage::poll: it is one step of a plain Future state machine (the function is called many times).
unsafe fn poll(p: TaskRef) {
let this = &*(p.as_ptr() as *const TaskStorage<F>);
let future = Pin::new_unchecked(this.future.as_mut());
let waker = waker::from_task(p);
let mut cx = Context::from_waker(&waker);
match future.poll(&mut cx) {
Poll::Ready(_) => {
this.future.drop_in_place();
this.raw.state.despawn();
#[cfg(feature = "integrated-timers")]
this.raw.expires_at.set(u64::MAX);
}
Poll::Pending => {}
}
// the compiler is emitting a virtual call for waker drop, but we know
// it's a noop for our waker.
mem::forget(waker);
}
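We can use the Timer to see how a custom Future is implemented.
Here, by the time it returns Poll::Pending it has already notified the timer queue, which will call the waker after the given time to wake the executor.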
/// A future that completes at a specified [Instant](struct.Instant.html).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Timer {
expires_at: Instant,
yielded_once: bool,
}
impl Unpin for Timer {}
impl Future for Timer {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.yielded_once && self.expires_at <= Instant::now() {
Poll::Ready(())
} else {
embassy_time_queue_driver::schedule_wake(self.expires_at.as_ticks(), cx.waker());
self.yielded_once = true;
Poll::Pending
}
}
}
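The implementation of schedule_wake is given below. It updates the task's TaskHeader::expires_at so that the TimerQueue wakes the task at the requested time.
From the SyncExecutor design above we get the following flow:
timer interrupt → the loop in Executor::run first moves expired tasks out of the timer queue and polls them → a task that needs a new Timer calls schedule_wake, updating its TaskHeader::expires_at → after the poll, TimerQueue::update(task) runs, and if expires_at was set again the task re-enters the timer queue → …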
#[cfg(feature = "integrated-timers")]
impl embassy_time_queue_driver::TimerQueue for TimerQueue {
fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
let task = waker::task_from_waker(waker);
let task = task.header();
unsafe {
let expires_at = task.expires_at.get();
task.expires_at.set(expires_at.min(at));
}
}
}
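The same pattern (register a wake-up, return Pending, get polled again after the deadline) can be tried on a desktop with std only. In this sketch a helper thread plays the role of the timer queue; Sleep, ThreadWaker, block_on, and sleep_and_measure are illustrative names and not Embassy APIs.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// Completes once `deadline` has passed. A helper thread stands in for the timer queue.
struct Sleep {
    deadline: Instant,
    scheduled: bool,
}

impl Future for Sleep {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let now = Instant::now();
        if now >= self.deadline {
            return Poll::Ready(());
        }
        if !self.scheduled {
            self.scheduled = true;
            let (waker, wait) = (cx.waker().clone(), self.deadline - now);
            // Hypothetical "schedule_wake": wake the task once the deadline is reached.
            thread::spawn(move || {
                thread::sleep(wait);
                waker.wake();
            });
        }
        Poll::Pending
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor that sleeps while the future is pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Waits for `d` with the `Sleep` future and returns the measured elapsed time.
fn sleep_and_measure(d: Duration) -> Duration {
    let start = Instant::now();
    block_on(Sleep { deadline: start + d, scheduled: false });
    start.elapsed()
}
```

The future re-checks the clock on every poll, so a spurious wake-up only costs one extra poll. Embassy avoids the helper thread by keeping expires_at in the task header and letting a hardware alarm trigger the wake-up.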
4. How Embassy runs
Let us take a high-level look at how Embassy works, using the blinky example.
#[embassy_executor::main]
async fn main(_spawner: Spawner) {
let p = embassy_stm32::init(Default::default());
info!("Hello World!");
let mut led = Output::new(p.PC13, Level::High, Speed::Low);
loop {
info!("high");
led.set_high();
Timer::after_millis(300).await;
info!("low");
led.set_low();
Timer::after_millis(300).await;
}
}
To see what Embassy actually does, we expand the macro.
// Recursive expansion of main_cortex_m macro
// ===========================================
#[doc(hidden)]
async fn ____embassy_main_task(_spawner: Spawner) {
{
let p = embassy_stm32::init(Default::default());
info!("Hello World!");
let mut led = Output::new(p.PC13, Level::High, Speed::Low);
loop {
info!("high");
led.set_high();
Timer::after_millis(300).await;
info!("low");
led.set_low();
Timer::after_millis(300).await;
}
}
}
fn __embassy_main(_spawner: Spawner) -> ::embassy_executor::SpawnToken<impl Sized> {
const POOL_SIZE: usize = 1;
static POOL: ::embassy_executor::_export::TaskPoolRef = ::embassy_executor::_export::TaskPoolRef::new();
unsafe {
POOL.get::<_, POOL_SIZE>()
._spawn_async_fn(move || ____embassy_main_task(_spawner))
}
}
unsafe fn __make_static<T>(t: &mut T) -> &'static mut T {
::core::mem::transmute(t)
}
#[doc(hidden)]
#[export_name = "main"]
pub unsafe extern "C" fn __cortex_m_rt_main_trampoline() {
__cortex_m_rt_main()
}
fn __cortex_m_rt_main() -> ! {
let mut executor = ::embassy_executor::Executor::new();
let executor = unsafe { __make_static(&mut executor) };
executor.run(|spawner| {
spawner.must_spawn(__embassy_main(spawner));
})
}
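In short, once control reaches __cortex_m_rt_main, we
- create an executor and initialize the task pool, and
- hand the Future produced by ____embassy_main_task to the task pool (the function is marked async, so calling it returns a Future without running the body; this is syntactic sugar). This is done by TaskPool::spawn_impl, which in turn calls AvailableTask::initialize_impl.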
impl<F: Future + 'static, const N: usize> TaskPool<F, N> {
    fn spawn_impl<T>(&'static self, future: impl FnOnce() -> F) -> SpawnToken<T> {
        match self.pool.iter().find_map(AvailableTask::claim) {
            Some(task) => task.initialize_impl::<T>(future),
            None => SpawnToken::new_failed(),
        }
    }
}
impl<F: Future + 'static> AvailableTask<F> {
    fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
        unsafe {
            self.task.raw.poll_fn.set(Some(TaskStorage::<F>::poll));
            self.task.future.write_in_place(future);
            let task = TaskRef::new(self.task);
            SpawnToken::new(task)
        }
    }
}
Note that AvailableTask represents a task in the pool that has not been initialized yet.
pub struct AvailableTask<F: Future + 'static> {
    task: &'static TaskStorage<F>,
}
pub struct TaskStorage<F: Future + 'static> {
    raw: TaskHeader,
    future: UninitCell<F>, // Valid if STATE_SPAWNED
}
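We first find a slot in the task pool, then take ownership of it with AvailableTask::claim, and finally initialize it.
Note that initialization sets the task logic in self.task.raw (the TaskHeader) to TaskStorage::poll, so let us look at that function.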
unsafe fn poll(p: TaskRef) {
    let this = &*(p.as_ptr() as *const TaskStorage<F>);
    let future = Pin::new_unchecked(this.future.as_mut());
    let waker = waker::from_task(p);
    let mut cx = Context::from_waker(&waker);
    match future.poll(&mut cx) {
        Poll::Ready(_) => {
            this.future.drop_in_place();
            this.raw.state.despawn();
            #[cfg(feature = "integrated-timers")]
            this.raw.expires_at.set(u64::MAX);
        }
        Poll::Pending => {}
    }
    // the compiler is emitting a virtual call for waker drop, but we know
    // it's a noop for our waker.
    mem::forget(waker);
}
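The TaskStorage holds our async logic, which in this example is the LED blinking code. Because the TaskStorage has a 'static lifetime and is never moved once created (unless it is removed), it is fine to use the unsafe function Pin::new_unchecked to build the self: Pin<&mut Self> argument that Future::poll requires.
We then poll this Future; from here on it is Rust's async interface.
When poll returns Poll::Pending we do nothing, because an interrupt will wake the Executor.
Calling mem::forget on the waker is an optimization. A Waker dispatches through a vtable, so running its destructor adds overhead; since this Waker's drop is known to do nothing, mem::forget skips the destructor call.
- Finally, call Executor::run to start the executor. Spawner is a safe abstraction over the executor: the unsafe code stays outside the closure, and the closure receives only this abstraction (init: impl FnOnce(Spawner)).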
/// Handle to spawn tasks into an executor.
///
/// This Spawner can spawn any task (Send and non-Send ones), but it can
/// only be used in the executor thread (it is not Send itself).
///
/// If you want to spawn tasks from another thread, use [SendSpawner].
#[derive(Copy, Clone)]
pub struct Spawner {
    executor: &'static raw::Executor,
    not_send: PhantomData<*mut ()>,
}
impl Executor {
    /// Get a spawner that spawns tasks in this executor.
    ///
    /// It is OK to call this method multiple times to obtain multiple
    /// `Spawner`s. You may also copy `Spawner`s.
    pub fn spawner(&'static self) -> super::Spawner {
        super::Spawner::new(self)
    }
}
impl Executor {
    pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
        init(self.inner.spawner());
        // do polling in a loop...
    }
}
The init logic here adds the task to the executor.
executor.run(|spawner| {
    spawner.must_spawn(__embassy_main(spawner));
})
Spawner and SpawnToken are Embassy's safe abstractions over the executor and the task.
pub fn must_spawn<S>(&self, token: SpawnToken<S>) {
    unwrap!(self.spawn(token));
}
pub fn spawn<S>(&self, token: SpawnToken<S>) -> Result<(), SpawnError> {
    let task = token.raw_task;
    mem::forget(token);
    match task {
        Some(task) => {
            unsafe { self.executor.spawn(task) };
            Ok(())
        }
        None => Err(SpawnError::Busy),
    }
}
pub struct SpawnToken<S> {
    raw_task: Option<raw::TaskRef>,
    phantom: PhantomData<*mut S>,
}
At this point Embassy can start running the task.
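The remark that calling an async function only creates a Future, without running its body, can be checked with a few lines of std-only code. task_body, NoopWake, and observe_laziness are names invented for this sketch.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Stand-in for a task body such as the blinky `main`.
async fn task_body(ran: &AtomicBool) {
    ran.store(true, Ordering::SeqCst);
}

/// Returns whether the body had run before the first poll and after it.
fn observe_laziness() -> (bool, bool) {
    let ran = AtomicBool::new(false);

    // Calling the async fn only builds the state machine.
    let mut fut = Box::pin(task_body(&ran));
    let before = ran.load(Ordering::SeqCst);

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // The body executes during `poll`, which is what the executor calls.
    let done = fut.as_mut().poll(&mut cx).is_ready();
    let after = ran.load(Ordering::SeqCst);

    (before, after && done)
}
```

The result is (false, true): nothing happens when the future is created, and the body runs only once something polls it. In Embassy that something is TaskStorage::poll, called by the executor.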
5. Summary
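Embassy is a Rust async runtime framework that makes full use of Rust's language support for async code. It relies on compiler-generated state machines to keep each task's execution state. It is not an RTOS, and it does not support preemptive or priority scheduling. Because no preemption is required, execution is cooperative: a task gives up the CPU voluntarily with .await or by yielding, at which point the code generated by the Rust compiler saves its state. It does not use the mechanisms common in operating systems (context switches, per-task stacks) to save task progress, because they are unnecessary. This greatly reduces the cost of saving and restoring context and improves performance. In Dion's tests on an STM32F446 microcontroller, Embassy's average interrupt time was 51.0% lower than FreeRTOS, its average interrupt latency 24.8% lower, and its average thread time 28.1% lower.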
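However, in what I have read and explored so far I found no evidence that Embassy supports preemptive priority scheduling, so for now I assume it does not, which may make it unsuitable for some scenarios.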