bthread 底层实现深入分析
简介
bthread 是 Apache bRPC 框架中的 M:N 用户态线程库,设计目标是以极低的成本支撑百万级并发。它与 Go 的 goroutine 理念相似——将大量用户态任务映射到少量内核线程上执行,避免内核态上下文切换的开销。
bthread 的上下文切换参考了 boost.context 的设计,用手写汇编直接操作寄存器,切换耗时仅约 100-200 ns,比 pthread 切换快 10 倍以上。bRPC 的高并发能力很大程度上依赖 bthread 的高效调度。
整体架构
bthread 的 M:N 模型由三层结构组成:
bthread (百万级) → TaskGroup (每核一个) → pthread worker (少量)
用户态任务 两级队列 + 内核线程
(协程栈+上下文) Work Stealing (物理执行单元)
核心数据结构:
| 组件 | 生命周期 | 职责 |
|---|---|---|
| TaskControl (TC) | 进程全局唯一 | 管理所有 worker 线程,协调全局任务窃取 |
| TaskGroup (TG) | 每个 pthread 一个 | worker 调度单元,维护任务队列,管理 bthread 执行 |
| TaskMeta (TM) | 每个 bthread 一个 | 保存 bthread 的上下文:回调函数、栈、寄存器、状态等 |
上下文切换:汇编级实现
上下文切换是 bthread 性能的核心。bthread 用汇编直接操作寄存器,仅保存最少的必要状态,实现了纳秒级的协程切换。
协程上下文初始化:bthread_make_fcontext
make_fcontext 初始化一个新协程的栈空间,设置好入口函数和执行完毕后跳转的 finish 函数:
bthread_make_fcontext:
movq %rdi, %rax ; %rdi = 栈底地址
andq $-16, %rax ; 16 字节对齐(x86-64 ABI 要求)
leaq -0x48(%rax), %rax ; 预留 72 字节上下文空间
movq %rdx, 0x38(%rax) ; 偏移 0x38: 保存入口函数 fn 的地址
stmxcsr (%rax) ; 偏移 0x00: 保存 MXCSR 浮点状态
fnstcw 0x4(%rax) ; 偏移 0x04: 保存 FPU 控制字
leaq finish(%rip), %rcx ; 加载 finish 地址
movq %rcx, 0x40(%rax) ; 偏移 0x40: 设置返回地址为 finish
ret ; 返回初始化后的上下文指针
初始化后的栈布局(从高地址到低地址):
+-----------------+ 高地址(栈底)
| 未使用空间 |
+-----------------+
| finish 地址 | ← 0x40(%rax) — fn 返回后跳转到这里(调用 _exit)
+-----------------+
| 入口函数 fn | ← 0x38(%rax) — bthread 要执行的函数指针
+-----------------+
| FPU 控制字 | ← 0x04(%rax)
+-----------------+
| MXCSR 状态 | ← 0x00(%rax) — context 指针指向这里
+-----------------+ 低地址(栈顶)
关键在于:栈上预置了 finish 作为返回地址。当 fn 执行完毕执行 ret 指令时,会把 finish 的地址弹出到 %rip,从而自动跳转到 finish → _exit,安全结束协程。
上下文切换:bthread_jump_fcontext
切换的核心流程分为两步:保存当前协程的寄存器到栈上,然后从目标协程的栈上恢复寄存器并跳转。整个过程是一个对称操作:
bthread_jump_fcontext:
; ===== 第一步:保存当前协程的寄存器 =====
pushq %rbp ; 被调用者保存寄存器
pushq %rbx
pushq %r15
pushq %r14
pushq %r13
pushq %r12
leaq -0x8(%rsp), %rsp ; 预留 8 字节给浮点状态
cmp $0, %rcx ; preserve_fpu 参数
je 1f
stmxcsr (%rsp) ; 保存 MXCSR
fnstcw 0x4(%rsp) ; 保存 FPU 控制字
1:
movq %rsp, (%rdi) ; 将 %rsp 保存到 from->context
; ===== 第二步:切换到目标协程的栈 =====
movq %rsi, %rsp ; 将 to->context 加载到 %rsp(栈切换!)
cmp $0, %rcx ; 恢复浮点状态
je 2f
ldmxcsr (%rsp) ; 恢复 MXCSR
fldcw 0x4(%rsp) ; 恢复 FPU 控制字
2:
leaq 0x8(%rsp), %rsp
popq %r12 ; 恢复被调用者保存寄存器(逆序)
popq %r13
popq %r14
popq %r15
popq %rbx
popq %rbp
popq %r8 ; 弹出返回地址
movq %rdx, %rax ; 设置返回值
movq %rdx, %rdi ; 设置第一个参数
jmp *%r8 ; 跳转到目标协程!
两种切换场景
| 切换到新创建的 bthread | 切换到已运行过的 bthread | |
|---|---|---|
| %r8 弹出的内容 | fn 入口函数地址 | 上次切换时压入的下一条指令地址 |
| 执行效果 | 开始执行 fn;fn 返回后跳转到 finish | 从上次中断处继续执行 |
性能优化要点
- 只保存被调用者保存寄存器:%rbx, %rbp, %r12-%r15,调用者保存寄存器(%rax, %rcx, %rdx, %rsi, %rdi, %r8-%r11)由编译器自动管理,无需保存
- 浮点状态按需加载:通过
preserve_fpu参数控制是否保存/恢复浮点状态。大部分 bthread 任务不涉及浮点运算,跳过这步可以节省数十纳秒 - 16 字节栈对齐:符合 x86-64 ABI,保证 SIMD 指令正常执行
调度模型
两级任务队列
每个 TaskGroup 维护两个队列,设计上刻意区分了「来源」:
| 队列 | 类型 | 用途 |
|---|---|---|
_rq (WorkStealingQueue) |
无锁环形队列 | 本 worker 自身创建的 bthread |
_remote_rq (RemoteTaskQueue) |
带锁队列 | 非 worker 线程(主线程等)提交的 bthread |
关键设计:worker 不从自己的 _rq 取任务——自己的 _rq 只在 ending_sched() → task_runner() 的过程中被消费。wait_task() 只从 _remote_rq 取,或通过 Work Stealing 从其他 worker 的队列窃取。
这种设计避免了同一个 _rq 被多个 TaskGroup 同时访问产生锁竞争。
任务获取与 Work Stealing
Worker 在无任务可执行时,按以下优先级获取新任务:
wait_task()
├── 1. 本 TG._remote_rq.pop() ← 优先级最高,处理外部提交的任务
├── 2. _control.steal_task() ← 全局 Work Stealing
│ ├── 其他 TG._rq.steal() ← 从队尾窃取(无锁 CAS,缓存友好)
│ └── 其他 TG._remote_rq.pop() ← 窃取外部提交的任务
└── 3. _pl->wait() ← 所有尝试失败,休眠等待唤醒
Work Stealing 的核心逻辑是随机遍历,配合 offset 步进,避免每次从同一个 worker 窃取产生热点:
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
for (size_t i = 0; i < ngroup; ++i, *seed += offset) {
TaskGroup* g = _groups[*seed % ngroup];
if (g && g->_rq.steal(tid)) // 从队尾窃取,减少 CAS 冲突
return true;
if (g && g->_remote_rq.pop(tid))
return true;
}
return false;
}
窃取队尾而不是队头的原因:push 和 pop 都在队头操作,如果窃取也走队头,会产生频繁的 CAS 竞争。从队尾窃取与 push/pop 错开,降低冲突。
协作式调度
bthread 采用非抢占式调度,只在特定时机主动让出 CPU:
| 时机 | 行为 |
|---|---|
bthread_yield() / bthread_usleep() |
立即切换,挂起当前 bthread |
bthread_mutex_lock() 等待锁 |
在 butex 上挂起,让出 worker |
| I/O 等待(epoll 结合) | EventDispatcher 检测到事件未就绪时自动 yield |
阻塞系统调用(如 read()) |
整个 worker 被 OS 挂起,其他 worker 偷取该 worker 队列的任务 |
当 bthread 执行阻塞系统调用时,整个 pthread worker 都会被 OS 挂起。这时其他空闲的 worker 通过 Work Stealing 会偷走被阻塞 worker 的队列中的 bthread 继续执行,保证整体吞吐不受影响。
栈管理
每个 bthread 需要独立的栈空间。bthread 通过 StackPool 和 mmap 实现高效的栈管理。
StackPool:栈空间复用
bthread 分配栈时不直接调用 malloc,而是优先从对象池中复用已分配的栈,避免频繁内存分配:
// 全局栈池,按栈大小分级管理
void* StackPool::alloc() {
if (!cached_stacks.empty())
return cached_stacks.pop_back(); // 复用已有栈
return mmap_for_stack(STACK_SIZE); // 首次分配
}
void StackPool::free(void* stack) {
cached_stacks.push_back(stack); // 归还池中,下次复用
}
Guard Page:栈溢出保护
每个 bthread 的栈通过 mmap 匿名映射分配,两端设置保护页,栈溢出时立即触发 SIGSEGV,便于定位问题:
void* alloc_stack(size_t stacksize, size_t guardsize) {
size_t pagesize = getpagesize();
size_t alloc_size = stacksize + 2 * guardsize * pagesize;
void* base = mmap(NULL, alloc_size, PROT_READ|PROT_WRITE,
MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
// 两端设置为不可读写
mprotect(base, guardsize * pagesize, PROT_NONE); // 低端保护页
mprotect((char*)base + alloc_size - guardsize * pagesize,
guardsize * pagesize, PROT_NONE); // 高端保护页
return (char*)base + alloc_size; // 返回栈底(高地址)
}
栈底在高地址,栈向低地址增长。一旦越界触碰保护页,立即 core dump,不会静默地破坏相邻内存。
栈大小分级
bthread 通常提供小/中/大三种栈规格,用户可以根据 bthread 的实际需要选择,减少内存浪费。对于只做简单转发的 bthread,几 KB 的栈就已足够,无需默认分配数百 KB。
同步原语:Butex
为什么需要 Butex
在 bthread 中如果直接用 pthread_mutex,锁等待时被挂起的是整个 pthread worker,导致该 worker 无法调度其他 bthread,严重影响并发能力。Butex 的设计目标是:在 bthread 等待时只挂起这个 bthread,让 pthread worker 继续执行其他 bthread。
Butex 的设计借鉴了 Linux 内核的 futex 机制:
pthread : futex = bthread : butex
Butex 核心结构
struct Butex {
butil::atomic<int> value; // 用户态原子变量(锁状态)
ButexWaiterList waiters; // 等待者链表(侵入式双向链表)
FastPthreadMutex waiter_lock; // 保护等待队列的 pthread 锁
};
struct ButexBthreadWaiter : public ButexWaiter {
TaskMeta* task_meta; // 当前 bthread 的 TaskMeta
TimerThread::TaskId sleep_id; // 超时定时任务 ID
WaiterState waiter_state; // 等待状态
int expected_value; // 挂起前期望的 value 值
TaskControl* control; // 全局 TaskControl
};
butex_wait:挂起当前 bthread
int butex_wait(void* arg, int expected_value, const timespec* abstime) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
// 快速检查:value 不等于期望值,直接返回(已被修改,不需等待)
if (b->value.load(memory_order_relaxed) != expected_value) {
return -1; // EWOULDBLOCK
}
TaskGroup* g = tls_task_group;
if (g == NULL || g->is_current_pthread_task()) {
// 当前是 pthread(非 worker 线程),走 pthread 挂起逻辑
return butex_wait_from_pthread(g, b, expected_value, abstime);
}
// 当前是 bthread:在 bthread 栈上创建 waiter
ButexBthreadWaiter bbw;
bbw.tid = g->current_tid();
bbw.expected_value = expected_value;
// 设置超时
if (abstime != NULL)
bbw.sleep_id = get_global_timer_thread()->schedule(...);
// 注册 remain 回调:在 worker 切换前原子地完成「检查 + 入队」
g->set_remained(wait_for_butex, &bbw);
// 让出 CPU,切换到其他 bthread
g->yield(&bbw.task_meta->tid);
// 被唤醒后从这里继续执行
unsleep_if_necessary(&bbw, get_global_timer_thread());
return 0;
}
防止信号丢失
Butex 实现中最关键的细节是防止唤醒信号丢失。如果「检查 value」和「将 waiter 入队」分别在两个临界区内完成,中间存在窗口:另一个线程可能在检查之后、入队之前调用 butex_wake,导致唤醒信号丢失。
Butex 的解决方案是 wait_for_butex——在持有 waiter_lock 的临界区内原子地完成检查和入队:
void wait_for_butex(void* arg) {
ButexBthreadWaiter* bw = static_cast<ButexBthreadWaiter*>(arg);
std::lock_guard<FastPthreadMutex> guard(bw->initial_butex->waiter_lock);
if (bw->initial_butex->value.load() != bw->expected_value) {
// value 已变化!不挂起,重新加入就绪队列
bw->control->ready_to_run(bw->task_meta);
} else {
// value 未变,安全地将 waiter 加入等待队列
bw->container.store(bw->initial_butex);
bw->initial_butex->waiters.push_back(bw);
}
}
wait_for_butex 作为 remain 回调,在 worker 切换到下一个 bthread 之前执行。它持有 waiter_lock,保证与 butex_wake 的互斥,杜绝唤醒丢失。
butex_wake:唤醒等待者
int butex_wake(void* arg) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
ButexWaiter* front = NULL;
{
std::lock_guard<FastPthreadMutex> guard(b->waiter_lock);
if (!b->waiters.empty()) {
front = b->waiters.pop_front();
front->container.store(NULL);
}
}
if (front == NULL) return 0;
if (front->tid == 0) {
// 唤醒的是 pthread
wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
} else {
// 唤醒的是 bthread:加入就绪队列
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
// 调用者是 worker 线程:直接放入本地队列(nosignal=true,避免通知已有 worker 自身)
// 调用者是非 worker:放入 remote_rq
if (current_worker)
current_worker->ready_to_run(bbw->task_meta, true);
else
bbw->control->ready_to_run_remote(bbw->task_meta);
}
return 1;
}
bthread::Mutex 与 ConditionVariable
bthread::Mutex 和 bthread::ConditionVariable 是 Butex 的上层封装,接口与 std::mutex / std::condition_variable 兼容,但挂起的是 bthread 而非 pthread:
class Mutex {
butil::atomic<int> _butex; // 底层基于 butex value
void lock() {
// 快速路径:CAS 获取锁
if (_butex.exchange(1, memory_order_acquire) == 0)
return;
// 慢速路径:在 butex 上等待
butex_wait(&_butex, 1, NULL);
}
void unlock() {
_butex.store(0, memory_order_release);
butex_wake(&_butex); // 唤醒一个等待者
}
};
整体同步体系的分层结构:
bthread::Mutex / bthread::ConditionVariable ← 对外接口
butex (butex_wait / butex_wake / wake_all) ← bthread 粒度挂起/唤醒
FastPthreadMutex (futex_wait / futex_wake) ← pthread 粒度(保护 waiter 队列)
Linux futex 系统调用 ← 内核态(竞争激烈时陷入)
关键要点
1. 上下文切换是 bthread 的性能基石
仅保存 6 个被调用者保存寄存器 + 可选的浮点状态,浮点状态按需加载,切换耗时 100-200ns。这是比 pthread(1-5μs)快 10 倍以上的根本原因。
2. Work Stealing 从队尾窃取
push/pop 在队头,窃取在队尾——错开操作位置,减少 CAS 竞争,同时提升了 CPU 缓存命中率。
3. 防止 Butex 信号丢失
「检查 value + 入队」必须在同一临界区内原子完成。Butex 通过 remain 回调机制在持有 waiter_lock 时执行这两个操作,与 butex_wake 互斥,保证不会丢失唤醒。
4. 阻塞系统调用会拖垮整个 worker
bthread 是协作式调度,如果 bthread 中执行了阻塞的系统调用(如同步 read()),整个 pthread worker 会被 OS 挂起。但其他 worker 会通过 Work Stealing 偷走被阻塞 worker 的任务,不会造成全局阻塞。最佳实践是:在 bthread 中始终使用异步 I/O,避免直接调用阻塞系统调用。
5. 栈溢出有 Guard Page 兜底
bthread 栈通过 mmap + mprotect 设置保护页,栈溢出时立即触发 SIGSEGV 产生 core dump,不会静默破坏相邻内存。定位问题时可以直接从 core 文件看到溢出位置。
总结
bthread 通过手写汇编实现寄存器级的上下文切换(~100ns),结合 M:N 调度和 Work Stealing 负载均衡,在用户态支撑百万级并发任务。Butex 提供了 bthread 粒度的同步原语,确保锁等待只挂起 bthread 而非整个 pthread。这些设计共同构成了 bRPC 高性能的基石。
参考
- bRPC 源码
- bRPC v1.7 中文文档 — bthread 机制
- boost.context 文档