RwSem 读写信号量
1. 简介
RwSem (Read-Write Semaphore) 是一种可睡眠的读写锁,用于保护进程上下文中的共享数据。与自旋锁实现的 RwLock 不同,RwSem 在无法获取锁时会让出 CPU 并进入睡眠状态,而不是忙等待。
1.1 适用场景
特性 |
RwLock (spinlock) |
RwSem (semaphore) |
|---|---|---|
上下文 |
进程 / 中断 |
仅进程上下文 |
等待方式 |
忙等待 (自旋) |
睡眠 (调度) |
适用场景 |
短时间临界区 |
长时间临界区 |
中断上下文 |
可用 |
不可用 |
重要: RwSem 不能在中断上下文中使用,因为它可能会睡眠。
2. 核心设计
2.1 锁状态表示
RwSem 使用一个原子整数 (AtomicUsize) 维护锁状态,通过位域编码实现高效的状态管理:
64位系统状态位布局:
+--------+--------------+------------+----------+------------------+
| Bit 63 | Bit 62 | Bit 61 | Bit 60 | Bits 59..0 |
+--------+--------------+------------+----------+------------------+
| WRITER | UPGRADEABLE | BEING | OVERFLOW | READER COUNT |
| | READER | UPGRADED | DETECT | (读者计数) |
+--------+--------------+------------+----------+------------------+
字段 |
位置 |
说明 |
|---|---|---|
|
Bit 63 |
写者锁,1 表示有写者持有锁 |
|
Bit 62 |
可升级读者锁,1 表示有可升级读者持有锁 |
|
Bit 61 |
升级进行中标志,1 表示正在从可升级读者升级到写者 |
|
Bit 60 |
读者溢出检测位,当读者数量达到 2^60 时置位 |
|
Bits 59…0 |
当前激活的读者数量 |
const READER: usize = 1;
const WRITER: usize = 1 << (usize::BITS - 1); // Bit 63
const UPGRADEABLE_READER: usize = 1 << (usize::BITS - 2); // Bit 62
const BEING_UPGRADED: usize = 1 << (usize::BITS - 3); // Bit 61
const MAX_READER: usize = 1 << (usize::BITS - 4); // Bit 60
2.2 三种锁模式
RwSem 支持三种锁模式,提供不同级别的访问权限:
读锁(Read Lock)
多个读者可以并发持有读锁
提供对数据的只读访问(
&T)与写者和正在升级的可升级读者互斥
写锁(Write Lock)
只有一个写者可以持有写锁
提供对数据的可变访问(
&mut T)与所有其他锁模式互斥
可升级读锁(Upgradeable Read Lock)
只有一个可升级读者可以持有可升级读锁
初始提供只读访问(
&T)可以原子地升级到写锁
与写者和其他可升级读者互斥,但可与普通读者共存
2.3 等待队列设计
pub struct RwSem<T: ?Sized> {
lock: AtomicUsize, // 锁状态
queue: WaitQueue, // 单一等待队列
val: UnsafeCell<T>, // 被保护的数据
}
设计特点:
使用单一
WaitQueue管理所有等待者(读者、写者、可升级读者)利用
WaitQueue.wait_until()的原子语义确保正确性通过不同的唤醒策略实现公平性和性能平衡
3. 锁获取机制
3.1 读锁获取
pub fn read(&self) -> RwSemReadGuard<'_, T> {
self.queue.wait_until(|| self.try_read())
}
pub fn try_read(&self) -> Option<RwSemReadGuard<'_, T>> {
let lock = self.lock.fetch_add(READER, Acquire);
if lock & (WRITER | BEING_UPGRADED | MAX_READER) == 0 {
// 无写者、无升级中的可升级读者、未溢出 → 成功
Some(RwSemReadGuard {
inner: self,
_nosend: PhantomData,
})
} else {
// 有阻塞因素,回滚计数
self.lock.fetch_sub(READER, Release);
None
}
}
获取条件:
无写者(
WRITER位为 0)无正在升级的可升级读者(
BEING_UPGRADED位为 0)读者数量未溢出(
MAX_READER位为 0)
关键设计:
先乐观地递增读者计数(
fetch_add)再检查阻塞条件
失败则回滚计数(
fetch_sub)这种”先递增后检查”的方式在无竞争场景下性能更优
3.2 写锁获取
pub fn write(&self) -> RwSemWriteGuard<'_, T> {
self.queue.wait_until(|| self.try_write())
}
pub fn try_write(&self) -> Option<RwSemWriteGuard<'_, T>> {
if self.lock.compare_exchange(0, WRITER, Acquire, Relaxed).is_ok() {
Some(RwSemWriteGuard {
inner: self,
_nosend: PhantomData,
})
} else {
None
}
}
获取条件:
锁状态必须为 0(无读者、无写者、无可升级读者)
使用 CAS 操作保证原子性
3.3 可升级读锁获取
pub fn upread(&self) -> RwSemUpgradeableGuard<'_, T> {
self.queue.wait_until(|| self.try_upread())
}
pub fn try_upread(&self) -> Option<RwSemUpgradeableGuard<'_, T>> {
let lock = self.lock.fetch_or(UPGRADEABLE_READER, Acquire)
& (WRITER | UPGRADEABLE_READER);
if lock == 0 {
// 无写者且无其他可升级读者 → 成功
return Some(RwSemUpgradeableGuard {
inner: self,
_nosend: PhantomData,
});
} else if lock == WRITER {
// 有写者,需要回滚
self.lock.fetch_sub(UPGRADEABLE_READER, Release);
}
// lock == UPGRADEABLE_READER 表示已有其他可升级读者,
// fetch_or 没有改变状态,无需回滚
None
}
获取条件:
无写者(
WRITER位为 0)无其他可升级读者(
UPGRADEABLE_READER位为 0)可与普通读者共存
关键设计:
使用
fetch_or原子设置可升级读者位通过检查返回的旧值判断是否成功
只在设置了
WRITER位但失败时才需要回滚
3.4 wait_until 核心机制
所有阻塞式的锁获取都依赖 WaitQueue.wait_until() 方法:
pub fn wait_until<F, R>(&self, cond: F) -> R
where
F: FnMut() -> Option<R>,
{
// 1. 快速路径:先检查条件
if let Some(res) = cond() {
return res;
}
// 2. 创建一对 Waiter/Waker
let (waiter, waker) = Waiter::new_pair();
loop {
// 3. 先注册 waker 到等待队列
self.register_waker(waker.clone());
// 4. 再检查条件(防止唤醒丢失)
if let Some(res) = cond() {
self.remove_waker(&waker);
return res;
}
// 5. 睡眠等待被唤醒
waiter.wait();
// 6. 被唤醒后循环继续(可能是伪唤醒)
}
}
关键点:
先注册 waker,再检查条件,确保不会错过任何唤醒信号
即使被唤醒,也可能获取锁失败(公平竞争),需要继续循环
这种设计避免了复杂的唤醒丢失问题
4. 锁释放与唤醒策略
4.1 读锁释放
impl<T: ?Sized> Drop for RwSemReadGuard<'_, T> {
fn drop(&mut self) {
// 原子递减读者计数
if self.inner.lock.fetch_sub(READER, Release) == READER {
// 这是最后一个读者,唤醒一个等待者
self.inner.queue.wake_one();
}
}
}
唤醒策略:
只有最后一个读者释放时才唤醒
唤醒一个等待者(可能是写者或可升级读者)
避免不必要的唤醒开销
4.2 写锁释放
impl<T: ?Sized> Drop for RwSemWriteGuard<'_, T> {
fn drop(&mut self) {
// 清除写者位
self.inner.lock.fetch_and(!WRITER, Release);
// 唤醒所有等待者
self.inner.queue.wake_all();
}
}
唤醒策略:
唤醒所有等待者
允许多个读者并发获取锁
只有一个写者能成功(通过 CAS 竞争)
4.3 可升级读锁释放
impl<T: ?Sized> Drop for RwSemUpgradeableGuard<'_, T> {
fn drop(&mut self) {
let res = self.inner.lock.fetch_sub(UPGRADEABLE_READER, Release);
if res == UPGRADEABLE_READER {
// 没有其他读者,唤醒所有等待者
self.inner.queue.wake_all();
}
}
}
唤醒策略:
如果没有其他读者(
res == UPGRADEABLE_READER),唤醒所有等待者如果还有其他读者,不唤醒(等待最后一个读者唤醒)
5. 高级特性
5.1 写锁降级
将写锁原子地降级为可升级读锁,允许其他读者并发访问:
impl<'a, T> RwSemWriteGuard<'a, T> {
pub fn downgrade(mut self) -> RwSemUpgradeableGuard<'a, T> {
loop {
self = match self.try_downgrade() {
Ok(guard) => return guard,
Err(e) => e,
};
}
}
fn try_downgrade(self) -> Result<RwSemUpgradeableGuard<'a, T>, Self> {
let inner = self.inner;
let res = self.inner.lock.compare_exchange(
WRITER,
UPGRADEABLE_READER,
AcqRel,
Relaxed,
);
if res.is_ok() {
core::mem::forget(self);
Ok(RwSemUpgradeableGuard {
inner,
_nosend: PhantomData,
})
} else {
Err(self)
}
}
}
使用场景:
写入数据后,需要长时间持有锁进行只读操作
通过降级允许其他读者并发访问,提高并发性
注意事项:
降级过程中使用 CAS 操作保证原子性
使用循环重试处理 CAS 失败(通常是由于 ABA 问题)
降级后不会唤醒等待者(由可升级读锁释放时处理)
5.2 可升级读锁升级
将可升级读锁原子地升级为写锁:
impl<'a, T> RwSemUpgradeableGuard<'a, T> {
pub fn upgrade(mut self) -> RwSemWriteGuard<'a, T> {
// 先设置升级标志,阻塞新的读者
self.inner.lock.fetch_or(BEING_UPGRADED, Acquire);
loop {
self = match self.try_upgrade() {
Ok(guard) => return guard,
Err(e) => e,
};
}
}
pub fn try_upgrade(self) -> Result<RwSemWriteGuard<'a, T>, Self> {
let res = self.inner.lock.compare_exchange(
UPGRADEABLE_READER | BEING_UPGRADED,
WRITER | UPGRADEABLE_READER,
AcqRel,
Relaxed,
);
if res.is_ok() {
let inner = self.inner;
core::mem::forget(self);
Ok(RwSemWriteGuard {
inner,
_nosend: PhantomData,
})
} else {
Err(self)
}
}
}
升级机制:
先设置
BEING_UPGRADED标志,阻塞新的读者自旋等待现有读者全部释放(读者计数降为 0)
使用 CAS 将状态从”可升级读者+升级中”转换为”写者”
关键设计:
升级过程中不会睡眠,而是自旋等待
BEING_UPGRADED标志确保不会有新的读者进入保持
UPGRADEABLE_READER位直到升级完成,防止其他线程获取可升级读锁
5.3 可中断的锁获取
支持被信号中断的锁获取操作:
// 可中断的读锁获取
pub fn read_interruptible(&self) -> Result<RwSemReadGuard<'_, T>, SystemError> {
self.queue.wait_until_interruptible(|| self.try_read())
}
// 可中断的写锁获取
pub fn write_interruptible(&self) -> Result<RwSemWriteGuard<'_, T>, SystemError> {
self.queue.wait_until_interruptible(|| self.try_write())
}
使用场景:
用户态进程获取锁时,需要响应信号(如 Ctrl+C)
避免进程无限期阻塞
错误处理:
返回
Err(SystemError::ERESTARTSYS)表示被信号中断调用者需要适当处理错误(通常是返回到用户态)
6. API 参考
6.1 创建
// 编译期常量初始化
pub const fn new(value: T) -> Self
// 运行时初始化
let rwsem = RwSem::new(data);
6.2 读锁操作
// 阻塞获取(不可中断)
pub fn read(&self) -> RwSemReadGuard<'_, T>
// 阻塞获取(可被信号中断)
pub fn read_interruptible(&self) -> Result<RwSemReadGuard<'_, T>, SystemError>
// 非阻塞尝试获取
pub fn try_read(&self) -> Option<RwSemReadGuard<'_, T>>
6.3 写锁操作
// 阻塞获取(不可中断)
pub fn write(&self) -> RwSemWriteGuard<'_, T>
// 阻塞获取(可被信号中断)
pub fn write_interruptible(&self) -> Result<RwSemWriteGuard<'_, T>, SystemError>
// 非阻塞尝试获取
pub fn try_write(&self) -> Option<RwSemWriteGuard<'_, T>>
6.4 可升级读锁操作
// 阻塞获取(不可中断)
pub fn upread(&self) -> RwSemUpgradeableGuard<'_, T>
// 非阻塞尝试获取
pub fn try_upread(&self) -> Option<RwSemUpgradeableGuard<'_, T>>
6.5 锁转换操作
impl<'a, T> RwSemWriteGuard<'a, T> {
// 写锁降级为可升级读锁
pub fn downgrade(self) -> RwSemUpgradeableGuard<'a, T>
}
impl<'a, T> RwSemUpgradeableGuard<'a, T> {
// 可升级读锁升级为写锁
pub fn upgrade(self) -> RwSemWriteGuard<'a, T>
// 非阻塞尝试升级
pub fn try_upgrade(self) -> Result<RwSemWriteGuard<'a, T>, Self>
}
6.6 直接访问
// 获取可变引用(需要独占的 &mut self)
pub fn get_mut(&mut self) -> &mut T
7. 使用示例
7.1 基本读写
use crate::libs::rwsem::RwSem;
static DATA: RwSem<Vec<u32>> = RwSem::new(Vec::new());
// 多个读者可并发访问
fn reader() {
let guard = DATA.read();
println!("Data: {:?}", *guard);
// guard 离开作用域时自动释放读锁
}
// 写者独占访问
fn writer() {
let mut guard = DATA.write();
guard.push(42);
// guard 离开作用域时自动释放写锁
}
7.2 可升级读锁
fn reader_that_may_write() {
// 先以可升级读者身份获取锁
let guard = DATA.upread();
// 读取数据判断是否需要修改
if guard.is_empty() {
// 需要修改,升级到写锁
let mut write_guard = guard.upgrade();
write_guard.push(1);
}
// 不需要修改,直接释放
}
7.3 写锁降级
fn writer_with_downgrade() {
// 先获取写锁进行修改
let mut guard = DATA.write();
guard.clear();
guard.push(1);
// 修改完成,降级为可升级读锁
let read_guard = guard.downgrade();
// 现在允许其他读者并发访问
println!("After write: {:?}", *read_guard);
}
7.4 可中断的获取
fn interruptible_reader() -> Result<(), SystemError> {
// 可被信号中断
let guard = DATA.read_interruptible()?;
println!("Data: {:?}", *guard);
Ok(())
}
7.5 非阻塞尝试
fn try_reader() -> Option<()> {
// 立即返回,不会睡眠
let guard = DATA.try_read()?;
println!("Data: {:?}", *guard);
Some(())
}
8. 内存序与正确性
8.1 内存序保证
RwSem 使用以下内存序保证正确性:
Acquire:获取锁时使用,确保锁保护的数据可见
Release:释放锁时使用,确保临界区内的修改对后续获取者可见
AcqRel:同时需要 Acquire 和 Release 语义的操作(如降级、升级)
Relaxed:CAS 失败路径,不需要同步
8.2 happens-before 关系
写者释放 (Release) ────┐
│ happens-before
读者获取 (Acquire) ←──┘
保证:
写者在释放前的所有修改,对后续读者可见
多个读者之间没有 happens-before 关系(并发读)
9. 与其他实现的对比
特性 |
Linux rw_semaphore |
Rust parking_lot::RwLock |
DragonOS RwSem |
|---|---|---|---|
状态存储 |
atomic_long_t |
AtomicUsize |
AtomicUsize |
等待队列 |
单队列 + 类型标记 |
单队列 + parking |
单队列 + WaitQueue |
可升级锁 |
不支持 |
支持 |
支持 |
锁降级 |
支持(down_write_to_read) |
支持 |
支持 |
公平策略 |
写者优先 + HANDOFF |
FIFO + 反饥饿 |
FIFO 公平竞争 |
可中断等待 |
支持 |
不支持 |
支持 |
中断上下文 |
不支持 |
不支持 |
不支持 |
10. 性能特性
10.1 快速路径优化
无竞争读取:单次原子操作(
fetch_add)无竞争写入:单次 CAS 操作
读者释放:单次原子操作,只有最后一个读者才唤醒
10.2 可扩展性
并发读:读者之间完全并发,无额外同步开销
写者唤醒:
wake_all()允许多个读者并发唤醒队列开销:只在有等待者时才操作队列
10.3 性能建议
优先使用
try_*方法避免睡眠(在能够快速重试的场景下)使用可升级读锁避免读锁升级导致的死锁
读密集场景下性能优于写密集场景
11. 注意事项
11.1 使用限制
不可在中断上下文使用 - RwSem 可能会睡眠
避免嵌套锁 - 同一线程递归获取同一 RwSem 会导致死锁
避免读锁升级 - 不支持将普通读锁升级为写锁(会导致死锁)
Guard 不可跨线程 - Guard 类型标记为
!Send,不能跨线程传递
11.2 死锁场景
// ❌ 错误:嵌套获取同一锁
let guard1 = rwsem.read();
let guard2 = rwsem.write(); // 死锁!
// ❌ 错误:尝试升级普通读锁
let guard = rwsem.read();
drop(guard);
let guard = rwsem.write(); // 不是原子升级,可能有竞态
// ✅ 正确:使用可升级读锁
let guard = rwsem.upread();
let guard = guard.upgrade(); // 原子升级
11.3 最佳实践
优先使用可升级读锁而非普通读锁(当可能需要写入时)
使用降级而非释放+重新获取(保持原子性)
在用户态进程中使用
*_interruptible变体尽量减少临界区大小,避免长时间持有锁
12. 实现原理总结
┌─────────────────────────────────────┐
│ RwSem<T> │
│ │
│ ┌───────────────────────────────┐ │
│ │ lock: AtomicUsize │ │
│ │ [W|U|B|O|READER_COUNT] │ │
│ │ W: Writer │ │
│ │ U: Upgradeable Reader │ │
│ │ B: Being Upgraded │ │
│ │ O: Overflow Detect │ │
│ └───────────────────────────────┘ │
│ ┌───────────────────────────────┐ │
│ │ queue: WaitQueue │ │
│ │ (all waiters in FIFO order) │ │
│ └───────────────────────────────┘ │
│ ┌───────────────────────────────┐ │
│ │ val: UnsafeCell<T> │ │
│ └───────────────────────────────┘ │
└─────────────────────────────────────┘
锁获取流程(以读锁为例):
read() → wait_until(try_read)
│
↓
┌──────────────┐
│ 注册 waker │ ← 先入队(防止唤醒丢失)
└──────┬───────┘
│
↓
┌──────────────┐
│ 调用 cond() │ ← 再检查条件
│ = try_read() │
└──────┬───────┘
│
成功 ←┴─→ 失败
│ │
↓ ↓
返回 Guard 睡眠等待
│
被唤醒后循环
锁释放与唤醒:
读锁: 最后一个读者 → wake_one()
写锁: 总是 → wake_all()
可升级读锁: 无其他读者时 → wake_all()
这个设计通过巧妙利用位域编码、wait_until 原子等待机制和差异化的唤醒策略,实现了一个高效、正确且功能丰富的读写信号量。