DragonOS 等待队列机制
概述
DragonOS 等待队列是基于 Waiter/Waker 模式的同步原语,用于进程阻塞等待和唤醒。通过原子操作解决”唤醒丢失”问题。
核心设计
Waiter/Waker 模式
每次等待操作创建一对对象:
Waiter:线程本地持有(!Send/!Sync),用于等待
Waker:跨 CPU 共享(Arc),用于唤醒
关键结构
struct WaitQueue {
inner: SpinLock<VecDeque<Arc<Waker>>>,
num_waiters: AtomicU32, // 快速路径检查
}
struct Waker {
has_woken: AtomicBool, // 唤醒标记
target: Weak<PCB>, // 目标进程
}
等待与唤醒流程
等待流程
检查条件,满足则直接返回
创建 Waiter/Waker 对,加锁将 Waker 加入队列
再次检查条件(处理竞态)
执行 before_sleep 钩子(通常释放锁)
调用 Waiter::wait() 阻塞当前进程
// 等待条件满足
wait_queue.wait_event_interruptible(
|| condition_is_met(),
Some(|| unlock_mutex()),
)?;
唤醒流程
┌─────────────────┐
│ wake_one() │
└─────────────────┘
│
▼
┌─────────────────┐
│ 检查队列是否空 │ (快速路径)
└─────────────────┘
│
空 │ │ 非空
▼ ▼
返回 加锁队列
│
▼
┌─────────────────┐
│ 取出一个 Waker │
└─────────────────┘
│
▼
┌─────────────────┐
│ 释放锁后唤醒 │
└─────────────────┘
│
▼
┌─────────────────┐
│设置 has_woken │
│并唤醒进程 │
└─────────────────┘
内存序保证
唤醒方:使用 Release 语义设置
has_woken等待方:使用 Acquire 语义读取
has_woken保证唤醒前的所有操作对等待方可见
核心实现
Waker::wake
pub fn wake(&self) -> bool {
// 原子设置唤醒标记
if self.has_woken.swap(true, Ordering::Release) {
return false; // 已被唤醒
}
// 唤醒目标进程
if let Some(pcb) = self.target.upgrade() {
ProcessManager::wakeup(&pcb);
}
true
}
block_current
fn block_current(waiter: &Waiter, interruptible: bool) -> Result<(), SystemError> {
loop {
// 快速路径:已被唤醒
if waiter.waker.consume_wake() {
return Ok(());
}
// 禁用中断,标记进程为睡眠状态
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(interruptible)?;
drop(irq_guard);
// 调度出去
schedule(SchedMode::SM_NONE);
// 检查信号(可中断等待)
if interruptible && Signal::signal_pending_state(...) {
return Err(SystemError::ERESTARTSYS);
}
}
}
主要 API
// 等待事件
wait_queue.wait_event_interruptible(|| condition_met, Some(|| unlock()))?;
// 唤醒
wait_queue.wakeup(None); // 唤醒一个
wait_queue.wake_all(); // 唤醒所有
// 便利方法
wait_queue.sleep_unlock_spinlock(guard)?; // 睡眠并释放锁
使用示例
信号量
struct Semaphore {
counter: AtomicU32,
wait_queue: WaitQueue,
}
impl Semaphore {
fn down(&self) -> Result<(), SystemError> {
loop {
// 尝试获取
if self.counter.fetch_sub(1, Acquire) > 0 {
return Ok(());
}
// 失败,等待
self.counter.fetch_add(1, Release);
self.wait_queue.wait_event_interruptible(
|| self.counter.load(Acquire) > 0,
None,
)?;
}
}
fn up(&self) {
self.counter.fetch_add(1, Release);
self.wait_queue.wakeup(None);
}
}
关键优势
无唤醒丢失:原子
has_woken标记确保简单易用:统一的 API,无需手动管理状态
高性能:快速路径优化,最小化锁持有时间
多核友好:支持并发唤醒,减少锁竞争
这套机制为 DragonOS 的同步原语(信号量、条件变量、futex 等)提供了基础支撑。