在前几篇文章中,我们拆解了 brpc 的内存管理基础设施:IOBuf 的零拷贝缓冲区、ObjectPool 的无锁对象池、ResourcePool 的 ID 索引对象池。
现在我们进入 bthread 的核心领域。
bthread 是 brpc 的 M:N 协程库——成千上万的用户态协程复用到少量 pthread 上运行。既然多个协程共享线程,就需要同步机制:一个协程等锁、等条件变量、等信号量时,需要一种方式挂起自己,等条件满足时再被唤醒。
Linux 内核提供了 futex(fast user-space mutex)实现这一语义。但 futex 是内核的系统调用,只适用于 pthread。bthread 需要一个自己的 futex——这就是 butex(bthread-futex)。
butex 是 bthread 互斥锁、条件变量、信号量的统一基石。它围绕一个 32 位原子整数,提供 wait/wake 操作,同时支持 bthread 和 pthread 的混合同步。
本文从源码出发,拆解 butex 的设计。
一、butex 是什么?
一句话概括:butex 是一个 32 位原子整数,配合 wait/wake 操作实现同步。
核心语义与 Linux futex 类似:
- wait:如果当前值等于期望值,则挂起当前线程/bthread,直到被 wake 或超时
- wake:唤醒一个或多个在 butex 上等待的线程/bthread
但与 Linux futex 有几个关键区别:
这种混合能力是 butex 的核心价值——同一个 butex 上,bthread 和 pthread 可以互相等待和唤醒。
二、核心数据结构
2.1 Butex:缓存行对齐的同步原语
struct BAIDU_CACHELINE_ALIGNMENT Butex { butil::atomic<int> value; // 32 位原子值 ButexWaiterList waiters; // 等待者链表(FIFO) FastPthreadMutex waiter_lock; // 保护 waiters 的互斥锁};
三个要点:
1. value 必须在偏移 0 处。butex_create()返回&value而非Butex*。外部代码拿到的是一个int*,而butex_wake等函数通过container_of从value的地址反推出整个Butex的地址。这要求value 必须是第一个成员。
2. 缓存行对齐。 整个 Butex 恰好占 64 字节(一个缓存行),避免与其他 butex 发生伪共享。
3. 等待者是侵入式链表。waiters 是一个 LinkedList<ButexWaiter>,按 FIFO 顺序排列。waiter_lock 是一把 FastPthreadMutex(内部用 futex 实现),保护链表的并发访问。
2.2 ButexWaiter:等待者的统一抽象
struct ButexWaiter : public butil::LinkNode<ButexWaiter> { bthread_t tid; // pthread 的 tid 为 0,bthread 不为 0 butil::atomic<Butex*> container; // 所属的 Butex(用于安全移除)};
ButexWaiter 是一个侵入式双向链表节点,所有等待者都挂在 Butex::waiters 上。通过 tid 区分等待者类型:
- tid == 0 → pthread 等待者(ButexPthreadWaiter)
- tid != 0 → bthread 等待者(ButexBthreadWaiter)
container 字段指向等待者当前所在的 Butex。移除等待者时需要加锁二次验证,防止在并发操作(如 requeue)中被修改。
2.3 两种等待者
bthread 等待者——分配在栈上,包含调度所需的所有信息:
struct ButexBthreadWaiter : public ButexWaiter { TaskMeta* task_meta; // 关联的任务元数据 TimerThread::TaskId sleep_id; // 超时定时器 ID WaiterState waiter_state; // 等待状态 int expected_value; // 期望的 butex 值 Butex* initial_butex; // 初始等待的 butex TaskControl* control; // 所属的 TaskControl const timespec* abstime; // 超时时间(NULL = 无限等待) bthread_tag_t tag; // 标签,用于选择 TaskGroup};
pthread 等待者——更轻量,通过内核 futex 阻塞:struct ButexPthreadWaiter : public ButexWaiter { butil::atomic<int> sig; // PTHREAD_NOT_SIGNALLED → PTHREAD_SIGNALLED};
pthread 没有用户态调度器,只能通过 futex 系统调用阻塞。sig 字段就是 futex 操作的地址——唤醒时设为 PTHREAD_SIGNALLED 并调用 futex_wake。2.4 等待者状态
enum WaiterState { WAITER_STATE_NONE, // 未使用 WAITER_STATE_READY, // 已就绪,即将入队 WAITER_STATE_TIMEDOUT, // 等待超时 WAITER_STATE_UNMATCHEDVALUE, // 值不匹配 WAITER_STATE_INTERRUPTED, // 被中断};
状态转换由 TimerThread(超时)、wake 操作(唤醒)和 interrupt(中断)触发。入队前的 READY 检查是一个关键的乐观锁机制——如果 TimerThread 在入队前就触发了超时,则放弃入队,避免无效等待。
三、创建与销毁
butex 的生命周期由 ObjectPool 管理(关于 ObjectPool 的详细设计,参见本系列第二篇文章)。
创建:butex_create() 从 ObjectPool 分配一个 Butex,返回 &value(即 32 位整数的指针)。
void* butex_create() { Butex* b = butil::get_object<Butex>(); return b ? &b->value : NULL;}
销毁:butex_destroy() 通过 container_of 从 value 地址反推 Butex 地址,归还到 ObjectPool。voidbutex_destroy(void* butex){ Butex* b = container_of( static_cast<butil::atomic<int>*>(butex), Butex, value); butil::return_object(b);}

值得注意的是,ObjectPool 永不释放内存——归还的 Butex 仍留在池中等待复用。这是有意为之:butex_wake 和 butex_destroy 之间存在竞态——Event 对象在 signal 线程解锁后,wait 线程可能获取锁并销毁 Event,导致 signal 线程后续的 butex_wake 访问已释放的内存。ObjectPool 的"永不释放"特性完美解决了这个问题——归还后内存仍然有效,butex_wake 最多产生一次无害的虚假唤醒。
Butex 在 ObjectPool 中的 Block 容量也被特化为 128(默认为 4096):template <> struct ObjectPoolBlockMaxItem<bthread::Butex> { static const size_t value = 128; // 每个 Block 最多 128 个 Butex};
这是因为每个 Butex 占 64 字节(一个缓存行),128 × 64 = 8KB,恰好是一个 Block 的默认大小。
四、唤醒操作
唤醒是 butex 的"发信号"侧。核心挑战是:等待者可能是 bthread,也可能是 pthread,需要用不同的方式唤醒。
4.1 butex_wake:唤醒一个等待者
intbutex_wake(void* arg, bool nosignal){ Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value); ButexWaiter* front = NULL; { BAIDU_SCOPED_LOCK(b->waiter_lock); if (b->waiters.empty()) return 0; // 没人等,直接返回 front = b->waiters.head()->value(); front->RemoveFromList(); front->container.store(NULL, butil::memory_order_relaxed); } if (front->tid == 0) { // pthread → futex 唤醒 wakeup_pthread(static_cast<ButexPthreadWaiter*>(front)); } else { // bthread → 取消定时器 + 调度执行 ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front); unsleep_if_necessary(bbw, get_global_timer_thread()); TaskGroup* g = get_task_group(bbw->control, bbw->tag); if (g == tls_task_group) { run_in_local_task_group(g, bbw->task_meta, nosignal); } else { g->ready_to_run_remote(bbw->task_meta, nosignal); } } return 1;}
取出队头等待者 → 判断类型 ├─ pthread → wakeup_pthread(设 sig + futex 系统调用) └─ bthread → 取消定时器 → 加入 TaskGroup 就绪队列
对于 bthread 等待者,还需要区分本地和远程:如果等待者属于当前线程的 TaskGroup,直接 exchange 切换执行(最快);否则放入远程 TaskGroup 的就绪队列。4.2 butex_wake_n:唤醒 n 个等待者
当需要唤醒多个等待者时(如 broadcast),需要批量处理。butex_wake_n 的策略是先分流,再分别处理:加锁 → 从 waiters 取出 n 个等待者 ├─ 按 tid 分流到 bthread_waiters / pthread_waiters └─ 解锁pthread_waiters → 逐个 wakeup_pthreadbthread_waiters: ├─ 头节点单独摘出(最后 exchange,最高效的唤醒方式) ├─ 其余从尾部弹出 → ready_to_run_general(批量调度) ├─ flush 各 tag 的 TaskGroup └─ 头节点 exchange 或 ready_to_run_remote
为什么头节点要留到最后?因为 exchange 会直接切换执行——如果先处理头节点,后续的批量操作就被延迟了。把 exchange 放在最后,确保所有其他等待者先调度好,再切换执行。4.3 butex_wake_except:排除式唤醒
用于 Mutex 场景:锁持有者释放锁时唤醒其他等待者,但不唤醒自己。
intbutex_wake_except(void* arg, bthread_t excluded_bthread){ // 遍历 waiters,tid == excluded_bthread 的放回队列 // 其余正常唤醒}
4.4 butex_requeue:重排队(避免惊群效应)
这是 butex 中最精巧的操作,类似 Linux futex 的 FUTEX_REQUEUE。
场景:ConditionVariable::broadcast 时,所有等待在条件变量 butex 上的线程被唤醒,然后它们立刻去竞争 Mutex butex——这就是"惊群效应"(thundering herd)。
解决方案:不唤醒所有人,而是唤醒第一个等待者,将其余等待者直接转移到 Mutex butex 的等待队列上。这样被转移的等待者不需要被唤醒再重新阻塞,省去了一次 futex 系统调用。
int butex_requeue(void* butex1, void* butex2) { // 唤醒 butex1 的第一个等待者 // 将 butex1 的其余等待者转移到 butex2 的等待队列}
关键细节:双重锁。操作涉及两个 butex 的等待队列,必须同时加锁。为了避免死锁,使用 double_lock 按地址顺序加锁(先加地址小的锁)。
五、等待操作
等待是 butex 的"阻塞"侧。核心挑战是:bthread 不能真正阻塞线程——它需要让出 CPU,让其他 bthread 继续运行。
5.1 butex_wait:上层统一接口

intbutex_wait(void* arg, int expected_value, const timespec* abstime, bool prepend){ Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value); // 快速检查:值不匹配,立即返回 if (b->value.load(butil::memory_order_relaxed) != expected_value) { errno = EWOULDBLOCK; return -1; } TaskGroup* g = tls_task_group; if (NULL == g || g->is_current_pthread_task()) { // pthread 路径 return butex_wait_from_pthread(g, b, expected_value, abstime, prepend); } // bthread 路径 // ...(见下一节)}
入口处做两件事:值检查(值不匹配就不等了)和身份判断(当前是 bthread 还是 pthread)。
5.2 bthread 的等待路径
bthread 不能阻塞线程,它需要"挂起自己,让出 CPU"。这分两步完成:
第一步:设置 remained 回调 + 让出执行
ButexBthreadWaiter bbw; // 栈上分配等待者bbw.tid = g->current_tid();bbw.waiter_state = WAITER_STATE_READY;bbw.expected_value = expected_value;bbw.initial_butex = b;// ...bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);WaitForButexArgs args{ &bbw, prepend };g->set_remained(wait_for_butex, &args); // 设置回调TaskGroup::sched(&g); // 让出 CPU
set_remained 注册 wait_for_butex 作为 remained 函数——当这个 bthread 被调度器切换出去后,remained 函数会在调度上下文中执行。
第二步:remained 函数中入队
voidwait_for_butex(void* arg) { auto args = static_cast<WaitForButexArgs*>(arg); ButexBthreadWaiter* bw = args->bw; Butex* b = bw->initial_butex; BAIDU_SCOPED_LOCK(b->waiter_lock); if (b->value != bw->expected_value) { // 值已变,不等了 bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE; } else if (bw->waiter_state == WAITER_STATE_READY && !bw->task_meta->interrupted) { // 值匹配、未超时、未中断 → 入队 b->waiters.Append(bw); // 或 Prepend(高优先级) bw->container.store(b); if (bw->abstime != NULL) { // 注册超时定时器 bw->sleep_id = get_global_timer_thread()->schedule( erase_from_butex_and_wakeup, bw, *bw->abstime); } return; } // 入队失败,直接标记为就绪 tls_task_group->ready_to_run(bw->task_meta);}
为什么要分两步?因为入队需要加锁。如果在 bthread 的执行上下文中直接加锁入队,会导致持有锁期间无法被调度器切换。分两步后:第一步是无锁的准备工作,切换到调度上下文后再加锁入队。
入队前的 READY 检查是一个乐观锁机制:在 set_remained 和实际入队之间,TimerThread 可能已经触发了超时。如果 waiter_state 不再是 READY,说明已被超时处理,放弃入队。
5.3 pthread 的等待路径
pthread 没有用户态调度器,只能通过内核 futex 阻塞:// wait_pthread 的核心循环while (true) { if (timeout_us > MIN_SLEEP_US || abstime == NULL) { rc = futex_wait_private(&pw.sig, PTHREAD_NOT_SIGNALLED, ptimeout); if (pw.sig != PTHREAD_NOT_SIGNALLED) { return rc; // 被唤醒 } } else { errno = ETIMEDOUT; rc = -1; } if (rc != 0 && errno == ETIMEDOUT) { // 超时 → 从 butex 队列中移除自己 erase_from_butex(&pw, false, WAITER_STATE_TIMEDOUT); return rc; }}
关键点:pthread 在 futex 上阻塞,唤醒后还需从 butex 的等待队列中移除自己。而 bthread 的唤醒由 butex_wake 负责移除——两种等待者的"谁负责清理"责任分配不同。5.4 两种路径的对比
六、erase_from_butex:安全的等待者移除
这是 butex 中最易出错的操作——从等待队列中移除一个等待者。难在哪里?并发。
butex_wake 可能正在唤醒butex_requeue 可能正在转移
erase_from_butex 用一个自旋+二次验证的模式解决这个问题:
inlineboolerase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state){ Butex* b; while ((b = bw->container.load(butil::memory_order_acquire))) { BAIDU_SCOPED_LOCK(b->waiter_lock); // 加锁 if (b == bw->container.load(butil::memory_order_relaxed)) { // 二次验证:container 没变,安全移除 bw->RemoveFromList(); bw->container.store(NULL); if (bw->tid) { static_cast<ButexBthreadWaiter*>(bw)->waiter_state = state; } break; } // container 变了(被 requeue 转移了),重新循环 } // 可选:唤醒等待者}
循环的必要性:requeue 操作可能把等待者从 butex1 转移到 butex2,导致 container 改变。加锁后再次检查 container 是否一致,如果不一致说明正在被转移,需要重新定位。
七、可见性保证
butex 的 wait 和 wake 之间有严格的可见性保证,这是正确性的基石。
场景 1:wait 在 wake 之前
[thread1] [thread2]wait() value = new_value wake()wait() 发现值不匹配,不阻塞
[thread1] [thread2] value = new_value wake()wait()wake() 的内存屏障确保 value 的修改对 wait() 可见
在实现上,wait_pthread 中的 futex_wait 系统调用本身提供了内核级内存屏障;bthread 路径中,set_remained 的 release 语义与 wait_for_butex 入队时的锁获取(acquire 语义)共同保证了可见性。
八、总结
butex 的设计可以归纳为一句话:在 32 位原子整数上,实现同时支持 bthread 和 pthread 的 futex 语义。
四个关键设计:
1. 统一的等待者抽象。ButexWaiter 作为基类,通过 tid 区分 bthread 和 pthread。同一个等待队列可以混合存放两种等待者,唤醒时按类型分发——这是 butex 支持混合同步的基础。
2. 两步等待(bthread)。 bthread 不能阻塞线程,因此等待分两步:先设置 remained 回调并让出 CPU,再在回调中加锁入队。入队前的 READY 状态检查实现了乐观锁,防止在两步之间已被超时移除的等待者被重复入队。
3. ObjectPool 的"永不释放"特性解决竞态。butex_destroy 和 butex_wake 之间存在天然的竞态——归还到 ObjectPool 后内存仍然有效,butex_wake 最多产生一次无害的虚假唤醒,无需额外的生命周期管理。
4. requeue 避免惊群效应。 唤醒第一个等待者,将其余等待者直接转移到另一个 butex 的等待队列,省去了"唤醒 → 重新阻塞"的往返开销。双重锁按地址顺序加锁避免死锁。
butex 是 bthread 的同步基础设施——后续文章将拆解基于 butex 构建的互斥锁、条件变量、信号量等上层同步原语。
本文基于 Apache brpc 源码(src/bthread/butex.h、src/bthread/butex.cpp、src/bthread/sys_futex.cpp)撰写。