摘要:前文拆解了bthread公共API。本文分析bthread如何让文件描述符的I/O等待不阻塞worker pthread。LazyArray按需分配fd到butex的映射表,EpollThread后台线程通过epoll监控fd事件,事件就绪时修改butex唤醒等待的bthread,CLOSING_GUARD哨兵实现fd_close与fd_wait的互斥。
在上一篇文章中,我们分析了 bthread 的公共 API 层。我们看到 bthread_yield 和 bthread_usleep 在 bthread 中使用协程机制,不阻塞底层 pthread。
但 I/O 操作是另一个难题。网络编程中,读写 socket 经常需要等待——等待连接建立、等待数据可读、等待缓冲区可写。如果 bthread 直接调用阻塞的 read/write/connect,底层的 worker pthread 也会被阻塞,其他 bthread 无法运行。
bthread 的 fd 机制解决了这个问题:通过后台 epoll 线程监控 I/O 事件,事件就绪时通过 butex 唤醒等待的 bthread,整个过程不阻塞 worker pthread。
一、fd 机制概览
worker pthread ┌────────────────────────────────────┐ │ bthread A: fd_wait(sock, EPOLLIN) │ │ │ │ │ │ 1. butex_wait 挂起 bthread A │ │ │ (worker 可执行其他 bthread)│ │ ▼ │ │ [执行 bthread B、C、...] │ └────────────────────────────────────┘ EpollThread (后台 bthread) 内核 epoll ┌──────────────────┐ ┌──────────────┐ │ epoll_wait() │◄──────────────│ sock 可读 │ │ butex++ │ │ │ │ butex_wake_all() │──唤醒─────────►│ │ └──────────────────┘ └──────────────┘ │ ▼ bthread A 被唤醒,继续执行 read()
三个核心组件:
- LazyArray:全局 fd → butex 映射表,按需分配内存,最多支持 6700 万个 fd
- EpollThread:后台 bthread,通过 epoll/kqueue 监控 fd 事件,事件就绪时修改 butex 唤醒等待者
- CLOSING_GUARD:哨兵值,实现 fd_wait 与 fd_close 的互斥,防止关闭期间新的等待
二、LazyArray:按需分配的映射表
2.1 数据结构
template <typename T, size_t NBLOCK, size_t BLOCK_SIZE>class LazyArray { struct Block { butil::atomic<T> items[BLOCK_SIZE]; // 每块 256 个原子槽位 }; butil::atomic<Block*> _blocks[NBLOCK]; // 最多 262144 个块指针};// 全局实例:fd → EpollButex* 映射LazyArray<EpollButex*, 262144/*NBLOCK*/, 256/*BLOCK_SIZE*/> fd_butexes;// 最多寻址 262144 × 256 = 67,108,864 个文件描述符
LazyArray 是一个两级索引的分块数组。262144 个块指针初始全为 NULL,每个块包含 256 个原子槽位。给定一个 fd,通过 fd / 256 定位块,fd % 256 定位块内偏移。
为什么不直接用 std::vector?因为 fd 的范围可能很大(Linux 默认最大 1024×1024),但实际使用的 fd 通常集中在较低值。如果预分配全部空间会浪费大量内存。LazyArray 按需分配块——只有被访问的块才会 new 出来。
2.2 get_or_new:获取或创建
butil::atomic<T>* get_or_new(size_t index){ const size_t block_index = index / BLOCK_SIZE; if (block_index >= NBLOCK) return NULL; // 越界检查 const size_t block_offset = index - block_index * BLOCK_SIZE; Block* b = _blocks[block_index].load(memory_order_consume); if (b != NULL) return b->items + block_offset; // 快速路径:块已存在 b = new Block; std::fill(b->items, b->items + BLOCK_SIZE, T()); // 初始化为默认值 Block* expected = NULL; if (_blocks[block_index].compare_exchange_strong( expected, b, memory_order_release, memory_order_consume)) { return b->items + block_offset; // CAS 成功 } delete b; // CAS 失败:其他线程已创建 return expected->items + block_offset;}
使用 CAS(Compare-And-Swap)保证无锁的并发安全。多个线程可能同时为同一个块调用 get_or_new,但只有一个线程的 CAS 会成功。失败的线程释放自己分配的块,使用成功者的块。
get_or_new 是幂等的——无论调用多少次,同一 index 总是返回相同的槽位指针。
2.3 get:只读获取
butil::atomic<T>* get(size_t index)const{ const size_t block_index = index / BLOCK_SIZE; if (block_index < NBLOCK) { Block* b = _blocks[block_index].load(memory_order_consume); if (b != NULL) return b->items + block_offset; } return NULL;}
get 不创建块——如果块未分配则返回 NULL。用于 fd_close 等只需要读取的场景,避免了不必要的内存分配。
三、EpollButex 与 CLOSING_GUARD
typedef butil::atomic<int> EpollButex;static EpollButex* const CLOSING_GUARD = (EpollButex*)(intptr_t)-1L;
就绪时 fetch_add(1) 修改值并 butex_wake_all 唤醒等待者。
CLOSING_GUARD 是一个特殊哨兵值((intptr_t)-1),用于 fd_close 期间的互斥:
fd_wait: fd_close: exchange(butex_ptr, CLOSING_GUARD) load(butex_ptr) → CLOSING_GUARD while (butex == CLOSING_GUARD) sched_yield() ← 自旋等待 butex++ + wake_all close(fd) exchange(butex_ptr, original) load(butex_ptr) → normal butex 继续执行 fd_wait
CLOSING_GUARD 确保在关闭 fd 期间,不会有新的 fd_wait 注册到 epoll 上。fd_wait 检测到 CLOSING_GUARD 后自旋等待,直到 fd_close 完成恢复 butex 指针。
四、EpollThread:后台 epoll 线程
classEpollThread{ int _epfd; // epoll/kqueue 实例 bool _stop; // 停止标志 bthread_t _tid; // 后台 bthread ID butil::Mutex _start_mutex; // 保护 start() 的互斥锁};
4.1 start:启动
intstart(int epoll_size) { if (started()) return -1; // 快速检查 _start_mutex.lock(); if (started()) { _start_mutex.unlock(); return -1; } // 双重检查 _epfd = epoll_create(epoll_size); // 创建 epoll 实例 _start_mutex.unlock(); bthread_attr_t attr = BTHREAD_ATTR_NORMAL; bthread_start_background(&_tid, &attr, run_this, this); // 创建后台 bthread return 0;}
双重检查锁定保证 epoll_create 和 bthread 创建只执行一次。start() 可以被多次调用(如 get_epoll_thread 每次都调用 start),但只有第一次生效。
EpollThread 运行在 bthread 上而非 pthread 上——这让 epoll 线程本身也参与 bthread 调度,与其他 bthread 共享 worker 资源。
4.2 run:事件循环
void* run(){ const size_t MAX_EVENTS = 32; epoll_event* e = new epoll_event[MAX_EVENTS]; while (!_stop) { const int n = epoll_wait(_epfd, e, MAX_EVENTS, -1); // 阻塞等待 if (_stop) break; if (n < 0) { if (errno == EINTR) continue; break; } for (int i = 0; i < n; ++i) { EpollButex* butex = static_cast<EpollButex*>(e[i].data.ptr); if (butex != NULL && butex != CLOSING_GUARD) { butex->fetch_add(1, memory_order_relaxed); // 修改值 butex_wake_all(butex); // 唤醒等待者 } } } delete[] e; return NULL;}
事件循环的核心逻辑:
epoll_wait 阻塞等待 I/O 事件(最多返回 32 个)- 修改 butex 值(
fetch_add(1)),然后唤醒所有等待的 bthread
每次唤醒都递增 butex 值——这确保了 fd_wait 能检测到事件变化。butex 值只增不减,不存在 ABA 问题。
三种 butex 状态的处理:
- NULL:fd 没有 bthread 在等待,忽略
- CLOSING_GUARD:fd 正在被关闭,忽略(由
fd_close 负责唤醒) - 正常指针:修改值并唤醒
4.3 多 EpollThread 支持
EpollThread epoll_thread[BTHREAD_EPOLL_THREAD_NUM]; // 默认 1 个staticinline EpollThread& get_epoll_thread(int fd){ if (BTHREAD_EPOLL_THREAD_NUM == 1) { EpollThread& et = epoll_thread[0]; et.start(BTHREAD_DEFAULT_EPOLL_SIZE); return et; } // 多个:哈希分配 EpollThread& et = epoll_thread[fmix32(fd) % BTHREAD_EPOLL_THREAD_NUM]; et.start(BTHREAD_DEFAULT_EPOLL_SIZE); return et;}
BTHREAD_EPOLL_THREAD_NUM 控制 epoll 线程数量。多个 epoll 线程通过 fmix32(fd) % N 将 fd 分配到不同线程,减少单个 epoll 实例的争用。编译期常量让单线程场景(默认)零开销——直接返回第一个实例,不计算哈希。
五、fd_wait:等待 I/O 事件

intfd_wait(int fd, unsigned events, const timespec* abstime){ // 1. 获取(或创建) fd 对应的 butex 槽位 atomic<EpollButex*>* p = fd_butexes.get_or_new(fd); // 2. 获取 butex 指针(懒创建) EpollButex* butex = p->load(memory_order_consume); if (butex == NULL) { butex = butex_create_checked<EpollButex>(); // 从 ObjectPool 分配 butex->store(0); EpollButex* expected = NULL; if (!p->compare_exchange_strong(expected, butex, ...)) { butex_destroy(butex); // 其他线程已创建 butex = expected; } } // 3. 等待 CLOSING_GUARD 消失 while (butex == CLOSING_GUARD) { sched_yield(); butex = p->load(memory_order_consume); } // 4. 保存 butex 当前值 const int expected_val = butex->load(memory_order_relaxed); // 5. 注册到 epoll epoll_event evt = { events | EPOLLONESHOT, { butex } }; epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt); // 先尝试 MOD if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 && errno != EEXIST) { return -1; } // 6. 等待循环 while (butex->load(memory_order_relaxed) == expected_val) { if (butex_wait(butex, expected_val, abstime) < 0 && errno != EWOULDBLOCK && errno != EINTR) { return -1; } } return 0;}
fd_wait 的六步流程:
步骤 1-2:获取 butex。 从 LazyArray 获取 fd 对应的槽位。如果 butex 为 NULL,通过 CAS 创建——乐观锁避免每个 fd 都需要互斥锁。
步骤 3:CLOSING_GUARD 检查。 如果 fd_close 正在进行,自旋等待直到完成。
步骤 4:保存期望值。 记录 butex 的当前值。butex_wait 会比较当前值与期望值——如果相等,说明事件未就绪,挂起等待;如果不等,说明事件已就绪,直接返回。
步骤 5:注册 epoll。 使用 EPOLLONESHOT 确保每次只触发一次事件,避免重复唤醒。先尝试 MOD(fd 可能已在 epoll 中),失败再 ADD。
步骤 6:等待循环。 反复检查 butex 值是否变化。butex_wait 在值未变时挂起当前 bthread,值变化后被 EpollThread 唤醒。循环设计处理假唤醒(spurious wakeup)和中断(EINTR)。
关键时序保证:步骤 4(保存值)在步骤 5(注册 epoll)之前执行。这确保不会丢失事件——如果在注册 epoll 之前事件就已就绪,但ex 值已经改变,步骤 6 的检查会立即发现。
六、fd_close:安全关闭
intfd_close(int fd){ atomic<EpollButex>* pbutex = fd_butexes.get(fd); if (pbutex == NULL) return close(fd); // 从未通过 bthread_fd_* 使用 // 1. 替换为 CLOSING_GUARD(阻止新的 fd_wait) EpollButex* butex = pbutex->exchange(CLOSING_GUARD, memory_order_relaxed); if (butex == CLOSING_GUARD) { errno = EBADF; return -1; } // 并发双重关闭 // 2. 唤醒所有等待者 if (butex != NULL) { butex->fetch_add(1); butex_wake_all(butex); } // 3. 从 epoll 移除 epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL); // 4. 关闭 fd const int rc = close(fd); // 5. 恢复 butex 指针 pbutex->exchange(butex, memory_order_relaxed); return rc;}
fd_close 的五步流程是一个精心设计的互斥协议:
步骤 1:设置哨兵。exchange(CLOSING_GUARD) 原子地将 butex 指针替换为哨兵值。此后新的 fd_wait 会检测到 CLOSING_GUARD 并自旋等待。
步骤 2:唤醒等待者。 递增 butex 值并唤醒所有在 fd_wait 中阻塞的 bthread。这些 bthread 醒来后检查 butex 值已变,退出等待循环,通常会得到错误返回。
步骤 3:从 epoll 移除。 防止已关闭的 fd 产生虚假事件。
步骤 4:关闭 fd。 实际的系统调用。
步骤 5:恢复 butex。 将 CLOSING_GUARD 替换回原来的 butex 指针。此时如果有 bthread 在 fd_wait 的 CLOSING_GUARD 自旋中,会检测到恢复并继续。
注意步骤 5 的设计意图:但ex 指针被恢复但不被销毁——因为同一个 fd 号可能被内核快速复用,新的 fd_wait 可以复用现有的 butex。
七、stop_and_join:停止 epoll 线程
intstop_and_join(){ const int saved_epfd = _epfd; _epfd = -1; // 标记未启动 _stop = true; // 设置停止标志 // 关闭 _epfd 不能唤醒 epoll_wait // 创建 pipe 并注册持续触发 EPOLLOUT 的事件来唤醒 int closing_epoll_pipe[2]; pipe(closing_epoll_pipe); epoll_event evt = { EPOLLOUT, { NULL } }; epoll_ctl(saved_epfd, EPOLL_CTL_ADD, closing_epoll_pipe[1], &evt); bthread_join(_tid, NULL); // 等待 bthread 退出 close(closing_epoll_pipe[0]); close(closing_epoll_pipe[1]); close(saved_epfd); return 0;}
stop_and_join 的难点:close(_epfd) 不能唤醒正在阻塞的 epoll_wait(内核行为)。解决方案是创建一个 pipe,将写端注册为持续触发 EPOLLOUT 的事件——pipe 的写端总是可写,epoll_wait 被唤醒后检查 _stop 标志为 true,退出循环。
八、公共 API 层
8.1 bthread_fd_wait / bthread_fd_timedwait
intbthread_fd_wait(int fd, unsigned events){ if (fd < 0) { errno = EINVAL; return -1; } TaskGroup* g = tls_task_group; if (g && !g->is_current_pthread_task()) { return get_epoll_thread(fd).fd_wait(fd, events, NULL); // bthread: epoll+butex } return pthread_fd_wait(fd, events, NULL); // pthread: poll()}intbthread_fd_timedwait(int fd, unsigned events, const timespec* abstime){ // 同上,但传入 abstime 支持超时}
经典的 tls_task_group 双路径分发:bthread 中使用 epoll+butex 协程等待,pthread 中回退到 poll() 系统调用。
8.2 bthread_connect:非阻塞连接
intbthread_connect(int sockfd, const sockaddr* addr, socklen_t addrlen){ TaskGroup* g = tls_task_group; if (!g || g->is_current_pthread_task()) { return ::connect(sockfd, addr, addrlen); // pthread: 直接连接 } bool is_blocking = butil::is_blocking(sockfd); if (is_blocking) butil::make_non_blocking(sockfd); // 临时设为非阻塞 auto guard = MakeScopeGuard([is_blocking, sockfd]() { if (is_blocking) butil::make_blocking(sockfd); // RAII 恢复 }); int rc = ::connect(sockfd, addr, addrlen); if (rc == 0 || errno != EINPROGRESS) return rc; // 等待可写(连接完成) if (bthread_fd_wait(sockfd, EPOLLOUT) < 0) return -1; return (butil::is_connected(sockfd) != 0) ? -1 : 0;}
bthread_connect 将阻塞的 connect() 转化为非阻塞模式:
- 发起非阻塞
connect(),预期返回 -1 + EINPROGRESS - 通过
bthread_fd_wait 等待 socket 可写(连接完成) - 检查连接是否真的成功(
getsockopt(SO_ERROR))
bthread_timed_connect 增加了超时支持,使用 bthread_fd_timedwait 代替 bthread_fd_wait。
8.3 bthread_close
intbthread_close(int fd) { return get_epoll_thread(fd).fd_close(fd);}
直接委托给对应 EpollThread 的 fd_close。
九、EPOLLONESHOT 的使用
// fd_wait 中注册事件epoll_event evt = { events | EPOLLONESHOT, { butex } };epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt);
EPOLLONESHOT 确保每次注册只触发一次事件。触发后 fd 在 epoll 中被禁用,需要下次 fd_wait 重新注册。这避免了多线程场景下的重复唤醒问题——一个 bthread 等待的事件不应被其他 bthread 消费。
如果内核有 epoll oneshot bug(BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG 未定义),则退化为每次事件触发后 EPOLL_CTL_DEL,下次 fd_wait 再 ADD——性能较差但更安全。
十、总结
bthread fd 机制的设计可以归纳为一句话:后台 epoll 线程监控 I/O 事件,通过 butex 唤醒等待的 bthread,LazyArray 按需映射 fd 到 butex,CLOSING_GUARD 哨兵保证关闭期间的互斥安全。
五个关键设计:
1. LazyArray 按需分配。 fd 范围大但实际使用集中。两级索引的分块数组只分配被访问的块,CAS 保证无锁并发安全。67M 的寻址能力满足几乎所有场景。
2. butex 作为事件通知机制。 EpollThread 在事件就绪时递增 butex 值并唤醒等待者。fd_wait 通过 butex_wait 挂起 bthread——这是协程级的挂起,不阻塞 worker pthread,其他 bthread 可以继续执行。
3. CLOSING_GUARD 原子互斥。fd_close 通过 exchange(CLOSING_GUARD) 原子地标记"正在关闭",阻止新的 fd_wait。关闭完成后恢复 butex 指针,等待中的 fd_wait 自动继续。无需额外的互斥锁。
4. EPOLLONESHOT 避免重复唤醒。 每次注册只触发一次事件,确保一个 bthread 等待的事件不会被其他 bthread 消费。下次 fd_wait 重新注册,保证正确性。
5. tls_task_group 双路径分发。 bthread 中使用 epoll+butex 协程等待,pthread 中回退到 poll() 系统调用。同一套 API 在两种环境下都能正确工作,用户无需区分。
本文基于 Apache brpc 源码(src/bthread/fd.cpp)撰写。