摘要:前文拆解了TaskMeta。本文转向TaskGroup——bthread调度的核心单元,每个worker pthread一个。本地队列和远程队列管理就绪任务,工作窃取实现负载均衡。sched_to通过jump_stack完成上下文切换,RemainedFn实现延迟回调,nosignal机制优化批量唤醒。
在上一篇文章中,我们拆解了 TaskMeta——每个 bthread 的核心数据结构。我们看到它的字段分为 Not Reset 和 Reset 两类,version_butex 实现 join 等待。
现在我们转向 bthread 调度的核心:TaskGroup。
bthread 的 M:N 模型中,M 个 bthread 被映射到 N 个 pthread 上执行。每个 pthread 关联一个 TaskGroup,负责管理该 pthread 上所有 bthread 的调度。TaskGroup 是 bthread 调度的"引擎"——从队列中取任务、上下文切换、执行用户函数、回收资源,所有这些都在 TaskGroup 中完成。
一、TaskGroup 概览
TaskControl (全局调度器) ┌─────────────────────────┐ │ TaskGroup[0] │ │ TaskGroup[1] │ │ ... │ │ TaskGroup[N-1] │ └─────────────────────────┘ │ │ ┌────┘ └────┐ ▼ ▼ ┌─────────────┐ ┌─────────────┐ │ TaskGroup[0]│ │ TaskGroup[1]│ │ (pthread 0) │ │ (pthread 1) │ │ │ │ │ │ _rq (本地) │◄─steal─ _rq (本地) │ │ _remote_rq │ │ _remote_rq │ │ _main_stack │ │ _main_stack │ │ _cur_meta │ │ _cur_meta │ └─────────────┘ └─────────────┘
每个 TaskGroup 管理:
- 本地运行队列
_rq:WorkStealingQueue,存储待执行的 bthread - 远程任务队列
_remote_rq:其他线程提交到此 TaskGroup 的任务 - 当前 bthread
_cur_meta:正在此 worker 上执行的 bthread - 主栈
_main_stack:worker pthread 的原生栈 - CPU 时间统计
_cpu_time_stat:用于负载均衡
调度流程:run_main_task → wait_task → sched_to → task_runner → ending_sched → sched_to → ...
二、数据结构
2.1 TaskGroup 核心字段
class TaskGroup { TaskMeta* _cur_meta{NULL}; // 当前运行的 bthread TaskControl* _control{NULL}; // 所属的 TaskControl // === 任务队列 === WorkStealingQueue<bthread_t> _rq; // 本地运行队列(无锁) RemoteTaskQueue _remote_rq; // 远程任务队列(有锁) // === 主任务 === ContextualStack* _main_stack{NULL}; // 主栈(pthread 原生栈) bthread_t _main_tid{INVALID_BTHREAD}; // 主 bthread // === 调度控制 === RemainedFn _last_context_remained{NULL}; // 延迟回调 void* _last_context_remained_arg{NULL}; ParkingLot* _pl{NULL}; // worker 睡眠/唤醒 // === 工作窃取 === size_t _steal_seed; // 随机种子 size_t _steal_offset; // 窃取偏移(质数步长) // === nosignal 优化 === int _num_nosignal{0}; // 本地未发信号计数 int _nsignaled{0}; // 已发信号计数 int _remote_num_nosignal{0}; // 远程未发信号计数 // === 统计 === AtomicCPUTimeStat _cpu_time_stat; // CPU 时间统计(128位原子) size_t _nswitch{0}; // 上下文切换次数 bthread_tag_t _tag; // 任务标签(分组隔离) pthread_t _tid{}; // worker pthread ID};
字段按功能分为五组:任务队列、主任务、调度控制、工作窃取、统计。下面逐一分析关键设计。
2.2 bthread_t 编解码
// 高 32 位:版本号 低 32 位:ResourcePool slotinlinebthread_tmake_tid(uint32_t version, ResourceId<TaskMeta> slot){ return ((uint64_t)version << 32) | slot.value;}inline ResourceId<TaskMeta> get_slot(bthread_t tid){ return { tid & 0xFFFFFFFF };}inlineuint32_tget_version(bthread_t tid){ return tid >> 32;}
与 TimerThread 的 TaskId 编码(第十篇)和 ResourcePool 的 ResourceId(第三篇)设计一致。版本号解决 ABA 问题——TaskMeta 被复用后 slot 相同,但版本号已变,旧 tid 的操作会被拒绝。
三、初始化
intTaskGroup::init(size_t runqueue_capacity){ _rq.init(runqueue_capacity); // 本地队列 _remote_rq.init(runqueue_capacity / 2); // 远程队列(容量减半) ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL); // 主栈(空壳) ResourceId<TaskMeta> slot; TaskMeta* m = get_resource<TaskMeta>(&slot); // 从 ResourcePool 分配 // 重置 [Reset] 字段 m->fn = NULL; m->arg = NULL; m->local_storage = LOCAL_STORAGE_INIT; m->stat = EMPTY_STAT; m->attr = BTHREAD_ATTR_TASKGROUP; // 构建 tid 并关联栈 m->tid = make_tid(*m->version_butex, slot); m->set_stack(stk); _cur_meta = m; _main_tid = m->tid; _main_stack = stk; return 0;}
初始化创建一个"主任务"——一个没有用户入口函数的 bthread,运行在 pthread 原生栈上。它是 worker pthread 调度循环的"锚点":所有 bthread 执行完毕后都会回到主任务。
主任务的特点:fn = NULL(没有入口函数)、stack = _main_stack(使用 pthread 原生栈)、tid = _main_tid。
四、主循环
void TaskGroup::run_main_task() { TaskGroup* dummy = this; bthread_t tid; while (wait_task(&tid)) { // 等待/窃取一个任务 sched_to(&dummy, tid); // 切换到该任务执行 // sched_to 返回 = bthread 执行完毕或让出 if (_cur_meta->tid != _main_tid) { task_runner(1/*skip_remained*/); // 从 bthread 返回,执行收尾 } }}
主循环极简:等待任务 → 切换执行 → 回到主任务 → 重复。核心调度逻辑在 wait_task、sched_to 和 task_runner 中。
4.1 wait_task:等待与窃取
boolTaskGroup::wait_task(bthread_t* tid){ do { if (_last_pl_state.stopped()) return false; // TaskControl 停止 _pl->wait(_last_pl_state); // 在 ParkingLot 上等待 if (steal_task(tid)) return true; // 尝试窃取 } while (true);}boolTaskGroup::steal_task(bthread_t* tid){ if (_remote_rq.pop(tid)) return true; // 先从远程队列取 return _control->steal_task(tid, &_steal_seed, _steal_offset); // 从其他 TaskGroup 偷}
窃取策略:先本地远程队列,再跨 TaskGroup 窃取。_steal_seed 和 _steal_offset 确保偷取目标随机化(质数步长减少冲突),避免多个 worker 同时偷同一个 TaskGroup。
五、创建 bthread
5.1 start_background:后台启动
template <bool REMOTE>intTaskGroup::start_background(bthread_t* th, constbthread_attr_t* attr, void* (*fn)(void*), void* arg) { TaskMeta* m = get_resource<TaskMeta>(&slot); // 从 ResourcePool 分配 // 重置 [Reset] 字段 m->fn = fn; m->arg = arg; m->stack = NULL; // 栈在调度时分配 m->tid = make_tid(*m->version_butex, slot); *th = m->tid; _control->_nbthreads << 1; // 递增全局计数 if (REMOTE) { ready_to_run_remote(m, nosignal); // 远程队列 } else { ready_to_run(m, nosignal); // 本地队列 } return 0;}
后台启动:创建 TaskMeta → 入队 → 返回。调用者不切换,新 bthread 等待被调度。REMOTE 模板参数决定入本地队列还是远程队列。
5.2 start_foreground:前台启动
intTaskGroup::start_foreground(TaskGroup** pg, bthread_t* th, ...){ // 创建 TaskMeta(同 start_background) TaskMeta* m = ...; if (g->is_current_pthread_task()) { g->ready_to_run(m, nosignal); // pthread 模式不能切换,入队 } else { // 设置 remained:切换后将当前 bthread 放回队列 ReadyToRunArgs args = { g->tag(), g->_cur_meta, nosignal }; g->set_remained(ready_to_run_in_worker, &args); sched_to(pg, m->tid); // 立即切换到新 bthread } return 0;}
前台启动:创建新 bthread 后立即切换到它执行,调用者(当前 bthread)被放入运行队列等待。这通过 set_remained + sched_to 实现——remained 回调在切换到新栈后执行,将旧 bthread 放回队列。
pthread 模式的 bthread 不能切换栈(没有独立栈),所以只能入队。
六、sched_to:核心上下文切换
sched_to 是 bthread 调度的核心——从当前 bthread 切换到目标 bthread。
6.1 sched_to(by tid):栈分配
voidTaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid){ TaskMeta* next_meta = address_meta(next_tid); if (next_meta->stack == NULL) { // 首次调度:分配栈,入口函数为 task_runner ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner); if (stk) { next_meta->set_stack(stk); } else { // 分配失败:回退到 pthread 模式 next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD; next_meta->set_stack((*pg)->_main_stack); } } sched_to(pg, next_meta);}
首次调度的 bthread 没有栈(stack == NULL),此时从 ObjectPool 获取栈并设置入口函数 task_runner。栈分配失败时回退到 pthread 模式(使用 _main_stack)。
6.2 sched_to(by meta):实际切换
voidTaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta){ TaskGroup* g = *pg; // 1. 保存 bthread 私有状态 int saved_errno = errno; void* saved_unique_user_ptr = tls_unique_user_ptr; // 2. 更新 CPU 时间统计 TaskMeta* cur_meta = g->_cur_meta; int64_t now = cpuwide_time_ns(); cur_meta->stat.cputime_ns += now - last_run_ns; ++cur_meta->stat.nswitch; ++g->_nswitch; // 3. 切换当前任务指针 g->_cur_meta = next_meta; // 4. 切换 TLS cur_meta->local_storage = tls_bls; // 保存当前 TLS tls_bls = next_meta->local_storage; // 加载目标 TLS // 5. 栈切换 if (cur_meta->stack != NULL && cur_meta->stack != next_meta->stack) { jump_stack(cur_meta->stack, next_meta->stack); // 汇编级切换 g = tls_task_group; // 可能已迁移到其他 TaskGroup } // 6. 执行延迟回调 while (g->_last_context_remained) { RemainedFn fn = g->_last_context_remained; g->_last_context_remained = NULL; fn(g->_last_context_remained_arg); g = tls_task_group; } // 7. 恢复 bthread 私有状态 errno = saved_errno; tls_unique_user_ptr = saved_unique_user_ptr; *pg = g;}
七步操作,每步都有明确的目的:
步骤 1-2:保存状态和统计。 errno 和 tls_unique_user_ptr 是 bthread 私有的——切换时保存,切回时恢复。CPU 时间统计在每次切换时更新。
步骤 3-4:切换指针和 TLS。_cur_meta 指向新任务。TLS(tls_bls)从当前 TaskMeta 保存到目标 TaskMeta 加载——这是 local_storage 双存储设计(第十二篇)的实际使用场景。
步骤 5:栈切换。 只有当两个任务的栈不同时才执行 jump_stack(纯用户态汇编,不涉及系统调用)。jump_stack 返回意味着其他 bthread 切回了当前 bthread——此时 tls_task_group 可能已经改变(bthread 可能被迁移到其他 worker)。
步骤 6:执行延迟回调。 这是 TaskGroup 的核心设计之一——下一节详述。
步骤 7:恢复状态。 恢复当前 bthread 的 errno 和 unique_user_ptr。
七、RemainedFn:延迟回调机制
typedefvoid(*RemainedFn)(void*);voidset_remained(RemainedFn cb, void* arg);
RemainedFn 是 sched_to 中最精巧的设计。问题:在 bthread A 切换到 bthread B 之前,有些操作需要在"切换后"才能执行。例如,把 A 放回运行队列——如果在切换前放回,A 可能被其他 worker 立即拿走执行,而 A 还在当前栈上。
解决方案:set_remained(fn, arg) 注册一个回调,在 sched_to 的 jump_stack 返回后执行。此时已经在目标 bthread 的栈上,原 bthread 的栈不再使用。
典型用法:
// start_foreground:切换后将当前 bthread 放回队列ReadyToRunArgs args = { g->tag(), g->_cur_meta, nosignal };g->set_remained(ready_to_run_in_worker, &args);sched_to(pg, m->tid);// task_runner 结束时:切换后释放旧 bthread 的栈g->set_remained(_release_last_context, m);ending_sched(&g);// usleep:切换后注册定时器SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g };g->set_remained(_add_sleep_event, &e);sched(pg);
三种场景都遵循同一模式:先设置回调,再触发切换,回调在新栈上执行。
八、task_runner:执行入口
task_runner 是所有 bthread 的最终入口——bthread_make_fcontext 在新栈上设置的入口函数就是它。void TaskGroup::task_runner(intptr_t skip_remained) { TaskGroup* g = tls_task_group; // 1. 执行上一轮的延迟回调 if(!skip_remained) { while(g->_last_context_remained) { RemainedFn fn = g->_last_context_remained; g->_last_context_remained = NULL; fn(g->_last_context_remained_arg); g = tls_task_group; // 回调可能改变 TaskGroup } } do { TaskMeta* m = g->_cur_meta; // 2. 执行用户函数 void* thread_return = m->fn(m->arg); // 3. 清理 TLS(KeyTable、rpcz span) return_keytable(m->attr.keytable_pool, tls_bls.keytable); // 4. 递增版本号,唤醒 join 者 ++*m->version_butex; butex_wake_except(m->version_butex, 0); // 5. 递减 bthread 计数 g->_control->_nbthreads << -1; // 6. 设置 remained:释放栈和 TaskMeta g->set_remained(_release_last_context, m); // 7. 结束调度:切换到下一个 bthread ending_sched(&g); } while(g->_cur_meta->tid != g->_main_tid); // 循环直到回到主任务}
关键设计:do-while 循环。bthread 的 fn 执行完毕后,ending_sched 切换到下一个 bthread。如果下一个不是主任务,循环继续执行新 bthread 的 fn。这避免了 ending_sched → sched_to → jump_stack → task_runner 的递归调用链——所有 bthread 都在同一个 task_runner 调用栈上执行。
_release_last_context 在切换后释放旧 bthread 的栈和 TaskMeta:
void TaskGroup::_release_last_context(void* arg) { TaskMeta* m = static_cast<TaskMeta*>(arg); if (m->stack_type() != STACK_TYPE_PTHREAD) { return_stack(m->release_stack()); // 归还栈到 ObjectPool } return_resource(get_slot(m->tid)); // 归还 TaskMeta 到 ResourcePool}
为什么用 remained 而不是直接释放?因为此时还在当前 bthread 的栈上执行——不能释放自己的栈。remained 回调在 jump_stack 到新栈之后执行,旧栈不再使用,可以安全归还。
九、ending_sched:结束调度与栈复用
voidTaskGroup::ending_sched(TaskGroup** pg){ TaskGroup* g = *pg; bthread_t next_tid = 0; // 从本地队列或窃取获取下一个任务 const bool popped = g->_rq.pop(&next_tid); if (!popped && !g->steal_task(&next_tid)) { next_tid = g->_main_tid; // 没有任务,切回主任务 } TaskMeta* cur_meta = g->_cur_meta; TaskMeta* next_meta = address_meta(next_tid); // 栈分配/复用 if (next_meta->stack == NULL) { if (next_meta->stack_type() == cur_meta->stack_type()) { // 栈类型相同:直接复用当前 bthread 的栈 next_meta->set_stack(cur_meta->release_stack()); } else { // 栈类型不同:从 ObjectPool 获取新栈 ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner); next_meta->set_stack(stk); } } sched_to(pg, next_meta);}
与 sched 的关键区别:ending_sched 中当前 bthread 已经结束,它的栈可以被下一个 bthread 复用。如果栈类型相同(都是 Normal),直接把栈从旧 TaskMeta 转移到新 TaskMeta——避免了归还 ObjectPool 再重新获取的开销。
十、nosignal 优化:批量唤醒
void TaskGroup::ready_to_run(TaskMeta* meta, bool nosignal) { push_rq(meta->tid); // 放入本地队列 if (nosignal) { ++_num_nosignal; // 不唤醒 worker,只计数 } else { int additional = _num_nosignal; _num_nosignal = 0; _control->signal_task(1 + additional, _tag); // 一次性唤醒 }}
nosignal 优化:每次 signal_task 都需要 futex_wake 系统调用。如果在短时间内创建大量 bthread,每个都调用 signal_task 开销很大。解决方案:
nosignal=true:只入队不唤醒,增加 _num_nosignal 计数nosignal=false:入队 + 唤醒,一次性唤醒 1 + _num_nosignal 个 worker
这样 N 个 nosignal=true 后跟一个 nosignal=false,总共只调用一次 signal_task。
十一、join 与 exists
intTaskGroup::join(bthread_t tid, void** return_value){ if (tid == 0) return EINVAL; TaskMeta* m = address_meta(tid); if (g != NULL && g->current_tid() == tid) return EINVAL; // 不能 join 自己 const uint32_t expected_version = get_version(tid); while (*m->version_butex == expected_version) { butex_wait(m->version_butex, expected_version, NULL); // 阻塞等待版本变化 } return 0;}
join 通过 version_butex 实现(第十二篇已分析):bthread 结束时版本号递增,join 方通过 butex_wait 阻塞等待版本变化。
exists 通过版本号比对判断 bthread 是否仍然存活:
boolTaskGroup::exists(bthread_t tid){ TaskMeta* m = address_meta(tid); return m && (*m->version_butex == get_version(tid));}
十二、睡眠与中断
12.1 usleep
int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) { if (timeout_us == 0) { yield(pg); return 0; } SleepArgs e = { timeout_us, g->current_tid(), g->current_task(), g }; g->set_remained(_add_sleep_event, &e); // 切换后注册定时器 sched(pg); // 让出 CPU // 被定时器唤醒后继续执行 if (e.meta->sleep_failed) { ... return -1; } if (e.meta->interrupted) { ... return -1; } return 0;}
usleep 使用 RemainedFn 机制:先 set_remained 注册 _add_sleep_event,再 sched 让出 CPU。_add_sleep_event 在切换到新栈后执行,将当前 bthread 注册到 TimerThread(第十篇)。定时器到期时回调 ready_to_run_from_timer_thread 将 bthread 重新放入运行队列。
为什么不在 sched 之前注册定时器?如果先注册定时器再 sched,定时器可能在 sched 之前就到期——此时当前 bthread 还在运行,但 ready_to_run_from_timer_thread 可能已经尝试唤醒它,导致竞态。
12.2 interrupt
中断是"持久化"的:即使 bthread 当前未阻塞,中断标志也会被记住,在下次阻塞时检查。这简化了实现,减少了竞态导致的通知丢失。
中断分两种情况:
- bthread 正在 butex 上等待(
current_waiter != NULL):从 butex 等待队列中移除并唤醒 - bthread 正在 sleep(
current_sleep != 0):取消 TimerThread 定时器并重新入队
两者互斥:一个 bthread 不可能同时等待 butex 和处于 sleep 状态。
十三、总结
TaskGroup 的设计可以归纳为一句话:每个 worker pthread 一个 TaskGroup,本地队列 + 工作窃取管理就绪任务,sched_to 通过 jump_stack 完成上下文切换,RemainedFn 机制实现延迟回调。
五个关键设计:
1. 本地队列 + 远程队列 + 工作窃取。 本地队列(_rq)是无锁的 WorkStealingQueue,单生产者多消费者,push/pop 由 worker 自身操作,steal 由其他 worker 操作。远程队列(_remote_rq)有锁保护,供非 worker 线程提交任务。本地队列空时从其他 TaskGroup 窃取——质数步长减少偷取冲突。
2. RemainedFn 延迟回调。 在 sched_to 的 jump_stack 返回后执行回调。解决了一个核心问题:不能在当前栈上释放自己的栈。remained 机制将"释放旧栈"、"注册定时器"、"放回运行队列"等操作推迟到新栈上执行。
3. ending_sched 的栈复用。 bthread 结束时,如果下一个 bthread 的栈类型与当前相同,直接转移栈——避免了归还 ObjectPool 再重新获取的开销。栈复用是 bthread 高效调度的关键优化之一。
4. nosignal 批量唤醒。 N 个连续的 nosignal=true 入队操作只增加计数器,最后一个 nosignal=false 一次性唤醒所有需要的 worker。将 N 次 futex_wave 系统调用减少为 1 次。
5. sched_to 的完整上下文切换。 七步操作(保存 errno → CPU 统计 → 切换 _cur_meta → 切换 TLS → jump_stack → remained 回调 → 恢复 errno)确保每个 bthread 有独立的 errno、TLS 和 CPU 统计,同时支持跨 TaskGroup 迁移。
本文基于 Apache brpc 源码(src/bthread/task_group.h、src/bthread/task_group_inl.h、src/bthread/task_group.cpp)撰写。