摘要:前文拆解了TaskControl全局协调器。本文转向用户最直接接触的层面——bthread公共API。get_or_new_task_control双重检查创建全局控制器,urgent/background两种启动模式对应前台/后台调度,NOSIGNAL批量提交优化减少唤醒开销,concurrency动态伸缩只增不减。
在前面的十五篇文章中,我们从底层到上层逐步拆解了 bthread 调度系统的内部实现:IOBuf(第一篇)、ObjectPool(第二篇)、ResourcePool(第三篇)、同步原语(第四至九篇)、TimerThread(第十篇)、栈管理(第十一篇)、TaskMeta(第十二篇)、TaskGroup(第十三篇)、ParkingLot(第十四篇)、TaskControl(第十五篇)。
现在我们来到用户最直接接触的层面——bthread 公共 API。这些函数定义在 bthread.h 中,实现分布在 bthread.cpp,是 bthread 库与用户代码的接口层。它们将前文分析的底层组件封装成类似 pthread 的编程模型,让用户无需关心 M:N 调度的细节。
一、API 全景
用户代码 │ ▼ ┌──────────────────────────────────────────┐ │ bthread 公共 API 层 │ │ │ │ 创建: start_urgent / start_background │ │ 等待: join / bthread_exit │ │ 控制: yield / usleep / interrupt / stop │ │ 并发: setconcurrency / getconcurrency │ │ 定时: timer_add / timer_del │ │ 批量: list_init / list_stop / list_join │ │ TLS: key_create / setspecific / getspecific │ │ 同步: mutex / cond / rwlock / sem │ │ │ └──────────────────────────────────────────┘ │ │ ▼ ▼ TaskGroup TaskControl (worker 内部) (全局协调)
API 的核心设计原则:在 bthread 中使用协程语义,在 pthread 中退化为系统调用。大多数函数通过 tls_task_group 判断当前线程是否为 worker——是则使用 bthread 的协程机制,否则回退到 pthread 对应的系统调用。
二、全局初始化:get_or_new_task_control
inline TaskControl* get_or_new_task_control(){ butil::atomic<TaskControl*>* p = (butil::atomic<TaskControl*>*)&g_task_control; TaskControl* c = p->load(memory_order_consume); if (c != NULL) return c; // 快速路径 BAIDU_SCOPED_LOCK(g_task_control_mutex); // 加锁 c = p->load(memory_order_consume); if (c != NULL) return c; // 双重检查 c = new TaskControl; int concurrency = FLAGS_bthread_min_concurrency > 0 ? FLAGS_bthread_min_concurrency : FLAGS_bthread_concurrency; c->init(concurrency); p->store(c, memory_order_release); return c;}
这是 bthread 的入口——几乎所有 API 在执行前都需要获取全局 TaskControl。实现使用经典的**双重检查锁定(Double-Checked Locking)**模式:
- 第一次检查(无锁):快速路径,
g_task_control 已初始化时直接返回,零开销 - 加锁:防止并发创建
- 第二次检查(持锁):其他线程可能在等锁期间已完成创建
FLAGS_bthread_min_concurrency 控制延迟创建:正值时先创建少量 worker,后续按需增加(通过 signal_task 触发 add_workers);零或负值时直接按 FLAGS_bthread_concurrency 创建全部 worker。
三、创建 bthread:urgent 与 background
3.1 bthread_start_urgent:紧急模式(前台启动)
int bthread_start_urgent(bthread_t* tid, const bthread_attr_t* attr, void* (*fn)(void*), void* arg) { TaskGroup* g = tls_task_group; if (g && can_run_thread_local(attr)) { // worker 线程 + tag 匹配:前台启动(立即切换) return TaskGroup::start_foreground(&g, tid, attr, fn, arg); } // 非 worker 或 tag 不匹配:投递到其他 TaskGroup return start_from_non_worker(tid, attr, fn, arg);}
紧急模式的核心语义:新 bthread 立即抢占当前 bthread 执行。在 worker 线程中调用 start_foreground(第十三篇),通过 sched_to 切换到新 bthread,调用者被放回队列等待。
3.2 bthread_start_background:后台模式
int bthread_start_background(bthread_t* tid, const bthread_attr_t* attr, void* (*fn)(void*), void* arg) { TaskGroup* g = tls_task_group; if (g && can_run_thread_local(attr)) { // worker 线程:入队但不切换 return g->start_background<false>(tid, attr, fn, arg); } return start_from_non_worker(tid, attr, fn, arg);}
后台模式类似 pthread_create:将新 bthread 加入调度队列后立即返回,调用者继续执行。模板参数 false 表示非 NOSIGNAL 模式,会立即通知 worker 调度。
3.3 两种模式的对比
urgent (前台): caller ──创建──→ new_bthread 立即执行 │ │ └──→ 放回队列 │ 等待调度 │ │ ←── sched_to ─────┘ (新 bthread 结束后回到 caller) background (后台): caller ──创建──→ new_bthread 入队 │ └──→ 继续执行 new_bthread 等待被调度
选择原则:如果新任务的优先级高于当前任务(如 RPC 响应处理),使用 start_urgent;否则使用 start_background。
3.4 can_run_thread_local:tag 匹配检查
BUTIL_FORCE_INLINE boolcan_run_thread_local(constbthread_attr_t* attr){ return attr == nullptr || attr->tag == tls_task_group->tag() || attr->tag == BTHREAD_TAG_INVALID;}
只有当请求的 tag 与当前 TaskGroup 的 tag 匹配(或未指定 tag)时,才能在当前 worker 上直接启动。tag 不匹配时回退到 start_from_non_worker,选择正确 tag 的 TaskGroup。3.5 start_from_non_worker:非 worker 线程启动
intstart_from_non_worker(bthread_t* tid, constbthread_attr_t* attr, void* (*fn)(void*), void* arg) { TaskControl* c = get_or_new_task_control(); auto tag = BTHREAD_TAG_DEFAULT; if (attr && attr->tag != BTHREAD_TAG_INVALID) tag = attr->tag; if (attr && (attr->flags & BTHREAD_NOSIGNAL)) { // NOSIGNAL 模式:缓存 TaskGroup,不唤醒 worker auto g = tls_task_group_nosignal; if (!g) { g = c->choose_one_group(tag); tls_task_group_nosignal = g; // 缓存 } return g->start_background<true>(tid, attr, fn, arg); // true = nosignal } return c->choose_one_group(tag)->start_background<true>(tid, attr, fn, arg);}
非 worker 线程没有自己的 TaskGroup,通过 choose_one_group 随机选择一个目标 TaskGroup 投递任务。NOSIGNAL 模式下,选择的 TaskGroup 被缓存到 tls_task_group_nosignal——同一个非 worker 线程连续创建的多个 NOSIGNAL bthread 会进入同一个 TaskGroup,最大化批量效应。
四、bthread_flush:批量提交
void bthread_flush() { TaskGroup* g = tls_task_group; if (g) { return g->flush_nosignal_tasks(); // worker: 刷新本地队列 } g = tls_task_group_nosignal; if (g) { tls_task_group_nosignal = NULL; return g->flush_nosignal_tasks_remote(); // 非 worker: 远程刷新 }}
bthread_flush 与 BTHREAD_NOSIGNAL 配合使用。典型的批量创建模式:// 批量创建 100 个 bthread,不立即唤醒 workerbthread_attr_t attr = BTHREAD_ATTR_NORMAL;attr.flags |= BTHREAD_NOSIGNAL;for (int i = 0; i < 100; ++i) { bthread_start_background(&tid, &attr, fn, arg);}// 一次性唤醒 workerbthread_flush();
将 100 次 signal_task(每次一次 futex_wake 系统调用)减少为 1 次,显著降低开销。这是第十三篇分析的 nosignal 优化在 API 层的体现。
worker 线程和非 worker 线程的 flush 路径不同:worker 直接操作本地队列(flush_nosignal_tasks),非 worker 需要通过远程队列(flush_nosignal_tasks_remote)。
五、bthread_join:等待结束
intbthread_join(bthread_t tid, void** thread_return){ return TaskGroup::join(tid, thread_return);}
join 通过 version_butex 实现(第十二篇、第十三篇):bthread 结束时版本号递增,join 方通过 butex_wait 阻塞等待版本变化。
注意:bthread_join 不受 bthread_interrupt 影响。这是有意设计——interrupt 用于中断阻塞操作(usleep、butex_wait),而 join 是"等待结束"的语义,不应被中断。
六、bthread_exit:退出
void bthread_exit(void* retval) { TaskGroup* g = tls_task_group; if (g && !g->is_current_main_task()) { throw ExitException(retval); // bthread: 抛异常(被 task_runner 捕获) } else { pthread_exit(retval); // pthread: 系统调用 }}
bthread 的 exit 通过 C++ 异常实现。ExitException 在 task_runner 的 try-catch 中被捕获,触发正常的 bthread 结束流程(版本号递增、资源释放、ending_sched)。
为什么不用 longjmp?因为 longjmp 不调用析构函数,会跳过栈上的 RAII 对象清理。C++ 异常确保所有局部对象的析构函数都被正确调用。
七、bthread_yield 与 bthread_usleep
intbthread_yield(void){ TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); if (g && !g->is_current_pthread_task()) { TaskGroup::yield(&g); // bthread: 切换到其他 bthread return 0; } return sched_yield(); // pthread: 系统调度让步}intbthread_usleep(uint64_t microseconds){ TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); if (g && !g->is_current_pthread_task()) { return TaskGroup::usleep(&g, microseconds); // bthread: 协程 sleep } return ::usleep(microseconds); // pthread: 系统 sleep}
这两个函数完美体现了 bthread API 的核心设计原则:
- 在 bthread 中:使用协程机制,不阻塞底层 pthread。yield 通过
sched 让出 CPU 给其他 bthread(第十三篇);usleep 通过 TimerThread 注册定时器后让出 CPU(第十篇、第十三篇) - 在 pthread 中:退化为标准系统调用。
sched_yield 让出 CPU 时间片,::usleep 阻塞当前线程
is_current_pthread_task() 区分两种场景——有些 bthread 使用 pthread 栈(STACK_TYPE_PTHREAD),它们在 worker 上运行但不能切换栈,所以退化为系统调用。
八、bthread_interrupt 与 bthread_stop
intbthread_interrupt(bthread_t tid, bthread_tag_t tag){ return TaskGroup::interrupt(tid, get_task_control(), tag);}intbthread_stop(bthread_t tid){ TaskGroup::set_stopped(tid); // 设置停止标志 return bthread_interrupt(tid); // 中断阻塞}
interrupt 的行为取决于 bthread 当前状态(第十三篇):
stop 是 set_stopped + interrupt 的组合。典型用法:
[Thread1] [Thread2] set stopping flag bthread_stop(Thread2) wake up check flag → quit
九、并发控制
9.1 bthread_setconcurrency
int bthread_setconcurrency(int num) { // 参数校验:4 <= num <= 1024 if (num < BTHREAD_MIN_CONCURRENCY || num > BTHREAD_MAX_CONCURRENCY) return EINVAL; // 延迟创建模式:只修改 flag if (FLAGS_bthread_min_concurrency > 0) { if (num < FLAGS_bthread_min_concurrency) return EINVAL; FLAGS_bthread_concurrency = num; return 0; } // 即时模式:检查 + 创建 TaskControl* c = get_task_control(); if (c && num < c->concurrency()) return EPERM; // 不能减少 if (c && num == c->concurrency()) return 0; // 已满足 BAIDU_SCOPED_LOCK(g_task_control_mutex); // ... double-check + add_workers_for_each_tag}
关键约束:并发数只能增加不能减少。这简化了实现——不需要处理 worker 正在执行任务时的回收问题。
两种模式:
- 延迟创建(
min_concurrency > 0):只修改 flag 上限,worker 在 signal_task 中按需创建 - 即时创建(
min_concurrency <= 0):直接调用 add_workers_for_each_tag 创建 worker
9.2 bthread_setconcurrency_by_tag
按 tag 设置并发数。先获取该 tag 当前的 worker 数量,计算差值,只增不减。新增的 worker 通过 add_workers(num, tag) 创建。
十、定时器 API
intbthread_timer_add(bthread_timer_t* id, timespec abstime, void (*on_timer)(void*), void* arg) { TaskControl* c = get_or_new_task_control(); TimerThread* tt = get_or_create_global_timer_thread(); bthread_timer_t tmp = tt->schedule(on_timer, arg, abstime); if (tmp != 0) { *id = tmp; return 0; } return ESTOP;}intbthread_timer_del(bthread_timer_t id){ TimerThread* tt = get_global_timer_thread(); return tt->unschedule(id);}
定时器 API 是 TimerThread(第十篇)的薄封装。schedule 将回调注册到全局 TimerThread,unschedule 通过版本号状态机取消调度。
十一、bthread_self 与 bthread_equal
bthread_tbthread_self(void){ TaskGroup* g = tls_task_group; if (g && !g->is_current_main_task()) { return g->current_tid(); } return INVALID_BTHREAD; // 0}intbthread_equal(bthread_t t1, bthread_t t2){ return t1 == t2;}
bthread_self 的一个特殊设计:worker 的主任务(_main_tid)返回 0 而非实际 tid。这是为了在日志中更容易区分主任务和用户 bthread——主任务返回 0 意味着日志中 tid 为 0 的都是内部调度操作。
十二、批量操作:bthread_list
struct TidStopper { voidoperator()(bthread_t id)const{ bthread_stop(id); }};struct TidJoiner { voidoperator()(bthread_t& id)const{ bthread_join(id, NULL); id = INVALID_BTHREAD; }};intbthread_list_stop(bthread_list_t* list){ static_cast<TidList*>(list->impl)->apply(TidStopper()); return 0;}intbthread_list_join(bthread_list_t* list){ static_cast<TidList*>(list->impl)->apply(TidJoiner()); return 0;}
bthread_list 系列提供批量管理 bthread 的能力。底层使用 ListOfABAFreeId(ABA-free 的 id 列表),通过 apply 对列表中每个 id 执行回调。TidStopper 和 TidJoiner 是仿函数(functor),分别封装了 bthread_stop 和 bthread_join。
十三、TLS:线程局部存储
intbthread_key_create(bthread_key_t* key, void (*destructor)(void*));intbthread_key_delete(bthread_key_t key);intbthread_setspecific(bthread_key_t key, void* data);void* bthread_getspecific(bthread_key_t key);
bthread 的 TLS API 语义与 pthread 完全一致,但实现不同:
- 在 bthread 中:数据存储在
TaskMeta::local_storage.keytable(第十二篇),通过 tls_bls 双存储设计实现快速访问 - 在 pthread 中:直接使用 pthread_key 系列函数
关键设计:在 pthread 和 bthread 中使用相同的 API。用户代码不需要区分当前线程类型——bthread 库内部通过 tls_task_group 自动判断。
十四、辅助 API
bthread_about_to_quit
intbthread_about_to_quit() { TaskGroup* g = tls_task_group; if (g) { TaskMeta* current_task = g->current_task(); if (!(current_task->attr.flags & BTHREAD_NEVER_QUIT)) { current_task->about_to_quit = true; } return 0; } return EPERM;}
标记当前 bthread 即将退出。调度器可据此优化——例如不再将当前 bthread 放入运行队列的优先位置。BTHREAD_NEVER_QUIT 标志的 bthread 不受影响。
bthread_set_worker_startfn / bthread_set_tagged_worker_startfn
intbthread_set_worker_startfn(void (*start_fn)());intbthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t));
设置 worker 线程启动时的回调。通常用于资源的线程局部初始化——例如在每个 worker 启动时初始化线程特定的缓存或连接池。
十五、pthread ↔ bthread 语义对照
bthread API 的设计目标是与 pthread API 尽可能一致,让用户可以轻松将 pthread 代码迁移到 bthread:
| | |
|---|
pthread_create | bthread_start_background | |
| bthread_start_urgent | |
pthread_join | bthread_join | |
pthread_exit | bthread_exit | |
pthread_self | bthread_self | |
pthread_equal | bthread_equal | |
sched_yield | bthread_yield | |
usleep | bthread_usleep | |
pthread_key_* | bthread_key_* | |
pthread_mutex_* | bthread_mutex_* | |
pthread_cond_* | bthread_cond_* | |
pthread_rwlock_* | bthread_rwlock_* | |
sem_* | bthread_sem_* | |
bthread 特有的扩展:
bthread_start_urgentbthread_flush:无 pthread 对应,NOSIGNAL 批量提交bthread_interrupt / bthread_stop:中断/停止指定 bthreadbthread_setconcurrencybthread_timer_add / bthread_timer_del:定时器(独立于 bthread 生命周期)
十六、API 层的设计模式
贯穿整个 bthread API 层的四个设计模式:
1. tls_task_group 分发。 几乎每个 API 都先检查 tls_task_group:不为空说明当前是 worker 线程,走 bthread 协程路径;为空说明是非 worker 线程,回退到 pthread 系统调用或通过 start_from_non_worker 投递。
2. 只增不减。setconcurrency 只能增加 worker 数量不能减少。这避免了 worker 正在执行 bthread 时的复杂回收逻辑。
3. 批量优化。BTHREAD_NOSIGNAL + bthread_flush 将多次 signal 合并为一次,减少 futex_wake 系统调用。
4. 统一接口。 无论在 bthread 还是 pthread 中调用,API 行为一致——内部自动适配,用户无需区分。
十七、总结
bthread 公共 API 层的设计可以归纳为一句话:以 pthread 兼容的接口封装 M:N 调度,通过 tls_task_group 自动适配 bthread/pthread 环境,NOSIGNAL 批量优化减少唤醒开销,只增不减的并发控制简化实现。
四个关键设计:
1. tls_task_group 双路径分发。 每个 API 先检查 tls_task_group:worker 线程走协程路径,非 worker 线程走 pthread 路径或通过 TaskControl 投递。这让同一套 API 在两种环境下都能正确工作。
2. urgent/background 两种启动模式。start_urgent 立即切换到新 bthread 执行(前台),适用于高优先级任务(如 RPC 响应)。start_background 仅入队不切换(后台),类似 pthread_create 的语义。
3. NOSIGNAL + flush 批量提交。 将 N 个 signal_task 延迟到一次 flush 中执行,将 N 次 futex_wake 减少为 1 次。非 worker 线程通过 tls_task_group_nosignal 缓存目标 TaskGroup,确保同一批次进入同一队列。
4. 异常实现 exit。bthread_exit 通过抛出 ExitException 实现,被 task_runner 的 try-catch 捕获。C++ 异常保证栈上 RAII 对象的析构函数被正确调用,比 longjmp 更安全。
本文基于 Apache brpc 源码(src/bthread/bthread.h、src/bthread/bthread.cpp)撰写。