Note
AI Translation Notice
This document was automatically translated by the hunyuan-turbos-latest model and is provided for reference only.
Source document: kernel/sched/wait_queue.md
Translation time: 2026-01-13 12:52:37
Translation model: hunyuan-turbos-latest
Please report issues via the Community Channel
DragonOS Wait Queue Mechanism
1. Overview
The DragonOS WaitQueue is a process synchronization primitive based on the Waiter/Waker pattern, used to block and wake processes. By combining atomic operations with a carefully designed wait/wake protocol, it eliminates the “wake loss” (lost wakeup) problem.
1.1 Core Features
Zero wake loss: The “register before check” mechanism ensures that no wake signal is missed
Single-use Waiter/Waker: A fresh pair of objects is created for each wait, avoiding state-reuse issues
wait_until as the core: The wait_until API, which returns a resource, is the foundation on which the other APIs are built
Atomic wait and acquire: Supports atomically “waiting for a condition and acquiring a resource” (e.g., a lock Guard)
Multi-core friendly: Wakers can be shared across CPUs, supporting concurrent wakeups
Signal awareness: Supports both interruptible and uninterruptible wait modes
2. Core Design
2.1 Waiter/Waker Pattern
The wait queue adopts a producer-consumer pattern, separating waiters and wakers:
pub struct Waiter {
waker: Arc<Waker>,
_nosend: PhantomData<Rc<()>>, // marks Waiter as !Send
}
pub struct Waker {
state: AtomicU8, // wake state machine
target: Weak<PCB>, // target process
}
Key Features:
Waiter: Held thread-locally (!Send/!Sync); it can only wait on the thread that created it
Waker: Shared through Arc; it can be passed to, and woken from, other CPUs/threads
Single-use design: A new Waiter/Waker pair is created for each wait to avoid state pollution
State machine handshake: The four states Idle/Sleeping/Notified/Closed ensure that no wakeup is lost during concurrent wakeups
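To make the four-state handshake concrete, here is a minimal userspace sketch of the state word alone. The numeric encoding, the WakeResult enum, and the exact transitions taken by prepare_sleep (including that it consumes an early notification itself) are assumptions for illustration, not the DragonOS source.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Hypothetical encoding of the four states named above.
const IDLE: u8 = 0;
const SLEEPING: u8 = 1;
const NOTIFIED: u8 = 2;
const CLOSED: u8 = 3;

/// What a wake() call achieved.
#[derive(Debug, PartialEq)]
enum WakeResult {
    WokeSleeper,   // Sleeping -> Notified: the process must actually be woken
    RecordedEarly, // Idle -> Notified: the waiter will see it before sleeping
    Ignored,       // already Notified, or Closed
}

/// Result of the pre-sleep handshake.
#[derive(Debug, PartialEq)]
enum SleepState {
    Sleeping,
    Notified,
    Closed,
}

struct WakerState(AtomicU8);

impl WakerState {
    fn new() -> Self {
        WakerState(AtomicU8::new(IDLE))
    }

    fn wake(&self) -> WakeResult {
        let mut cur = self.0.load(Ordering::Acquire);
        loop {
            if cur != IDLE && cur != SLEEPING {
                return WakeResult::Ignored;
            }
            match self.0.compare_exchange(cur, NOTIFIED, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) if cur == SLEEPING => return WakeResult::WokeSleeper,
                Ok(_) => return WakeResult::RecordedEarly,
                Err(actual) => cur = actual, // raced with the waiter; retry with the new state
            }
        }
    }

    /// Idle -> Sleeping, unless a wakeup or close already arrived.
    fn prepare_sleep(&self) -> SleepState {
        match self.0.compare_exchange(IDLE, SLEEPING, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) | Err(SLEEPING) => SleepState::Sleeping,
            Err(NOTIFIED) => {
                // Consume the early wakeup; if close() won the race, the state stays Closed.
                let _ = self.0.compare_exchange(NOTIFIED, IDLE, Ordering::AcqRel, Ordering::Acquire);
                SleepState::Notified
            }
            Err(_) => SleepState::Closed,
        }
    }

    /// Notified -> Idle; returns whether a notification was pending.
    fn consume_notification(&self) -> bool {
        self.0
            .compare_exchange(NOTIFIED, IDLE, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    fn close(&self) {
        self.0.store(CLOSED, Ordering::Release);
    }
}
```

The key property is that a wakeup arriving while the waiter is still Idle is parked in Notified, so the next prepare_sleep observes it instead of going to sleep.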
2.2 Wait Queue Structure
pub struct WaitQueue {
inner: SpinLock<InnerWaitQueue>,
num_waiters: AtomicU32, // fast-path check
}
struct InnerWaitQueue {
dead: bool, // queue-invalidated flag
waiters: VecDeque<Arc<Waker>>, // waiter queue (FIFO)
}
Design Highlights:
Fast path optimization: The num_waiters atomic counter allows lock-free checking of whether the queue is empty
FIFO order: Uses VecDeque to ensure fairness
Death marker: The dead flag supports cleanup when the owning resource is destroyed
Spinlock protection: Uses SpinLock instead of Mutex, avoiding recursive dependencies
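A minimal sketch of how the num_waiters counter can stay consistent with the deque: it is only modified inside the critical section, while is_empty reads it without taking the lock. std::sync::Mutex stands in for the kernel SpinLock, and the Waker here is a hypothetical placeholder carrying only an id.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the kernel Waker; only its identity matters here.
struct Waker {
    id: u32,
}

struct WaitQueue {
    inner: Mutex<VecDeque<Arc<Waker>>>, // std Mutex stands in for SpinLock<InnerWaitQueue>
    num_waiters: AtomicU32,             // mirrors the deque length for lock-free checks
}

impl WaitQueue {
    fn new() -> Self {
        WaitQueue { inner: Mutex::new(VecDeque::new()), num_waiters: AtomicU32::new(0) }
    }

    /// Append at the tail (FIFO); the counter changes inside the critical section.
    fn register_waker(&self, w: Arc<Waker>) {
        let mut q = self.inner.lock().unwrap();
        q.push_back(w);
        self.num_waiters.fetch_add(1, Ordering::Release);
    }

    /// Remove a specific waker (e.g. after the re-check succeeded).
    fn remove_waker(&self, w: &Arc<Waker>) {
        let mut q = self.inner.lock().unwrap();
        let before = q.len();
        q.retain(|x| !Arc::ptr_eq(x, w));
        self.num_waiters.fetch_sub((before - q.len()) as u32, Ordering::Release);
    }

    /// Fast path: a single atomic load, no lock.
    fn is_empty(&self) -> bool {
        self.num_waiters.load(Ordering::Acquire) == 0
    }

    /// The waker a FIFO wake_one would take next.
    fn front_id(&self) -> Option<u32> {
        self.inner.lock().unwrap().front().map(|w| w.id)
    }
}
```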
3. Core API: wait_until Family
3.1 wait_until: Core Wait Primitive
wait_until is the core of the wait queue mechanism, with all other wait methods built upon it:
pub fn wait_until<F, R>(&self, mut cond: F) -> R
where
F: FnMut() -> Option<R>,
{
// 1. Fast path: check the condition first
if let Some(res) = cond() {
return res;
}
// 2. Create a single Waiter/Waker pair for this wait
let (waiter, waker) = Waiter::new_pair();
loop {
// 3. Register the waker in the queue first (critical!)
self.register_waker(waker.clone());
// 4. Then check the condition (prevents wake loss)
if let Some(res) = cond() {
self.remove_waker(&waker);
return res;
}
// 5. Sleep until woken
waiter.wait();
// 6. After wakeup, loop again (the wakeup may be spurious, or the race may have been lost)
}
}
Key Design Concepts:
Enqueue before check: Ensures that any wakeup occurring between the condition check and sleeping is handled
Returns a resource rather than a boolean: The closure returns Option<R> rather than bool, so acquired resources (e.g., a lock Guard) can be returned directly
Loop retry: A waiter may lose the race after being woken, so it must re-enqueue and recheck
Single Waiter/Waker pair: Only one pair is created for the entire wait, simplifying lifecycle management
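The protocol above can be modelled in userspace with std threads, which helps when reasoning about it. In this sketch, thread::park/unpark plays the role of schedule/ProcessManager::wakeup and an AtomicBool plays the role of the Notified state; these substitutions are assumptions of the model, not how the kernel implements sleeping.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, Thread};

/// Userspace stand-in for Waker: a notification flag plus the thread to unpark.
struct Waker {
    notified: AtomicBool,
    thread: Thread,
}

impl Waker {
    fn wake(&self) {
        self.notified.store(true, Ordering::Release);
        self.thread.unpark();
    }
}

struct WaitQueue {
    waiters: Mutex<VecDeque<Arc<Waker>>>,
}

impl WaitQueue {
    fn new() -> Self {
        WaitQueue { waiters: Mutex::new(VecDeque::new()) }
    }

    fn register_waker(&self, w: &Arc<Waker>) {
        self.waiters.lock().unwrap().push_back(Arc::clone(w));
    }

    fn remove_waker(&self, w: &Arc<Waker>) {
        self.waiters.lock().unwrap().retain(|x| !Arc::ptr_eq(x, w));
    }

    fn wait_until<F, R>(&self, mut cond: F) -> R
    where
        F: FnMut() -> Option<R>,
    {
        if let Some(res) = cond() {
            return res; // fast path
        }
        // One Waker for the whole wait, reused across loop iterations
        let waker = Arc::new(Waker {
            notified: AtomicBool::new(false),
            thread: thread::current(),
        });
        loop {
            self.register_waker(&waker); // enqueue first ...
            if let Some(res) = cond() {
                // ... then check, so a wake in between is never lost
                self.remove_waker(&waker);
                return res;
            }
            // Sleep until notified; park() may return spuriously, so loop on the flag
            while !waker.notified.swap(false, Ordering::Acquire) {
                thread::park();
            }
        }
    }

    fn wake_one(&self) -> bool {
        // Pop under the lock (the temporary guard is dropped at the end of this statement) ...
        let next = self.waiters.lock().unwrap().pop_front();
        match next {
            Some(w) => {
                w.wake(); // ... and wake outside it
                true
            }
            None => false,
        }
    }
}
```

Looping on the notified flag is the userspace counterpart of consume_notification: a spurious return from park() never counts as a wakeup.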
3.2 wait_until Variants
// Uninterruptible wait
pub fn wait_until<F, R>(&self, cond: F) -> R
// Interruptible wait (can be interrupted by signals)
pub fn wait_until_interruptible<F, R>(&self, cond: F)
-> Result<R, SystemError>
// Interruptible wait with timeout
pub fn wait_until_timeout<F, R>(&self, cond: F, timeout: Duration)
-> Result<R, SystemError>
Return Value Semantics (Result-returning variants):
Ok(R): Condition met; returns the acquired resource
Err(ERESTARTSYS): Interrupted by a signal
Err(EAGAIN_OR_EWOULDBLOCK): Timed out
Cancellation Semantics:
Before returning due to a timeout or signal interruption, the wait closes its Waker and rechecks the condition one last time
This prevents a wakeup that races with cancellation from being lost
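A sketch of the cancellation step, assuming (as the text suggests but does not spell out) that a successful final re-check is reported as success rather than as the timeout or signal error. WaitError, Waker, and finish_cancelled are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-ins for ERESTARTSYS and EAGAIN_OR_EWOULDBLOCK.
#[derive(Debug, PartialEq)]
enum WaitError {
    Interrupted,
    TimedOut,
}

/// Minimal Waker: only the "closed" bit matters here.
struct Waker {
    closed: AtomicBool,
}

impl Waker {
    /// A closed waker refuses wakeups, so wake_one moves on to the next waiter.
    fn wake(&self) -> bool {
        !self.closed.load(Ordering::Acquire)
    }
}

/// Cancellation path: close first so no later wake is spent on this waiter,
/// then re-check once so a wake that raced with the timeout/signal is not lost.
fn finish_cancelled<R>(
    waker: &Waker,
    mut cond: impl FnMut() -> Option<R>,
    reason: WaitError,
) -> Result<R, WaitError> {
    waker.closed.store(true, Ordering::Release);
    match cond() {
        Some(res) => Ok(res), // the condition became true concurrently: report success
        None => Err(reason),
    }
}
```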
3.3 Why wait_until is Better than wait_event
Traditional wait_event series APIs only return a boolean:
// Traditional approach (racy)
wait_event(|| lock.is_available());
let guard = lock.acquire(); // ❌ may fail! another thread may acquire it first
While wait_until can atomically “wait and acquire”:
// wait_until approach (race-free)
let guard = wait_until(|| lock.try_acquire()); // ✅ acquired atomically
Key Advantages:
Eliminates the race window between “check” and “acquire”
More concise code with clearer semantics
Better performance: no separate acquire attempt is needed after the wait returns
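The difference lies in the closure's return type. The sketch below uses a hypothetical spin-and-yield wait_until with the same contract as the kernel API, together with std's Mutex::try_lock, to show a closure that hands back the guard itself.

```rust
use std::sync::{Mutex, MutexGuard, TryLockError};
use std::thread;

/// Hypothetical stand-in for WaitQueue::wait_until: same contract (the closure
/// returns Some(resource) once it has acquired it), but it yields instead of sleeping.
fn wait_until<R>(mut cond: impl FnMut() -> Option<R>) -> R {
    loop {
        if let Some(res) = cond() {
            return res;
        }
        thread::yield_now();
    }
}

/// try_acquire-style helper: returns the guard itself, so "check" and "acquire" are one step.
fn try_acquire<T>(m: &Mutex<T>) -> Option<MutexGuard<'_, T>> {
    match m.try_lock() {
        Ok(g) => Some(g),
        Err(TryLockError::WouldBlock) => None,
        Err(TryLockError::Poisoned(e)) => Some(e.into_inner()),
    }
}
```

Usage: let mut g = wait_until(|| try_acquire(&m)); the caller already holds the lock when wait_until returns, so no other thread can slip in between.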
4. Wait and Wake Flows
4.1 Detailed Wait Flow
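┌──────────────────────┐
│ call wait_until      │
└──────────┬───────────┘
           ↓
┌──────────────────────┐
│ Fast path:           │
│ check condition      │
└──────────┬───────────┘
           ↓
     condition met? ── yes ──→ return result
           │ no
           ↓
┌──────────────────────┐
│ create Waiter/Waker  │
└──────────┬───────────┘
           ↓ [loop start]
┌──────────────────────┐
│ 1. register waker    │ ← key: enqueue first
│    in the queue      │
└──────────┬───────────┘
           ↓
┌──────────────────────┐
│ 2. check condition   │ ← prevents wake loss
│    again             │
└──────────┬───────────┘
           ↓
     condition met? ── yes ──→ remove waker ──→ return result
           │ no
           ↓
┌──────────────────────┐
│ check for signals    │
│ (interruptible mode) │
└──────────┬───────────┘
           ↓
     signal pending? ── yes ──→ return ERESTARTSYS
           │ no
           ↓
┌──────────────────────┐
│ sleep                │
└──────────┬───────────┘
           ↓ [woken up]
┌──────────────────────┐
│ check for signals    │
│ (interruptible mode) │
└──────────┬───────────┘
           ↓ [loop continues]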
Key Points Explained:
Enqueue before check:
If the condition were checked before enqueueing, a wakeup could be lost: the condition might become true, and the wakeup fire, after the check returns false but before the waker is enqueued
Enqueueing first ensures that any subsequent wakeup can find our waker
Necessity of loop retry:
A wakeup does not guarantee that the condition is met (it could be spurious)
Even if the condition was met, it may become unmet again because of competition from other waiters
The waiter therefore has to re-enqueue and recheck
Advantages of the single-use Waker:
Avoids complex lifecycle management
Uses the state machine to consume notifications (Notified -> Idle), preventing duplicate delivery
The Waker is re-registered on each loop iteration, but it is always the same Arc<Waker>
4.2 Wake Flow
wake_one: Wake One Waiter
pub fn wake_one(&self) -> bool {
// Fast path: the queue is empty
if self.is_empty() {
return false;
}
loop {
// Pop one waker from the head of the queue
let next = {
let mut guard = self.inner.lock_irqsave();
let waker = guard.waiters.pop_front();
if waker.is_some() {
self.num_waiters.fetch_sub(1, Ordering::Release);
}
waker
};
let Some(waker) = next else { return false };
// Wake after releasing the lock (shorter lock hold time)
if waker.wake() {
return true; // woken successfully
}
// The waker is no longer valid (process exited); try the next one
}
}
Design Points:
FIFO order: Takes from the queue head to ensure fairness
Wake outside lock: Releases the lock before calling waker.wake(), reducing lock contention
Auto-skip invalid wakers: If the target process has exited, the next waker is tried automatically
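A userspace sketch of the skip-invalid-waker loop, assuming a waker is "invalid" exactly when its Weak<PCB> can no longer be upgraded. Task stands in for the PCB; both it and this Waker are hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};

// Hypothetical stand-in for the PCB: it only records whether it was woken.
struct Task {
    woken: AtomicBool,
}

struct Waker {
    target: Weak<Task>,
}

impl Waker {
    /// Returns false if the target task no longer exists (the Weak upgrade fails).
    fn wake(&self) -> bool {
        match self.target.upgrade() {
            Some(t) => {
                t.woken.store(true, Ordering::Release);
                true
            }
            None => false,
        }
    }
}

fn wake_one(queue: &Mutex<VecDeque<Arc<Waker>>>) -> bool {
    loop {
        // Pop under the lock; the temporary guard is dropped at the end of this statement.
        let next = queue.lock().unwrap().pop_front();
        let Some(w) = next else { return false };
        // Wake outside the lock; on a dead target, move on to the next waiter.
        if w.wake() {
            return true;
        }
    }
}
```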
wake_all: Wake All Waiters
pub fn wake_all(&self) -> usize {
// Fast path: the queue is empty
if self.is_empty() {
return 0;
}
// Take all wakers at once (shorter lock hold time)
let wakers = {
let mut guard = self.inner.lock_irqsave();
let mut drained = VecDeque::new();
mem::swap(&mut guard.waiters, &mut drained);
self.num_waiters.store(0, Ordering::Release);
drained
};
// Wake them one by one after releasing the lock
let mut woken = 0;
for waker in wakers {
if waker.wake() {
woken += 1;
}
}
woken
}
Design Points:
Batch take: Empties the queue in one step, minimizing lock hold time
Wake outside lock: Wakes each waiter after the lock is released, so woken waiters can compete for the resource immediately
Returns actual wake count: Distinguishes wake requests from wakeups that actually happened
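A compact sketch of the batch-take pattern. mem::take is used in place of the explicit mem::swap with a fresh VecDeque (the two are equivalent here), and the alive flag is a hypothetical stand-in for whether the target process still exists.

```rust
use std::collections::VecDeque;
use std::mem;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Mutex;

// Hypothetical waker: alive == false models a process that has already exited.
struct Waker {
    alive: bool,
}

impl Waker {
    fn wake(&self) -> bool {
        self.alive
    }
}

struct WaitQueue {
    waiters: Mutex<VecDeque<Waker>>,
    num_waiters: AtomicU32,
}

impl WaitQueue {
    fn wake_all(&self) -> usize {
        if self.num_waiters.load(Ordering::Acquire) == 0 {
            return 0; // lock-free fast path
        }
        // Take everything in one short critical section.
        let drained = {
            let mut g = self.waiters.lock().unwrap();
            self.num_waiters.store(0, Ordering::Release);
            mem::take(&mut *g)
        };
        // Wake outside the lock, counting only real wakeups.
        drained.iter().filter(|w| w.wake()).count()
    }
}
```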
4.3 Waker Wake Mechanism
impl Waker {
pub fn wake(&self) -> bool {
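// Atomic state machine: a real wakeup when Sleeping; only records the notification when Idle
// Sleeping -> Notified: triggers ProcessManager::wakeup
// Idle -> Notified: records an early wakeup
...
}
pub fn close(&self) {
// Close the waker to prevent further wakeups
self.state.store(STATE_CLOSED, Ordering::Release);
}
fn consume_notification(&self) -> bool {
// Consume a pending notification (used by block_current): Notified -> Idle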
...
}
}
Key Features:
Atomic state machine: state tracks Idle/Sleeping/Notified/Closed
Weak reference to the target process: Uses Weak<PCB> to avoid reference cycles; the reference is cleaned up automatically when the process exits
Memory ordering guarantees: Release/Acquire semantics ensure that modifications made before the wake are visible to the woken process
4.4 Blocking Current Process
fn block_current(waiter: &Waiter, interruptible: bool) -> Result<(), SystemError> {
loop {
// Fast path: already woken
if waiter.waker.consume_notification() {
return Ok(());
}
// Disable interrupts and enter the critical section
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
// Sleep handshake (handles the "wake before sleep" race)
match waiter.waker.prepare_sleep() {
WakerSleepState::Notified | WakerSleepState::Closed => {
drop(irq_guard);
return Ok(());
}
WakerSleepState::Sleeping => {}
}
// Mark the process as sleeping
ProcessManager::mark_sleep(interruptible)?;
// Consume the notification again to close the race between prepare_sleep and mark_sleep
if waiter.waker.consume_notification() {
drop(irq_guard);
return Ok(());
}
drop(irq_guard); // restore interrupts
// Switch to another process
schedule(SchedMode::SM_NONE);
// After wakeup, check for signals (interruptible mode)
if interruptible && Signal::signal_pending_state(...) {
return Err(SystemError::ERESTARTSYS);
}
}
}
Key Design:
Handshake mechanism: prepare_sleep() first transitions the state to Sleeping; the notification is then consumed again after mark_sleep
Interrupt protection: mark_sleep must be executed with interrupts disabled
Signal check: After being scheduled back in, the process checks for pending signals
5. Memory Ordering and Correctness
5.1 Memory Ordering Guarantees
The wait queue uses the following memory orderings to ensure correctness:
| Operation | Memory Ordering | Purpose |
|---|---|---|
| wake() | Release | Ensures modifications made before the wake are visible to the woken process |
| wait() | Acquire | Observes all modifications made before the wake |
| Enqueue (waker registration) | Release | Ensures enqueue operations are visible |
| num_waiters counter | Acquire/Release | Synchronizes counter modifications |
5.2 Happens-Before Relationships
Thread A (waker side)             Thread B (waiter side)
     │                                  │
     │ modify shared data               │
     │                                  │
     ↓                                  ↓
wake() (Release)                 register_waker()
     │                                  │
     │        happens-before            │
     │ ───────────────────────────────→ │
     │                                  ↓
     │                           wait() (Acquire)
     │                                  │
     │                                  ↓
     │                   observes the shared-data modification
Guarantees:
All modifications the waker side makes before calling wake() are visible to the waiter
This forms the basis for correct synchronization
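The Release/Acquire pairing in the diagram is the standard message-passing pattern. Below is a self-contained std illustration in which an AtomicBool stands in for the Waker's state word (an assumption of this sketch).

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

struct Shared {
    data: AtomicU64,      // the "shared data" modified before wake()
    notified: AtomicBool, // stand-in for the Waker state word
}

/// Waker side: write the data, then publish it with a Release store (the wake()).
fn waker_side(s: &Shared, value: u64) {
    s.data.store(value, Ordering::Relaxed);
    s.notified.store(true, Ordering::Release);
}

/// Waiter side: observe the flag with an Acquire load (the wait()), then read the data.
fn waiter_side(s: &Shared) -> u64 {
    while !s.notified.load(Ordering::Acquire) {
        std::hint::spin_loop();
    }
    // The Acquire load synchronized with the Release store, so this sees `value`.
    s.data.load(Ordering::Relaxed)
}
```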
5.3 Race-Free Proof
Case 1: Enqueue before wake (normal flow)
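Timeline:
T1: Waiter calls register_waker()
T2: Waiter checks the condition, which returns false
T3: Waiter prepares to sleep
T4: Waker side calls wake_one() ← the waker is in the queue, so it is woken normally
T5: Waiter is woken
Case 2: Wake before enqueue (the race to handle)
Timeline:
T1: Waiter checks the condition, which returns false
T2: Waker side sets the condition to true
T3: Waker side calls wake_one() ← the waker is not yet queued, so this wakeup reaches no one
T4: Waiter calls register_waker()
T5: Waiter checks the condition again ← sees true, so it does not sleep!
Key Point: Because the condition is checked after enqueueing, even if the wakeup occurs before the enqueue, the second check detects that the condition is already met.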
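Both orderings can be replayed deterministically against the Case 2 interleaving to see why the order matters. This is a hypothetical single-threaded model: World, waker_runs, and the two functions are illustrative names, and each statement is annotated with the timeline step it corresponds to.

```rust
/// Result of replaying Case 2 against a given ordering.
#[derive(Debug, PartialEq)]
enum Outcome {
    Proceeds,      // the waiter returns without needing another wakeup
    SleepsForever, // the waiter sleeps and no further wakeup will come
}

/// Replay state: the condition, whether the waker is queued, and whether it was notified.
struct World {
    cond: bool,
    queued: bool,
    notified: bool,
}

impl World {
    fn new() -> Self {
        World { cond: false, queued: false, notified: false }
    }

    /// T2-T3: the waker side sets the condition, then calls wake_one().
    fn waker_runs(&mut self) {
        self.cond = true;
        if self.queued {
            self.queued = false;
            self.notified = true;
        } // otherwise the queue is empty and this wakeup reaches no one
    }
}

/// Old order (check, enqueue, sleep) under the Case 2 interleaving.
fn check_then_enqueue() -> Outcome {
    let mut w = World::new();
    let met = w.cond; // T1: check returns false
    w.waker_runs(); // T2-T3: wakeup while nothing is queued
    if met {
        return Outcome::Proceeds;
    }
    w.queued = true; // T4: enqueue, too late; the waiter now sleeps without re-checking
    if w.notified { Outcome::Proceeds } else { Outcome::SleepsForever }
}

/// Current order (enqueue, re-check, sleep) under the same interleaving.
fn enqueue_then_check() -> Outcome {
    let mut w = World::new();
    let _fast_path = w.cond; // T1: fast-path check returns false
    w.waker_runs(); // T2-T3: wakeup while nothing is queued
    w.queued = true; // T4: register_waker()
    // T5: the re-check after enqueueing sees the condition
    if w.cond || w.notified { Outcome::Proceeds } else { Outcome::SleepsForever }
}
```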
6. Compatible API: wait_event Family
For backward compatibility, the wait queue also provides the wait_event family of APIs, implemented on top of wait_until:
// Interruptible wait; returns Result<(), SystemError>
pub fn wait_event_interruptible<F, B>(
&self,
mut cond: F,
before_sleep: Option<B>,
) -> Result<(), SystemError>
where
F: FnMut() -> bool,
B: FnMut(),
{
self.wait_until_impl(
|| if cond() { Some(()) } else { None },
true,
None,
before_sleep,
)
}
// Uninterruptible wait
pub fn wait_event_uninterruptible<F, B>(
&self,
mut cond: F,
before_sleep: Option<B>,
) -> Result<(), SystemError>
where
F: FnMut() -> bool,
B: FnMut(),
{
self.wait_until_impl(
|| if cond() { Some(()) } else { None },
false,
None,
before_sleep,
)
}
// Interruptible wait with timeout
pub fn wait_event_interruptible_timeout<F>(
&self,
mut cond: F,
timeout: Option<Duration>,
) -> Result<(), SystemError>
where
F: FnMut() -> bool,
{
self.wait_until_impl(
|| if cond() { Some(()) } else { None },
true,
timeout,
None::<fn()>,
)
}
before_sleep Hook:
Executed after enqueue and before sleep
Typical use: release locks to avoid sleeping while holding them
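Example (the guard is wrapped in an Option so that the FnMut hook can release it with take()):
let mut guard = Some(guard);
wait_event_interruptible(|| cond(), Some(|| drop(guard.take())))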
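Because before_sleep is an FnMut, it cannot move the guard out of its environment directly; wrapping the guard in an Option and calling take() works around that, and calling the hook again is harmless. The driver function below is a hypothetical stand-in for the wait_event path: it calls the hook before each "sleep" and gives up after max_sleeps so that the demo terminates.

```rust
/// Hypothetical stand-in for the wait_event path: re-checks `cond`, runs
/// `before_sleep` where the real code would sleep, and stops after `max_sleeps`.
fn wait_event_with_hook<F, B>(mut cond: F, mut before_sleep: Option<B>, max_sleeps: usize) -> bool
where
    F: FnMut() -> bool,
    B: FnMut(),
{
    for _ in 0..max_sleeps {
        // (the waker would be enqueued here)
        if cond() {
            return true;
        }
        if let Some(hook) = before_sleep.as_mut() {
            hook(); // runs after enqueue, before sleep
        }
        // (the task would sleep here)
    }
    cond()
}
```

Usage: let mut guard = Some(m.lock().unwrap()); then pass Some(|| drop(guard.take())) as the hook; the lock is released before the first sleep and the later calls see None.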
7. Convenience Methods
7.1 Sleep and Release Lock
// Release the SpinLock and sleep (interruptible)
pub fn sleep_unlock_spinlock<T>(&self, to_unlock: SpinLockGuard<T>)
-> Result<(), SystemError>
// Release the Mutex and sleep (interruptible)
pub fn sleep_unlock_mutex<T>(&self, to_unlock: MutexGuard<T>)
-> Result<(), SystemError>
// Release the SpinLock and sleep (uninterruptible)
pub fn sleep_uninterruptible_unlock_spinlock<T>(&self, to_unlock: SpinLockGuard<T>)
// Release the Mutex and sleep (uninterruptible)
pub fn sleep_uninterruptible_unlock_mutex<T>(&self, to_unlock: MutexGuard<T>)
Usage Example:
let guard = lock.lock();
// Check the condition
if !condition_met() {
// Need to wait: release the lock and sleep
wait_queue.sleep_unlock_spinlock(guard)?;
// After wakeup, the lock must be reacquired
}
7.2 Queue Lifecycle Management
// Mark the queue dead, then wake and remove all waiters
pub fn mark_dead(&self) {
let mut drained = VecDeque::new();
{
let mut guard = self.inner.lock_irqsave();
guard.dead = true;
mem::swap(&mut guard.waiters, &mut drained);
self.num_waiters.store(0, Ordering::Release);
}
for w in drained {
w.wake();
w.close();
}
}
// Check whether the queue is empty
pub fn is_empty(&self) -> bool {
self.num_waiters.load(Ordering::Acquire) == 0
}
// Get the number of waiters
pub fn len(&self) -> usize {
self.num_waiters.load(Ordering::Acquire) as usize
}
8. Event Wait Queue
Besides the regular wait queue, DragonOS also provides an event-mask-based wait queue:
pub struct EventWaitQueue {
wait_list: SpinLock<Vec<(u64, Arc<Waker>)>>,
}
impl EventWaitQueue {
// Wait for specific events
pub fn sleep(&self, events: u64)
// Wait for specific events, releasing the lock
pub fn sleep_unlock_spinlock<T>(&self, events: u64, to_unlock: SpinLockGuard<T>)
// Wake threads waiting on any matching event (bitmask AND)
pub fn wakeup_any(&self, events: u64) -> usize
// Wake threads whose event mask matches exactly (equality comparison)
pub fn wakeup(&self, events: u64) -> usize
// Wake all waiters
pub fn wakeup_all(&self)
}
Use Cases:
Waiting for multiple event types (e.g., READABLE | WRITABLE)
Waking specific waiters by event type
Example: socket poll/select implementation
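The two matching rules can be written as plain predicates. The event bit values and wake_matching are hypothetical, and removing matched entries from the list is an assumption about how the count returned by wakeup_any/wakeup is produced.

```rust
// Hypothetical event bits for illustration.
const READABLE: u64 = 1 << 0;
const WRITABLE: u64 = 1 << 1;
const HANGUP: u64 = 1 << 2;

/// wakeup_any rule: the waiter's mask shares at least one bit with the events.
fn matches_any(waiter_mask: u64, events: u64) -> bool {
    (waiter_mask & events) != 0
}

/// wakeup rule: the waiter's mask equals the events exactly.
fn matches_exact(waiter_mask: u64, events: u64) -> bool {
    waiter_mask == events
}

/// Remove and count the waiters selected by `pred`, leaving the rest queued.
fn wake_matching(list: &mut Vec<u64>, events: u64, pred: fn(u64, u64) -> bool) -> usize {
    let before = list.len();
    list.retain(|&mask| !pred(mask, events));
    before - list.len()
}
```

For example, a waiter registered for READABLE | WRITABLE is selected by wakeup_any(WRITABLE) but not by wakeup(READABLE).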
9. Timeout Support
9.1 Timeout Mechanism
pub fn wait_until_timeout<F, R>(&self, cond: F, timeout: Duration)
-> Result<R, SystemError>
where
F: FnMut() -> Option<R>,
{
self.wait_until_impl(cond, true, Some(timeout), None::<fn()>)
}
Implementation Principle:
Calculate the deadline: deadline = now + timeout
Create a timer that wakes the waiter when it expires
After wakeup, check whether the timeout occurred:
If the timer fired, return EAGAIN_OR_EWOULDBLOCK
If the condition is met, cancel the timer and return the result
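A userspace analogue of the deadline logic. Instead of a kernel timer driving Waker::wake(), this sketch uses thread::park_timeout bounded by the time remaining until the deadline; that substitution, WaitError, and the function name are assumptions.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for EAGAIN_OR_EWOULDBLOCK.
#[derive(Debug, PartialEq)]
enum WaitError {
    TimedOut,
}

/// Compute the deadline once, sleep at most until it, and re-check after every wakeup.
fn wait_until_timeout<R>(
    mut cond: impl FnMut() -> Option<R>,
    timeout: Duration,
) -> Result<R, WaitError> {
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(res) = cond() {
            return Ok(res);
        }
        let now = Instant::now();
        if now >= deadline {
            // One last check so a wakeup racing with the deadline is not lost.
            return cond().ok_or(WaitError::TimedOut);
        }
        // Woken by unpark() (the normal wake path) or by the remaining time running out.
        thread::park_timeout(deadline - now);
    }
}
```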
9.2 TimeoutWaker
pub struct TimeoutWaker {
waker: Arc<Waker>,
}
impl TimerFunction for TimeoutWaker {
fn run(&mut self) -> Result<(), SystemError> {
// Timer expired: wake the waiter
self.waker.wake();
Ok(())
}
}
Key Design:
The timer wakes the waiter through Waker::wake() rather than waking the PCB directly
This allows Waiter::wait() to observe the wake state correctly
It uses the same mechanism as a normal wakeup, for consistency
10. Usage Examples
10.1 Semaphore Implementation
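use core::sync::atomic::{AtomicU32, Ordering::{Acquire, Relaxed, Release}};
struct Semaphore {
counter: AtomicU32,
wait_queue: WaitQueue,
}
impl Semaphore {
fn down(&self) -> Result<(), SystemError> {
// Use wait_until to acquire the semaphore directly
self.wait_queue.wait_until(|| {
let mut old = self.counter.load(Acquire);
while old > 0 {
// Try to decrement atomically
match self.counter.compare_exchange(old, old - 1, Acquire, Relaxed) {
Ok(_) => return Some(()), // acquired
// Lost a race: retry with the fresh value instead of sleeping
// while permits may still be available
Err(cur) => old = cur,
}
}
None // no permits left: keep waiting
});
Ok(())
}
fn up(&self) {
self.counter.fetch_add(1, Release);
self.wait_queue.wake_one();
}
}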
Advantages:
Uses wait_until to make “wait and acquire” atomic
Avoids the race window between “check” and “acquire”
Clean and concise code
10.2 Condition Variable Implementation
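use core::sync::atomic::{AtomicU64, Ordering::{Acquire, Release}};
struct CondVar {
wait_queue: WaitQueue,
// Generation counter bumped by every notify, so that wait() has a condition to check
seq: AtomicU64,
}
impl CondVar {
fn wait<T>(&self, guard: MutexGuard<T>) -> Result<MutexGuard<T>, SystemError> {
let mutex = guard.mutex();
// Snapshot the generation while the lock is still held
let start = self.seq.load(Acquire);
// Release the lock through the before_sleep hook (runs after enqueue)
let mut guard = Some(guard);
self.wait_queue.wait_event_interruptible(
|| self.seq.load(Acquire) != start, // a notify has happened
Some(|| drop(guard.take())),
)?;
// Make sure the lock is released even if the hook never ran
drop(guard);
// Reacquire the lock
mutex.lock_interruptible()
}
fn notify_one(&self) {
self.seq.fetch_add(1, Release);
self.wait_queue.wake_one();
}
fn notify_all(&self) {
self.seq.fetch_add(1, Release);
self.wait_queue.wake_all();
}
}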
10.3 RwSem Integration
impl<T: ?Sized> RwSem<T> {
pub fn read(&self) -> RwSemReadGuard<'_, T> {
// Return the Guard directly, with no race
self.queue.wait_until(|| self.try_read())
}
pub fn write(&self) -> RwSemWriteGuard<'_, T> {
self.queue.wait_until(|| self.try_write())
}
pub fn read_interruptible(&self) -> Result<RwSemReadGuard<'_, T>, SystemError> {
self.queue.wait_until_interruptible(|| self.try_read())
}
}
Why this design:
try_read() returns Option<Guard>, which is exactly what wait_until expects
A single line of code implements the complete “wait and acquire” logic
The compiler enforces type safety, so a forgotten check cannot slip through
10.4 Timed Wait
fn wait_with_timeout(queue: &WaitQueue, timeout_ms: u64) -> Result<(), SystemError> {
let timeout = Duration::from_millis(timeout_ms);
queue.wait_event_interruptible_timeout(
|| condition_met(),
Some(timeout),
)
}
11. Performance Characteristics
11.1 Fast Path Optimization
No waiters: is_empty() requires only one atomic read, with no lock contention
Quick check: wait_until checks the condition first to avoid unnecessary enqueueing
Wake outside lock: Wake operations happen after the queue lock is released, reducing lock hold time
11.2 Scalability
FIFO queue: Ensures fairness and prevents starvation
Batch wake: wake_all() takes all wakers at once, minimizing lock contention
Cross-CPU wake: Wakers can wake processes on other CPUs without lock synchronization
11.3 Memory Overhead
Waiter: Stack allocated, no heap involvement
Waker: Shared through Arc; one per waiter
Queue: Occupies heap space only when waiters exist, so an empty queue has minimal overhead
12. Comparison and Evolution
12.1 Comparison with Traditional Implementations
| Feature | Traditional wait_event | DragonOS wait_until |
|---|---|---|
| API Return | bool | Option<R> |
| Atomic Acquire | Not supported (manual) | Native support |
| Wake Loss | Requires careful handling | Prevented by design |
| Race Window | Window between check and acquire | No window |
| Code Complexity | High (manual state management) | Low (compiler-checked) |
| Performance | May require multiple atomic operations | Minimizes atomic operations |
12.2 Design Evolution
Old Design (Problematic):
// ❌ Wakeups may be lost
loop {
if condition() {
return;
}
register_waker();
sleep();
}
Current Design (Correct):
// ✅ No lost wakeups
loop {
register_waker(); // enqueue first
if condition() { // then check
remove_waker();
return;
}
sleep();
}
13. Best Practices
13.1 Usage Recommendations
Prefer wait_until: Compared to wait_event, it provides stronger guarantees and cleaner code
Leverage the Option return value: Return acquired resources directly to avoid a second acquisition
Use interruptible variants where appropriate: User-space processes should use *_interruptible so that they can still be terminated
Handle timeouts properly: Distinguish between ERESTARTSYS (signal) and EAGAIN_OR_EWOULDBLOCK (timeout)
13.2 Pitfalls to Avoid
// ❌ Wrong: check and acquire are separate
if condition() {
let guard = acquire(); // may fail!
}
// ✅ Correct: wait and acquire atomically
let guard = wait_until(|| try_acquire());
// ❌ Wrong: the signal is not handled
let result = wait_queue.wait_event_interruptible(|| cond(), None::<fn()>);
// result is never checked
// ✅ Correct: propagate the error
let result = wait_queue.wait_event_interruptible(|| cond(), None::<fn()>)?;
13.3 Debugging Suggestions
Use wait_queue.len() to check the waiter count
Monitor transitions of the state state machine (via logs)
Check for processes blocked for extended periods (a possible sign of wake-logic errors)
14. Implementation Principle Summary
┌─────────────────────────────────────┐
│ WaitQueue │
│ ┌───────────────────────────────┐ │
│ │ inner: SpinLock<InnerQueue> │ │
│ │ ├─ dead: bool │ │
│ │ └─ waiters: VecDeque │ │
│ └───────────────────────────────┘ │
│ ┌───────────────────────────────┐ │
│ │ num_waiters: AtomicU32 │ │
│ └───────────────────────────────┘ │
└─────────────────────────────────────┘
│
┌───────────┴───────────┐
↓ ↓
┌────────────────────┐ ┌────────────────────┐
│ Waiter │ │ Waker │
│ ┌──────────────┐ │ │ ┌──────────────┐ │
│ │ waker: Arc │──┼──┼─→│ state: │ │
│ │ │ │ │ │ AtomicU8 │ │
│ └──────────────┘ │ │ └──────────────┘ │
│ ┌──────────────┐ │ │ ┌──────────────┐ │
│ │ _nosend │ │ │ │ target: │ │
│ │ PhantomData │ │ │ │ Weak<PCB> │ │
│ └──────────────┘ │ │ └──────────────┘ │
└────────────────────┘ └────────────────────┘
(!Send) (Send+Sync)
Core flow:
┌──────────────────────────────────────────────────────────┐
│  wait_until(cond)                                        │
│                                                          │
│  1. Fast path: if cond() → return                        │
│                                                          │
│  2. Create a (waiter, waker) pair                        │
│                                                          │
│  3. loop {                                               │
│     ├─ register_waker(waker)  ← enqueue first            │
│     ├─ if cond() → return     ← then check               │
│     ├─ waiter.wait()          ← sleep                    │
│     └─ [woken up; loop continues]                        │
│   }                                                      │
└──────────────────────────────────────────────────────────┘
Wake strategies:
┌────────────────────┐
│ wake_one()         │ → pop the head waker → wake one process
└────────────────────┘
┌────────────────────┐
│ wake_all()         │ → take all wakers → wake all processes
└────────────────────┘
Memory ordering guarantee:
Waker side:  wake() (Release)
        │
        │ happens-before
        ↓
Waiter side: wait() (Acquire)
This design combines three core mechanisms: “enqueue before check”, the single-use Waiter/Waker pattern, and atomic memory ordering guarantees. Together they yield a wait queue with zero wake loss, high performance, and ease of use, providing a solid foundation for the various synchronization primitives in DragonOS.