摘要:上一篇文章拆解了 bthread_mutex_t 的实现——一个 unsigned 如何拆成 locked + contended 两个字节,配合 butex 实现协程友好的互斥锁。本文继续向上,拆解基于 butex 构建的另一个核心同步原语:条件变量 bthread_cond_t。它只用一个 bthread_mutex_t* 和一个 int*(序列号 butex)就实现了完整的 wait/signal/broadcast 语义。其中 broadcast 的 butex_requeue 优化尤其精妙——唤醒一个等待者,将其余等待者直接转移到互斥锁的 butex 上,避免了"惊群效应"。本文从源码出发,拆解条件变量的每一个操作。在上一篇文章中,我们拆解了 bthread_mutex_t——协程友好的互斥锁。我们看到它用一个 unsigned* butex 实现了完整的三态状态机(0/1/3),trylock 只需一次单字节 exchange,lock/unlock 各只需一次原子操作。
现在我们继续向上,看一个更复杂的同步原语:条件变量。
条件变量的核心场景是:"等某个条件成立"。一个线程检查条件不满足时,调用 wait 挂起自己;另一个线程让条件成立后,调用 signal 或 broadcast 唤醒等待者。经典的"生产者-消费者"模式就依赖条件变量。
但条件变量有一个天然的需求:wait 时必须先释放互斥锁(让其他线程能修改条件),被唤醒后必须重新获取互斥锁(检查条件是否真的满足)。这意味着条件变量必须与互斥锁紧密配合。
bthread 的条件变量是如何实现这种配合的?让我们从数据结构开始。
一、bthread_cond_t 与 CondInternal
1.1 结构体定义
typedef struct bthread_cond_t { bthread_mutex_t* m; // 关联的互斥锁 int* seq; // 序列号 butex} bthread_cond_t;
只有两个字段:
m:指向一个 bthread_mutex_t,在首次 wait 时绑定,绑定后不可更换seq:一个 butex 指针(butex_create_checked<int>() 返回的 int*),作为序列号
这比 pthread 的 pthread_cond_t 简洁得多——pthread 的条件变量内部需要维护一个等待队列,而 bthread 的等待队列由 butex 内部管理。
1.2 CondInternal:内部视角
实现代码将 bthread_cond_treinterpret_cast 为 CondInternal:
struct CondInternal { butil::atomic<bthread_mutex_t*> m; // 原子化的互斥锁指针 butil::atomic<int>* seq; // 原子化的序列号};
BAIDU_CASSERT(sizeof(CondInternal) == sizeof(bthread_cond_t), sizeof_innercond_must_equal_cond);BAIDU_CASSERT(offsetof(CondInternal, m) == offsetof(bthread_cond_t, m), offsetof_cond_mutex_must_equal);BAIDU_CASSERT(offsetof(CondInternal, seq) == offsetof(bthread_cond_t, seq), offsetof_cond_seq_must_equal);
CondInternal 把 m 和 seq 都变成原子类型,这是实现无锁互斥锁绑定的基础。
二、初始化与销毁
2.1 bthread_cond_init
intbthread_cond_init(bthread_cond_t* __restrict c, const bthread_condattr_t*) { c->m = NULL; c->seq = bthread::butex_create_checked<int>(); *c->seq = 0; return 0;}
初始化做两件事:
m = NULL- 创建 seq butex,初始值为 0:序列号从 0 开始,每次 signal/broadcast 递增
注意 bthread_condattr_t 是空结构体——当前没有需要配置的属性。
2.2 bthread_cond_destroy
intbthread_cond_destroy(bthread_cond_t* c){ bthread::butex_destroy(c->seq); c->seq = NULL; return 0;}
销毁只需归还 seq butex 到 ObjectPool。m 不需要释放——它指向的互斥锁由调用者管理。
这里有一个与 butex 的竞态问题(参见本系列第四篇):butex_destroy 后,可能还有未完成的 butex_wake。ObjectPool 的"永不释放"特性保证了安全——归还后内存仍然有效,最多产生一次无害的虚假唤醒。
三、signal:单次通知
intbthread_cond_signal(bthread_cond_t* c){ bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c); butil::atomic<int>* const saved_seq = ic->seq; saved_seq->fetch_add(1, butil::memory_order_release); bthread::butex_wake(saved_seq); return 0;}
signal 的实现极其简洁,只有两步:
第一步:fetch_add(1) 递增序列号。 这让所有正在等待"旧序列号"的等待者的 butex_wait 条件不再满足——它们要么已经在等待中被唤醒,要么在下次检查时发现值已变。
第二步:butex_wake 唤醒一个等待者。 唤醒 butex 等待队列的队头。
为什么先 fetch_add 再 butex_wake,而不是反过来?因为 butex_wait 的语义是"如果当前值等于期望值才等待"(参见本系列第四篇)。如果先 butex_wake 再改值,被唤醒者可能在值还没改变时就被唤醒,产生虚假唤醒。虽然条件变量本来就允许虚假唤醒(POSIX 语义),但先改值可以减少不必要的等待。
一个关键细节:saved_seq 在 fetch_add 之前保存。原因:
fetch_add 之后,ic(即 c)可能被销毁——调用者可能在另一个线程中执行 bthread_cond_destroy。保存 saved_seq 后,后续操作只使用 saved_seq,不再通过 ic 间接访问。
四、broadcast:广播通知与 butex_requeue 优化
intbthread_cond_broadcast(bthread_cond_t* c){ bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c); bthread_mutex_t* m = ic->m.load(butil::memory_order_relaxed); butil::atomic<int>* const saved_seq = ic->seq; if (!m) { return 0; } void* const saved_butex = m->butex; // Wakeup one thread and requeue the rest on the mutex. ic->seq->fetch_add(1, butil::memory_order_release); bthread::butex_requeue(saved_seq, saved_butex); return 0;}
broadcast 是条件变量中最精巧的操作。它解决的核心问题是惊群效应:
4.1 什么是惊群效应?
考虑一个经典场景:多个消费者等待生产者。broadcast 时,所有消费者被唤醒,它们从 butex_wait 返回后,立刻执行 bthread_mutex_lock_contended 去竞争互斥锁。但只有一个能拿到锁,其余的又阻塞在互斥锁的 butex 上。
这就是"惊群效应"——N 个等待者被唤醒,但只有 1 个能继续运行,其余 N-1 个白白经历了一次"唤醒 → 重新阻塞"的往返。
4.2 butex_requeue 的解决方案
butex_requeue(saved_seq, saved_butex) 做了一件聪明的事(参见本系列第四篇):
- 唤醒 seq butex 上的第一个等待者——正常唤醒
- 将 seq butex 上的其余等待者直接转移到互斥锁 butex 的等待队列上——不需要唤醒它们
被转移的等待者直接排到互斥锁的等待队列上。当被 signal 唤醒的那个等待者解锁互斥锁时,butex_wake 会按正常流程唤醒下一个——这恰好就是被 requeue 过来的等待者。效果:N-1 个等待者省去了一次"唤醒 → 重新阻塞"的往返,直接在互斥锁上排队。
4.3 早期返回:m == NULL
if (!m) { return 0; // 还没有人 wait 过,互斥锁未绑定}
如果 m 为空,说明还没有人调用过 wait——没有人可以唤醒,直接返回。4.4 安全性:saved_seq 与 saved_butex
与 signal 类似,saved_seq 和 saved_butex 都在 fetch_add 之前保存。fetch_add 之后不再访问 ic,避免条件变量被并发销毁时的 use-after-free。
五、wait:等待条件变量

intbthread_cond_wait(bthread_cond_t* __restrict c, bthread_mutex_t* __restrict m) { bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c); const int expected_seq = ic->seq->load(butil::memory_order_relaxed); if (ic->m.load(butil::memory_order_relaxed) != m) { // bind m to c bthread_mutex_t* expected_m = NULL; if (!ic->m.compare_exchange_strong( expected_m, m, butil::memory_order_relaxed)) { return EINVAL; } } bthread_mutex_unlock(m); int rc1 = 0; if (bthread::butex_wait(ic->seq, expected_seq, NULL) < 0 && errno != EWOULDBLOCK && errno != EINTR) { rc1 = errno; } const int rc2 = bthread_mutex_lock_contended(m); return (rc2 ? rc2 : rc1);}
wait 是条件变量最复杂的操作。它分四步执行:
5.1 第一步:保存期望序列号
const int expected_seq = ic->seq->load(butil::memory_order_relaxed);
在解锁之前,先读取当前序列号。这个值是"等什么"的依据——如果 seq 的值在 wait 期间被改变(由 signal 或 broadcast),butex_wait 就会返回。
5.2 第二步:绑定互斥锁
if(ic->m.load(butil::memory_order_relaxed) != m) { bthread_mutex_t* expected_m = NULL; if(!ic->m.compare_exchange_strong( expected_m, m, butil::memory_order_relaxed)) { return EINVAL; }}
条件变量在首次 wait 时与互斥锁绑定。这是一个 CAS 操作:
m 为 NULL → CAS 成功,绑定传入的互斥锁m 不为 NULL 且等于传入的互斥锁 → 已经绑定了同一个锁,跳过m 不为 NULL 但不等于传入的互斥锁 → CAS 失败,返回 EINVAL
为什么要首次绑定时才绑定? 因为条件变量初始化时还没有互斥锁——互斥锁由调用者管理。等第一次 wait 时才知道用哪个互斥锁。这与 POSIX的 pthread_cond_wait 语义一致:同一个条件变量必须始终使用同一个互斥锁。
5.3 第三步:解锁 + 等待
bthread_mutex_unlock(m);bthread::butex_wait(ic->seq, expected_seq, NULL);
先解锁互斥锁(让其他线程/协程能修改条件),然后调用 butex_wait 等待序列号变化。
butex_wait 的语义(参见本系列第四篇):如果 *seq == expected_seq,挂起当前 bthread/pthread;如果值已经变了(被 signal/broadcast 修改),立即返回。
第三个参数 NULL 表示无限等待。
5.4 第四步:重新加锁
const int rc2 = bthread_mutex_lock_contended(m);return (rc2 ? rc2 : rc1);
被唤醒后,直接调用 bthread_mutex_lock_contended(竞争路径加锁,参见本系列第六篇),重新获取互斥锁。为什么不调用 bthread_mutex_lock?因为解锁操作已经把 butex 清零了,如果此时 trylock 成功,是最理想的情况;即使失败,lock_contended 内部的 exchange(3) + butex_wait 也能正确处理。5.5 错误处理
if (bthread::butex_wait(ic->seq, expected_seq, NULL) < 0 && errno != EWOULDBLOCK && errno != EINTR) { rc1 = errno;}
butex_wait 返回 -1 时检查 errno:
EWOULDBLOCK:值不匹配(被 signal/broadcast 改了),这是正常的唤醒路径,不视为错误EINTR:被中断,POSIX 规定条件变量可以虚假唤醒,用户应在循环中检查条件
// EINTR should not be returned by cond_*wait according to docs on// pthread, however spurious wake-up is OK, just as we do here// so that users can check flags in the loop often companioning// with the cond_wait ASAP.
这意味着 bthread 的 cond_wait 可能因为中断而提前返回——但这是允许的。用户的标准用法是:bthread_mutex_lock(&mutex);while (!condition) { bthread_cond_wait(&cond, &mutex);}// 条件满足,执行操作bthread_mutex_unlock(&mutex);
六、timedwait:带超时的等待
intbthread_cond_timedwait(bthread_cond_t* __restrict c, bthread_mutex_t* __restrict m, const struct timespec* __restrict abstime) { bthread::CondInternal* ic = reinterpret_cast<bthread::CondInternal*>(c); const int expected_seq = ic->seq->load(butil::memory_order_relaxed); if (ic->m.load(butil::memory_order_relaxed) != m) { bthread_mutex_t* expected_m = NULL; if (!ic->m.compare_exchange_strong( expected_m, m, butil::memory_order_relaxed)) { return EINVAL; } } bthread_mutex_unlock(m); int rc1 = 0; if (bthread::butex_wait(ic->seq, expected_seq, abstime) < 0 && errno != EWOULDBLOCK && errno != EINTR) { rc1 = errno; } const int rc2 = bthread_mutex_lock_contended(m); return (rc2 ? rc2 : rc1);}
timedwait 与 wait 的逻辑完全相同,唯一的区别是 butex_wait 的第三个参数从 NULL 变为 abstime——绝对超时时间。
超时后 butex_wait 返回 -1,errno 为 ETIMEDOUT,最终通过 rc1 传递给调用者。
注意 abstime 是绝对时间(CLOCK_REALTIME),与 Linux futex 的相对时间不同。这是 butex 的设计选择(参见本系列第四篇)。
七、wait 的完整流程图
把 wait 的四个步骤与 signal/broadcast 串联起来,形成完整的交互流程:
[消费者] bthread_cond_wait()┌─────────────────────────────────────────────┐│ 1. expected_seq = seq->load() │ ← 保存当前序列号│ 2. CAS(m, NULL → mutex) │ ← 首次绑定时绑定互斥锁│ 3. bthread_mutex_unlock(m) │ ← 释放互斥锁│ 4. butex_wait(seq, expected_seq) │ ← 等待序列号变化│ └─ 挂起当前 bthread/pthread │└─────────────────────────────────────────────┘ │ │ 等待中... │[生产者] bthread_cond_signal() [生产者] bthread_cond_broadcast()┌──────────────────────────┐ ┌──────────────────────────────────────┐│ seq->fetch_add(1) │ │ seq->fetch_add(1) ││ butex_wake(seq) │ │ butex_requeue(seq, mutex->butex) ││ └─ 唤醒 1 个等待者 │ │ ├─ 唤醒 1 个等待者 ││ │ │ └─ 其余转移到 mutex butex 排队 │└──────────────────────────┘ └──────────────────────────────────────┘ │ ▼[消费者] 被唤醒┌─────────────────────────────────────────────┐│ 5. bthread_mutex_lock_contended(m) │ ← 重新获取互斥锁│ └─ exchange(3) + 可能 butex_wait ││ 6. 检查条件,如果不满足,回到步骤 1 │└─────────────────────────────────────────────┘
关键观察:wait 操作中,互斥锁的释放和 butex 的等待是两个独立步骤。 在 unlock 和 butex_wait 之间存在一个窗口——signal 可能在这个窗口中发生。但 butex_wait 的语义保证了安全:如果 seq 已经被 signal 改变,butex_wait 发现值不匹配,立即返回 EWOULDBLOCK,不会真正等待。
八、C++ 封装层:ConditionVariable
bthread 提供了 bthread::ConditionVariable 类,是 bthread_cond_t 的 RAII 封装。
8.1 基本接口
class ConditionVariable {public: ConditionVariable() { CHECK_EQ(0, bthread_cond_init(&_cond, NULL)); } ~ConditionVariable() { CHECK_EQ(0, bthread_cond_destroy(&_cond)); } voidwait(std::unique_lock<bthread::Mutex>& lock) { bthread_cond_wait(&_cond, lock.mutex()->native_handler()); } voidwait(std::unique_lock<bthread_mutex_t>& lock) { bthread_cond_wait(&_cond, lock.mutex()); } voidnotify_one() { bthread_cond_signal(&_cond); } voidnotify_all() { bthread_cond_broadcast(&_cond); } // ...private: bthread_cond_t _cond;};
构造时初始化,析构时销毁。wait 接受 unique_lock,自动获取底层互斥锁的指针。
8.2 带谓词的 wait
template<typename Predicate>void wait(std::unique_lock<bthread::Mutex>& lock, Predicate p) { while (!p()) { bthread_cond_wait(&_cond, lock.mutex()->native_handler()); }}
接受一个谓词 p,内部循环等待直到 p() 返回 true。这消除了手动写 while 循环的需要,也避免了忘记检查条件导致的逻辑错误。
8.3 超时等待
intwait_for(std::unique_lock<bthread::Mutex>& lock, long timeout_us){ return wait_until(lock, butil::microseconds_from_now(timeout_us));}intwait_until(std::unique_lock<bthread::Mutex>& lock, timespec duetime){ const int rc = bthread_cond_timedwait( &_cond, lock.mutex()->native_handler(), &duetime); return rc == ETIMEDOUT ? ETIMEDOUT : 0;}
与 std::condition_variable 的一个关键区别:超时时返回 ETIMEDOUT,而非抛出 std::timeout 异常。8.4 典型用法
bthread::Mutex mutex;bthread::ConditionVariable cond;bool ready = false;// 等待者std::unique_lock<bthread::Mutex> lock(mutex);cond.wait(lock, []{ return ready; });// 等价于:// while (!ready) { cond.wait(lock); }// 通知者{ std::lock_guard<bthread::Mutex> lock(mutex); ready = true;}cond.notify_one();
九、与 pthread 条件变量的对比
| | |
|---|
| | |
| | butex_wait(bthread 让出 CPU,pthread 用 futex) |
| | 首次 wait 时 CAS 绑定,不一致返回 EINVAL |
| glibc 内部实现(FUTEX_REQUEUE) | butex_requeue,直接转移等待者到互斥锁 butex |
| | |
| | 同一个条件变量上 bthread/pthread 可互相同步 |
最后一点尤其重要:bthread_cond_t 天然支持 bthread 和 pthread 的混合同步。这是因为 butex_wait 和 butex_wake 内部根据等待者的 tid 区分类型(参见本系列第四篇),同一个 butex 上可以同时有 bthread 和 pthread 等待者。这意味着一个 pthread 通知者可以唤醒一个等待在条件变量上的 bthread,反之亦然。
十、总结
bthread_cond_t 的设计可以归纳为一句话:用序列号 butex 实现等待/唤醒语义,用 butex_requeue 优化广播性能。
四个关键设计:
1. 序列号 butex。seq 是一个 butex(32 位原子整数),初始值为 0。每次 signal/broadcast 递增序列号,等待者通过比较序列号判断是否被唤醒。值不匹配时 butex_wait 立即返回,避免了无效等待。
2. 首次绑定的互斥锁关联。 互斥锁指针 m 在首次 wait 时通过 CAS 绑定。绑定后不可更换——后续使用不同互斥锁的 wait 会返回 EINVAL。这与 POSIX 语义一致。
3. butex_requeue 避免惊群效应。 broadcast 不唤醒所有等待者,而是唤醒第一个,将其余等待者直接转移到互斥锁的 butex 等待队列上。被转移的等待者省去了一次"唤醒 → 重新阻塞"的往返。
4. bthread/pthread 混合同步。 由于底层使用 butex,同一个条件变量上 bthread 和 pthread 可以互相等待和唤醒。这使得在 bthread 代码中使用 pthread 库时,同步原语仍然正确工作。
本文基于 Apache brpc 源码(src/bthread/condition_variable.h、src/bthread/condition_variable.cpp、src/bthread/types.h)撰写。