摘要:前文拆解了TaskGroup的调度机制。本文分析其底层同步原语ParkingLot——基于futex实现worker线程的高效挂起与唤醒。_pending_signal低位编码stop标志与信号计数,State快照实现check-then-wait一致性,_no_signal_when_no_waiter在繁忙时跳过futex_wake减少系统调用。
在上一篇文章中,我们拆解了 TaskGroup——bthread 调度的核心单元。我们看到 worker pthread 在没有任务时通过 wait_task 进入等待,被唤醒后从队列中窃取任务执行。
这个"等待/唤醒"机制的底层实现就是 ParkingLot。
bthread 的 M:N 模型中,worker pthread 是稀缺资源。当没有就绪的 bthread 时,worker 需要被挂起以节省 CPU;当新任务到来时,需要唤醒足够数量的 worker 来处理。ParkingLot 就是管理这个"停泊/唤醒"机制的核心同步原语——名字取自"停车场"的比喻:worker 是车,ParkingLot 是停车场,空转的 worker 在这里"停放",有任务时再"驶出"。
一、ParkingLot 概览
TaskControl ┌────────────────────────────────┐ │ ParkingLot[0] ←── worker 0 │ │ ParkingLot[1] ←── worker 1 │ │ ... │ │ ParkingLot[N-1] ←── worker N-1│ └────────────────────────────────┘ worker 0 的视角: ┌──────────┐ get_state() ┌──────────────┐ │ worker 0 │ ──────────────────→│ ParkingLot[0] │ │ │ wait(state) │ │ │ │ ──────────────────→│ futex_wait │ │ │ │ │ │ │ ←──────────────────│ futex_wake │ │ (唤醒) │ signal(n) │ │ └──────────┘ └──────────────┘ ↑ ↑ │ 调度器提交任务 │ └──── signal(num_task) ──────────┘
每个 worker pthread 对应一个 ParkingLot 实例。worker 通过 get_state() + wait() 在 ParkingLot 上等待,调度器通过 signal() 唤醒指定数量的 worker。这种一对一的关系简化了唤醒逻辑——每个 worker 有自己专属的"停车位"。
ParkingLot 基于 Linux futex(fast userspace mutex)实现。futex 的核心思想:在用户态快速检查条件,仅在需要等待时才陷入内核。这避免了传统信号量/条件变量在无竞争场景下的系统调用开销。
二、数据结构
classBAIDU_CACHEIN_ALIGNMENTParkingLot{public: classState{ public: State(): val(0) {} bool stopped() const { return val & 1; } // LSB = stop 标志 private: friend classParkingLot; State(int val) : val(val) {} int val; // _pending_signal 的快照 };private: butil::atomic<int> _pending_signal; // 信号 + stop 标志 butil::atomic<int> _waiter_num; // 等待者计数 bool _no_signal_when_no_waiter; // 无等待者时跳过 signal};
ParkingLot 只有三个成员变量,结构精简:
2.1 _pending_signal:编码信号与停止标志
_pending_signal 是一个 32 位原子整数,编码了两种信息:
31 1 0 ┌───────────────────────────────┬───┐ │ 信号计数 (signal) │ S │ └───────────────────────────────┴───┘ 高 31 位:每次 signal 增加的计数 LSB:stop 标志 每次signal(num_task) += num_task << 1 0=正常 1=已停止
- bit 0(LSB):stop 标志。
stop() 将其置 1,表示 ParkingLot 已停止服务 - bit 1~31:信号计数。每次
signal(num_task) 执行 fetch_add(num_task << 1),将 num_task 左移一位后加到高 31 位
为什么用 num_task << 1 而不是直接加 num_task?因为 LSB 被 stop 标志占用,左移一位确保不干扰 stop 位。这种编码让一个原子变量同时承载两种信息,且操作互不干扰——signal 只修改高位,stop 只修改低位。
2.2 _waiter_num:等待者计数
当前在此 ParkingLot 上等待(即调用了 futex_wait)的 worker 数量。仅在 _no_signal_when_no_waiter 启用时才有实际意义——用于在所有 worker 都繁忙时跳过不必要的 futex_wake。
2.3 _no_signal_when_no_waiter:无等待者优化
从 gflags FLAGS_parking_lot_no_signal_when_no_waiter 读取,默认值取决于编译配置。启用后,当没有 worker 在等待时,signal() 跳过 futex_wake 系统调用。
这个优化针对的是所有 worker 都在处理任务的繁忙场景:此时 futex_wake 唤醒不了任何人(没有人在等),白白执行了一次系统调用。
2.4 State:状态快照
State 是 _pending_signal 的只读快照。它只有一个 val 字段和一个 stopped() 方法。State 的核心价值在于实现 check-then-wait 的一致性——下一节详述。
注意 BAIDU_CACHEIN_ALIGNMENT:ParkingLot 被对齐到缓存行(64 字节),避免不同 ParkingLot 实例之间的 false sharing。每个 worker 频繁读写自己的 ParkingLot,缓存行隔离确保不会互相干扰。
三、signal:唤醒 worker
int signal(int num_task) { // 1. 增加信号计数(高 31 位) _pending_signal.fetch_add((num_task << 1), butil::memory_order_release); // 2. 检查是否有等待者 if(_no_signal_when_no_waiter && _waiter_num.load(butil::memory_order_relaxed) == 0) { return 0; // 无等待者,跳过 futex_wake } // 3. 唤醒最多 num_task 个等待者 return futex_wake_private(&_pending_signal, num_task);}
三步操作:
步骤 1:增加信号计数。fetch_add(num_task << 1) 原子地修改 _pending_signal 的高 31 位。即使步骤 3 的 futex_wake 被跳过,信号计数的增加也已经发生——后续的 wait() 会通过 State 快照检测到变化。
步骤 2:检查等待者。 如果启用了 _no_signal_when_no_waiter 且当前没有等待者,直接返回 0。使用 memory_order_relaxed 读取 _waiter_num——这里的判断是乐观的:即使 _waiter_num 即将变为非零(某个 worker 正准备等待),也不会造成正确性问题,只是本次 signal 没有唤醒,该 worker 等待后会正常醒来(因为信号计数已增加)。
步骤 3:唤醒等待者。futex_wake_private 是 Linux futex 系统调用的封装,唤醒在 _pending_signal 上等待的最多 num_task 个线程。返回实际唤醒的线程数。
关键设计:先修改信号计数,再决定是否唤醒。即使跳过了 futex_wake,信号计数的变化也被记录——后续 worker 的 wait() 检测到不匹配时不会阻塞。
四、get_state 与 wait:等待任务
State get_state() { return _pending_signal.load(butil::memory_order_acquire);}
返回当前 _pending_signal 的快照。worker 在 wait() 之前先调用 get_state() 保存当前状态,用于后续的一致性检查。
4.2 wait:挂起 worker
void wait(const State& expected_state) { // 1. 快速路径:检查状态是否已变化 if(get_state().val != expected_state.val) { return; // 已有新信号,不需要等待 } // 2. 增加等待者计数 if(_no_signal_when_no_waiter) { _waiter_num.fetch_add(1, butil::memory_order_relaxed); } // 3. 阻塞等待 futex_wait_private(&_pending_signal, expected_state.val, NULL); // 4. 减少等待者计数 if(_no_signal_when_no_waiter) { _waiter_num.fetch_sub(1, butil::memory_order_relaxed); }}
四步操作:
步骤 1:快速路径检查。 重新读取 _pending_signal 并与 expected_state 比较。如果不相等,说明在 get_state() 和 wait() 之间有 signal() 发生过——新任务已经到来,不需要等待,直接返回。
这是 futex 的经典用法:用户态快速检查 + 内核态等待。在没有竞争的常见场景下,信号在 worker 开始等待之前就到了,此时快速路径命中,完全不需要系统调用。
步骤 2:增加等待者计数。 如果启用了 _no_signal_when_no_waiter,在进入等待前递增 _waiter_num,让 signal() 知道有人在等待。
步骤 3:futex 等待。futex_wait_private(&_pending_signal, expected_state.val, NULL) 调用 futex 系统调用。内核会比较 _pending_signal 的当前值与 expected_state.val:
- 如果相等:阻塞当前线程,直到
futex_wake 唤醒 - 如果不等:立即返回(EAGAIN),说明在步骤 1 和步骤 3 之间有 signal 发生
步骤 1 的用户态检查 + 步骤 3 的内核态检查构成双重检查:即使在步骤 1 通过后、步骤 3 执行前有 signal 发生,futex 的内核态检查也能捕获这个变化。
步骤 4:减少等待者计数。 等待结束(被唤醒或假唤醒)后递减 _waiter_num。
4.3 State 快照的一致性保证
worker 视角: signal 视角: get_state() ─→ state=S0 fetch_add(2) → _pending_signal=S0+2 futex_wake(1) wait(S0): 检查 S0 != S0+2 → 不等 → 直接返回!
如果没有 State 快照机制,worker 可能错过信号: 无快照的情况(假设直接 wait): wait(): futex_wait(&_pending_signal, 当前值) ← 如果 signal 在读取当前值和 futex_wait 之间发生 → 丢失唤醒!
State 快照将"读取期望值"和"传入期望值"分离到两个调用中,中间的窗口由 futex 内核的原子比较来弥合。这是 wait/wake 模式的标准实践。
五、stop:停止服务
void stop() { // 1. 设置 stop 标志(LSB 置 1) _pending_signal.fetch_or(1); // 2. 唤醒所有等待者 futex_wake_private(&_pending_signal, 10000);}
stop() 在程序退出时调用(TaskControl::stop_and_join),通知所有 worker 停止工作。
两步操作:
步骤 1:设置 stop 标志。fetch_or(1) 原子地将 _pending_signal 的最低位置 1。这同时改变了 _pending_signal 的值——正在 futex_wait 的 worker 会因为值不匹配而被唤醒。
步骤 2:唤醒所有等待者。 传入 10000 是一个足够大的值,确保所有在此 ParkingLot 上等待的 worker 都被唤醒。
唤醒后,worker 在 wait_task 中检查 _last_pl_state.stopped() 返回 true,主循环 run_main_task 退出。
六、在 TaskControl 中的集成
ParkingLot 在 TaskControl 中被创建和管理:classTaskControl{ // 通常与 worker 数量相同 ParkingLot* _pl; // ParkingLot 数组};
每个 worker pthread 在 TaskControl::add_worker 中被分配一个 ParkingLot: worker 0 ←→ _pl[0] worker 1 ←→ _pl[1] ... worker N ←→ _pl[N]
6.1 worker 等待流程
// TaskGroup::wait_task (简化)boolTaskGroup::wait_task(bthread_t* tid){ do { if (_last_pl_state.stopped()) return false; // stop 检查 _pl->wait(_last_pl_state); // 在 ParkingLot 上等待 _last_pl_state = _pl->get_state(); // 更新快照 if (steal_task(tid)) return true; // 窃取任务 } while (true);}
worker 等待后更新 _last_pl_state——确保下次 wait() 使用最新的快照,避免重复唤醒。
6.2 调度器唤醒流程
// TaskControl::signal_task (简化)voidTaskControl::signal_task(int num_task, bthread_tag_t tag){ // 遍历 ParkingLot,唤醒 num_task 个 worker for (int i = 0; i < _options.num_workers; ++i) { int wake = _pl[idx].signal(num_task); num_task -= wake; if (num_task <= 0) break; }}
调度器遍历 ParkingLot 数组,依次唤醒 worker,直到满足所需数量。唤醒成功(signal 返回 > 0)时递减剩余需求,所有需求满足后停止遍历。
七、_no_signal_when_no_waiter 优化详解
这个优化值得单独分析,因为它体现了 bthread 对实际工作负载的细致考量。
7.1 问题:繁忙场景下的无效唤醒
场景:8 个 worker 全部在执行 bthread worker 0: [执行任务 A] ← 繁忙 worker 1: [执行任务 B] ← 繁忙 ... worker 7: [执行任务 G] ← 繁忙 此时 signal(3) 到来: - 没有任何一个 worker 在 ParkingLot 上等待 - futex_wake(&_pending_signal, 3) → 返回 0(唤醒了 0 个线程) - 但系统调用开销仍然存在
在高负载场景下,worker 大部分时间都在执行任务,很少在 ParkingLot 上等待。每次 signal_task 都调用 futex_wake,但几乎总是返回 0。系统调用的开销虽然不大(通常 < 1μs),但在高频调度场景下会累积。
7.2 解决方案:条件唤醒
启用 _no_signal_when_no_waiter 后:
signal(3): fetch_add(6) → 信号计数增加(始终执行) _waiter_num == 0 ? → 是 → 跳过 futex_wake,返回 0
信号计数始终增加——即使跳过了 futex_wake,当 worker 完成当前任务回到 wait_task 时,State 快照不匹配会命中快速路径,worker 不会阻塞,直接尝试窃取新任务。
7.3 正确性保证
可能担心一个竞态场景:signal() 检查 _waiter_num 时为 0,但 wait() 随后增加了 _waiter_num。
signal(): wait(): fetch_add(6) _waiter_num == 0 → true _waiter_num.fetch_add(1) → 变为 1 futex_wait(&signal, S0) → S0+6 ≠ S0 → 立即返回 跳过 futex_wake 被 futex_wait 快速路径唤醒
即使 signal() 跳过了 futex_wake,wait() 中的 futex_wait_private 会检测到 _pending_signal 的值(S0+6)与 expected_state.val(S0)不匹配,立即返回 EAGAIN。futex 的内核态比较是最后一道安全网。
_waiter_num 使用 memory_order_relaxed 也是安全的——它只是一个提示,不参与同步。最坏情况是多一次不必要的 futex_wake,不会导致丢失唤醒。
八、与 TaskGroup 的协作时序
完整的 worker 生命周期中,ParkingLot 的调用时序:
run_main_task(): while (wait_task(&tid)) { ←── 外层循环 sched_to(&dummy, tid); ←── 执行 bthread } wait_task(): loop: _last_pl_state.stopped()? ←── 检查 stop return false _pl->wait(_last_pl_state) ←── ParkingLot.wait() _last_pl_state = get_state() ←── 更新快照 steal_task(tid)? ←── 尝试窃取 return true goto loop ←── 窃取失败,重新等待
关键观察:
1. wait 之前必须先 get_state。wait_task 在循环中先检查 stopped(),再调用 wait()。_last_pl_state 在上一次循环中通过 get_state() 更新。
2. wait 返回后立即更新快照。 避免下次 wait 使用过期状态。
3. 窃取失败后重新等待。 假唤醒(spurious wakeup)或被其他 worker 抢先窃取时,worker 会回到 ParkingLot 重新等待。
九、futex 基础
ParkingLot 的实现基于 Linux futex 系统调用。理解 futex 有助于理解 ParkingLot 的设计:// futex_wait:如果 *uaddr == val,则阻塞当前线程intfutex_wait_private(int* uaddr, int val, const timespec* timeout);// futex_wake:唤醒最多 count 个在 *uaddr 上等待的线程intfutex_wake_private(int* uaddr, int count);
futex 的核心优势:
- 用户态快速路径:先在用户态检查条件(
get_state().val != expected_state.val),条件不满足时直接返回,不需要系统调用 - 内核态等待:仅在需要阻塞时才陷入内核,避免了传统条件变量的互斥锁开销
- 精确唤醒:
futex_wake 可以精确控制唤醒的线程数量,而不是广播唤醒所有等待者
ParkingLot 使用的 futex_wait_private / futex_wake_private 是 FUTEX_WAIT_PRIVATE / FUTEX_WAKE_PRIVATE 的封装——它们要求等待和唤醒在同一个进程内,避免了跨进程的开销。
十、总结
ParkingLot 的设计可以归纳为一句话:基于 futex 的轻量级同步原语,一个原子变量编码信号计数与停止标志,State 快照确保 check-then-wait 一致性,_no_signal_when_no_waiter 优化繁忙场景。
四个关键设计:
1. _pending_signal 的位编码。 一个 32 位原子变量同时承载两种信息:高 31 位是信号计数(每次 signal 增加 num_task << 1),LSB 是停止标志。两种操作互不干扰——signal 修改高位,stop 修改低位。编码方式简洁高效。
2. State 快照 + 双重检查。 worker 先 get_state() 获取快照,再 wait(expected_state) 等待。wait 中有两次检查:用户态比较(快速路径)和 futex 内核态比较(慢速路径)。双重检查确保不会丢失唤醒信号。
3. _no_signal_when_no_waiter 条件唤醒。 繁忙场景下所有 worker 都在执行任务,futex_wake 只会白白执行系统调用。通过 _waiter_num 检查是否有等待者,没有则跳过唤醒。信号计数始终增加,确保 worker 回到等待时能检测到变化。
4. 缓存行对齐。BAIDU_CACHEIN_ALIGNMENT 确保每个 ParkingLot 独占一个缓存行。每个 worker 频繁读写自己的 ParkingLot 的 _pending_signal 和 _waiter_num,缓存行隔离消除 false sharing。
ParkingLot 虽然代码不到 100 行,但每个设计细节都有明确的性能或正确性考量。它是 bthread 调度系统的"齿轮"——小而精密,连接着 TaskGroup 的调度循环和操作系统的 futex 机制。
本文基于 Apache brpc 源码(src/bthread/parking_lot.h)撰写。