bthread 底层实现深入分析

简介

bthread 是 Apache bRPC 框架中的 M:N 用户态线程库,设计目标是以极低的成本支撑百万级并发。它与 Go 的 goroutine 理念相似——将大量用户态任务映射到少量内核线程上执行,避免内核态上下文切换的开销。

bthread 的上下文切换参考了 boost.context 的设计,用手写汇编直接操作寄存器,切换耗时仅约 100-200 ns,比 pthread 切换快 10 倍以上。bRPC 的高并发能力很大程度上依赖 bthread 的高效调度。

整体架构

bthread 的 M:N 模型由三层结构组成:

bthread (百万级)  →  TaskGroup (每核一个)  →  pthread worker (少量)
   用户态任务              两级队列 +            内核线程
   (协程栈+上下文)          Work Stealing         (物理执行单元)

核心数据结构:

组件 生命周期 职责
TaskControl (TC) 进程全局唯一 管理所有 worker 线程,协调全局任务窃取
TaskGroup (TG) 每个 pthread 一个 worker 调度单元,维护任务队列,管理 bthread 执行
TaskMeta (TM) 每个 bthread 一个 保存 bthread 的上下文:回调函数、栈、寄存器、状态等

上下文切换:汇编级实现

上下文切换是 bthread 性能的核心。bthread 用汇编直接操作寄存器,仅保存最少的必要状态,实现了纳秒级的协程切换。

协程上下文初始化:bthread_make_fcontext

make_fcontext 初始化一个新协程的栈空间,设置好入口函数和执行完毕后跳转的 finish 函数:

bthread_make_fcontext:
    movq  %rdi, %rax        ; %rdi = 栈底地址
    andq  $-16, %rax         ; 16 字节对齐(x86-64 ABI 要求)
    leaq  -0x48(%rax), %rax  ; 预留 72 字节上下文空间
    movq  %rdx, 0x38(%rax)   ; 偏移 0x38: 保存入口函数 fn 的地址
    stmxcsr (%rax)           ; 偏移 0x00: 保存 MXCSR 浮点状态
    fnstcw  0x4(%rax)        ; 偏移 0x04: 保存 FPU 控制字
    leaq  finish(%rip), %rcx   ; 加载 finish 地址
    movq  %rcx, 0x40(%rax)   ; 偏移 0x40: 设置返回地址为 finish
    ret                      ; 返回初始化后的上下文指针

初始化后的栈布局(从高地址到低地址):

+-----------------+  高地址(栈底)
|   未使用空间      |
+-----------------+
|   finish 地址    |  ← 0x40(%rax)  — fn 返回后跳转到这里(调用 _exit)
+-----------------+
|   入口函数 fn    |  ← 0x38(%rax)  — bthread 要执行的函数指针
+-----------------+
|   FPU 控制字     |  ← 0x04(%rax)
+-----------------+
|   MXCSR 状态     |  ← 0x00(%rax)  — context 指针指向这里
+-----------------+  低地址(栈顶)

关键在于:栈上预置了 finish 作为返回地址。当 fn 执行完毕执行 ret 指令时,会把 finish 的地址弹出到 %rip,从而自动跳转到 finish_exit,安全结束协程。

上下文切换:bthread_jump_fcontext

切换的核心流程分为两步:保存当前协程的寄存器到栈上,然后从目标协程的栈上恢复寄存器并跳转。整个过程是一个对称操作:

bthread_jump_fcontext:
    ; ===== 第一步:保存当前协程的寄存器 =====
    pushq  %rbp              ; 被调用者保存寄存器
    pushq  %rbx
    pushq  %r15
    pushq  %r14
    pushq  %r13
    pushq  %r12

    leaq  -0x8(%rsp), %rsp   ; 预留 8 字节给浮点状态

    cmp   $0, %rcx           ; preserve_fpu 参数
    je    1f
    stmxcsr  (%rsp)          ; 保存 MXCSR
    fnstcw   0x4(%rsp)       ; 保存 FPU 控制字
1:
    movq  %rsp, (%rdi)       ; 将 %rsp 保存到 from->context

    ; ===== 第二步:切换到目标协程的栈 =====
    movq  %rsi, %rsp         ; 将 to->context 加载到 %rsp(栈切换!)

    cmp   $0, %rcx           ; 恢复浮点状态
    je    2f
    ldmxcsr  (%rsp)          ; 恢复 MXCSR
    fldcw   0x4(%rsp)        ; 恢复 FPU 控制字
2:
    leaq  0x8(%rsp), %rsp

    popq  %r12               ; 恢复被调用者保存寄存器(逆序)
    popq  %r13
    popq  %r14
    popq  %r15
    popq  %rbx
    popq  %rbp

    popq  %r8                ; 弹出返回地址
    movq  %rdx, %rax         ; 设置返回值
    movq  %rdx, %rdi         ; 设置第一个参数
    jmp   *%r8               ; 跳转到目标协程!

两种切换场景

切换到新创建的 bthread 切换到已运行过的 bthread
%r8 弹出的内容 fn 入口函数地址 上次切换时压入的下一条指令地址
执行效果 开始执行 fn;fn 返回后跳转到 finish 从上次中断处继续执行

性能优化要点

  1. 只保存被调用者保存寄存器:%rbx, %rbp, %r12-%r15,调用者保存寄存器(%rax, %rcx, %rdx, %rsi, %rdi, %r8-%r11)由编译器自动管理,无需保存
  2. 浮点状态按需加载:通过 preserve_fpu 参数控制是否保存/恢复浮点状态。大部分 bthread 任务不涉及浮点运算,跳过这步可以节省数十纳秒
  3. 16 字节栈对齐:符合 x86-64 ABI,保证 SIMD 指令正常执行

调度模型

两级任务队列

每个 TaskGroup 维护两个队列,设计上刻意区分了「来源」:

队列 类型 用途
_rq (WorkStealingQueue) 无锁环形队列 本 worker 自身创建的 bthread
_remote_rq (RemoteTaskQueue) 带锁队列 非 worker 线程(主线程等)提交的 bthread

关键设计:worker 不从自己的 _rq 取任务——自己的 _rq 只在 ending_sched()task_runner() 的过程中被消费。wait_task() 只从 _remote_rq 取,或通过 Work Stealing 从其他 worker 的队列窃取。

这种设计避免了同一个 _rq 被多个 TaskGroup 同时访问产生锁竞争。

任务获取与 Work Stealing

Worker 在无任务可执行时,按以下优先级获取新任务:

wait_task()
 ├── 1. 本 TG._remote_rq.pop()       ← 优先级最高,处理外部提交的任务
 ├── 2. _control.steal_task()        ← 全局 Work Stealing
 │    ├── 其他 TG._rq.steal()        ← 从队尾窃取(无锁 CAS,缓存友好)
 │    └── 其他 TG._remote_rq.pop()   ← 窃取外部提交的任务
 └── 3. _pl->wait()                  ← 所有尝试失败,休眠等待唤醒

Work Stealing 的核心逻辑是随机遍历,配合 offset 步进,避免每次从同一个 worker 窃取产生热点:

bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    for (size_t i = 0; i < ngroup; ++i, *seed += offset) {
        TaskGroup* g = _groups[*seed % ngroup];
        if (g && g->_rq.steal(tid))   // 从队尾窃取,减少 CAS 冲突
            return true;
        if (g && g->_remote_rq.pop(tid))
            return true;
    }
    return false;
}

窃取队尾而不是队头的原因:push 和 pop 都在队头操作,如果窃取也走队头,会产生频繁的 CAS 竞争。从队尾窃取与 push/pop 错开,降低冲突。

协作式调度

bthread 采用非抢占式调度,只在特定时机主动让出 CPU:

时机 行为
bthread_yield() / bthread_usleep() 立即切换,挂起当前 bthread
bthread_mutex_lock() 等待锁 在 butex 上挂起,让出 worker
I/O 等待(epoll 结合) EventDispatcher 检测到事件未就绪时自动 yield
阻塞系统调用(如 read() 整个 worker 被 OS 挂起,其他 worker 偷取该 worker 队列的任务

当 bthread 执行阻塞系统调用时,整个 pthread worker 都会被 OS 挂起。这时其他空闲的 worker 通过 Work Stealing 会偷走被阻塞 worker 的队列中的 bthread 继续执行,保证整体吞吐不受影响。

栈管理

每个 bthread 需要独立的栈空间。bthread 通过 StackPool 和 mmap 实现高效的栈管理。

StackPool:栈空间复用

bthread 分配栈时不直接调用 malloc,而是优先从对象池中复用已分配的栈,避免频繁内存分配:

// 全局栈池,按栈大小分级管理
void* StackPool::alloc() {
    if (!cached_stacks.empty())
        return cached_stacks.pop_back();  // 复用已有栈
    return mmap_for_stack(STACK_SIZE);    // 首次分配
}

void StackPool::free(void* stack) {
    cached_stacks.push_back(stack);       // 归还池中,下次复用
}

Guard Page:栈溢出保护

每个 bthread 的栈通过 mmap 匿名映射分配,两端设置保护页,栈溢出时立即触发 SIGSEGV,便于定位问题:

void* alloc_stack(size_t stacksize, size_t guardsize) {
    size_t pagesize = getpagesize();
    size_t alloc_size = stacksize + 2 * guardsize * pagesize;

    void* base = mmap(NULL, alloc_size, PROT_READ|PROT_WRITE,
                      MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);

    // 两端设置为不可读写
    mprotect(base, guardsize * pagesize, PROT_NONE);                    // 低端保护页
    mprotect((char*)base + alloc_size - guardsize * pagesize,
             guardsize * pagesize, PROT_NONE);                          // 高端保护页

    return (char*)base + alloc_size;  // 返回栈底(高地址)
}

栈底在高地址,栈向低地址增长。一旦越界触碰保护页,立即 core dump,不会静默地破坏相邻内存。

栈大小分级

bthread 通常提供小/中/大三种栈规格,用户可以根据 bthread 的实际需要选择,减少内存浪费。对于只做简单转发的 bthread,几 KB 的栈就已足够,无需默认分配数百 KB。

同步原语:Butex

为什么需要 Butex

在 bthread 中如果直接用 pthread_mutex,锁等待时被挂起的是整个 pthread worker,导致该 worker 无法调度其他 bthread,严重影响并发能力。Butex 的设计目标是:在 bthread 等待时只挂起这个 bthread,让 pthread worker 继续执行其他 bthread

Butex 的设计借鉴了 Linux 内核的 futex 机制:

pthread : futex = bthread : butex

Butex 核心结构

struct Butex {
    butil::atomic<int> value;         // 用户态原子变量(锁状态)
    ButexWaiterList waiters;          // 等待者链表(侵入式双向链表)
    FastPthreadMutex waiter_lock;     // 保护等待队列的 pthread 锁
};

struct ButexBthreadWaiter : public ButexWaiter {
    TaskMeta* task_meta;              // 当前 bthread 的 TaskMeta
    TimerThread::TaskId sleep_id;     // 超时定时任务 ID
    WaiterState waiter_state;         // 等待状态
    int expected_value;               // 挂起前期望的 value 值
    TaskControl* control;             // 全局 TaskControl
};

butex_wait:挂起当前 bthread

int butex_wait(void* arg, int expected_value, const timespec* abstime) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

    // 快速检查:value 不等于期望值,直接返回(已被修改,不需等待)
    if (b->value.load(memory_order_relaxed) != expected_value) {
        return -1;  // EWOULDBLOCK
    }

    TaskGroup* g = tls_task_group;
    if (g == NULL || g->is_current_pthread_task()) {
        // 当前是 pthread(非 worker 线程),走 pthread 挂起逻辑
        return butex_wait_from_pthread(g, b, expected_value, abstime);
    }

    // 当前是 bthread:在 bthread 栈上创建 waiter
    ButexBthreadWaiter bbw;
    bbw.tid = g->current_tid();
    bbw.expected_value = expected_value;

    // 设置超时
    if (abstime != NULL)
        bbw.sleep_id = get_global_timer_thread()->schedule(...);

    // 注册 remain 回调:在 worker 切换前原子地完成「检查 + 入队」
    g->set_remained(wait_for_butex, &bbw);

    // 让出 CPU,切换到其他 bthread
    g->yield(&bbw.task_meta->tid);

    // 被唤醒后从这里继续执行
    unsleep_if_necessary(&bbw, get_global_timer_thread());
    return 0;
}

防止信号丢失

Butex 实现中最关键的细节是防止唤醒信号丢失。如果「检查 value」和「将 waiter 入队」分别在两个临界区内完成,中间存在窗口:另一个线程可能在检查之后、入队之前调用 butex_wake,导致唤醒信号丢失。

Butex 的解决方案是 wait_for_butex——在持有 waiter_lock 的临界区内原子地完成检查和入队:

void wait_for_butex(void* arg) {
    ButexBthreadWaiter* bw = static_cast<ButexBthreadWaiter*>(arg);

    std::lock_guard<FastPthreadMutex> guard(bw->initial_butex->waiter_lock);

    if (bw->initial_butex->value.load() != bw->expected_value) {
        // value 已变化!不挂起,重新加入就绪队列
        bw->control->ready_to_run(bw->task_meta);
    } else {
        // value 未变,安全地将 waiter 加入等待队列
        bw->container.store(bw->initial_butex);
        bw->initial_butex->waiters.push_back(bw);
    }
}

wait_for_butex 作为 remain 回调,在 worker 切换到下一个 bthread 之前执行。它持有 waiter_lock,保证与 butex_wake 的互斥,杜绝唤醒丢失。

butex_wake:唤醒等待者

int butex_wake(void* arg) {
    Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
    ButexWaiter* front = NULL;
    {
        std::lock_guard<FastPthreadMutex> guard(b->waiter_lock);
        if (!b->waiters.empty()) {
            front = b->waiters.pop_front();
            front->container.store(NULL);
        }
    }

    if (front == NULL) return 0;

    if (front->tid == 0) {
        // 唤醒的是 pthread
        wakeup_pthread(static_cast<ButexPthreadWaiter*>(front));
    } else {
        // 唤醒的是 bthread:加入就绪队列
        ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
        unsleep_if_necessary(bbw, get_global_timer_thread());

        // 调用者是 worker 线程:直接放入本地队列(nosignal=true,避免通知已有 worker 自身)
        // 调用者是非 worker:放入 remote_rq
        if (current_worker)
            current_worker->ready_to_run(bbw->task_meta, true);
        else
            bbw->control->ready_to_run_remote(bbw->task_meta);
    }
    return 1;
}

bthread::Mutex 与 ConditionVariable

bthread::Mutexbthread::ConditionVariable 是 Butex 的上层封装,接口与 std::mutex / std::condition_variable 兼容,但挂起的是 bthread 而非 pthread:

class Mutex {
    butil::atomic<int> _butex;  // 底层基于 butex value

    void lock() {
        // 快速路径:CAS 获取锁
        if (_butex.exchange(1, memory_order_acquire) == 0)
            return;
        // 慢速路径:在 butex 上等待
        butex_wait(&_butex, 1, NULL);
    }

    void unlock() {
        _butex.store(0, memory_order_release);
        butex_wake(&_butex);  // 唤醒一个等待者
    }
};

整体同步体系的分层结构:

bthread::Mutex / bthread::ConditionVariable    ← 对外接口
butex (butex_wait / butex_wake / wake_all)    ← bthread 粒度挂起/唤醒
FastPthreadMutex (futex_wait / futex_wake)    ← pthread 粒度(保护 waiter 队列)
Linux futex 系统调用                            ← 内核态(竞争激烈时陷入)

关键要点

1. 上下文切换是 bthread 的性能基石

仅保存 6 个被调用者保存寄存器 + 可选的浮点状态,浮点状态按需加载,切换耗时 100-200ns。这是比 pthread(1-5μs)快 10 倍以上的根本原因。

2. Work Stealing 从队尾窃取

push/pop 在队头,窃取在队尾——错开操作位置,减少 CAS 竞争,同时提升了 CPU 缓存命中率。

3. 防止 Butex 信号丢失

「检查 value + 入队」必须在同一临界区内原子完成。Butex 通过 remain 回调机制在持有 waiter_lock 时执行这两个操作,与 butex_wake 互斥,保证不会丢失唤醒。

4. 阻塞系统调用会拖垮整个 worker

bthread 是协作式调度,如果 bthread 中执行了阻塞的系统调用(如同步 read()),整个 pthread worker 会被 OS 挂起。但其他 worker 会通过 Work Stealing 偷走被阻塞 worker 的任务,不会造成全局阻塞。最佳实践是:在 bthread 中始终使用异步 I/O,避免直接调用阻塞系统调用。

5. 栈溢出有 Guard Page 兜底

bthread 栈通过 mmap + mprotect 设置保护页,栈溢出时立即触发 SIGSEGV 产生 core dump,不会静默破坏相邻内存。定位问题时可以直接从 core 文件看到溢出位置。

总结

bthread 通过手写汇编实现寄存器级的上下文切换(~100ns),结合 M:N 调度和 Work Stealing 负载均衡,在用户态支撑百万级并发任务。Butex 提供了 bthread 粒度的同步原语,确保锁等待只挂起 bthread 而非整个 pthread。这些设计共同构成了 bRPC 高性能的基石。

参考

  • bRPC 源码
  • bRPC v1.7 中文文档 — bthread 机制
  • boost.context 文档