advanced-usage

高级主题深度解析

本文档对 NV Embedding Cache SDK 中的高级主题进行深度分析,涵盖执行上下文、缓存管理、插入启发式策略、多设备部署、线程池与分配器、AOTInductor C++ 部署等内容。每个主题先介绍概念,再结合源码分析实现细节。


一、执行上下文(ExecutionContext)

概念

执行上下文(ExecutionContext)是 NVE 中管理 GPU 操作所需状态和资源的容器。每个上下文代表一个可重用的并行执行环境——持有独立的 CUDA 流对(lookup stream + modify stream)、临时缓冲区、线程池和分配器引用。应用应创建固定数量的上下文,在初始化时完成分配,然后在运行时以轮询等方式循环使用。

类层次

ExecutionContext  (基类, include/execution_context.hpp)
  ├── GPUEmbeddingTableExecutionContext  (GPUEmbeddingLayer 专用)
  ├── GPUTableExecutionContext           (GpuTable 专用)
  └── HostTableExecutionContext           (HostTable 专用)

核心成员

class ExecutionContext {
protected:
    cudaStream_t lookup_stream_;     // 查找操作流
    cudaStream_t modify_stream_;     // 修改操作流
    thread_pool_ptr_t thread_pool_;  // CPU 线程池引用
    allocator_ptr_t allocator_;      // 内存分配器引用
    std::unordered_map<std::string, std::shared_ptr<ResizeableBuffer>> buffer_storage_;
    // 按名称索引的临时缓冲区池
    std::unordered_map<std::string, std::vector<cudaStream_t>> aux_streams_storage_;
    // 辅助 CUDA 流
};

关键实现细节

get_buffer() — 懒分配临时缓冲区

void* ExecutionContext::get_buffer(const std::string& name, size_t size, bool host_alloc) {
    auto internal_name = internal_name(name, host_alloc);
    auto it = buffer_storage_.find(internal_name);
    if (it == buffer_storage_.end()) {
        // 按需分配,下次同名的 get_buffer 直接复用
        auto buf = std::make_shared<ResizeableBuffer>(size, host_alloc, allocator_);
        buffer_storage_.emplace(internal_name, buf);
        return buf->data();
    }
    // 已存在则确保容量足够
    it->second->reserve(size);
    return it->second->data();
}

ResizeableBuffer 按需增长,但不会自动缩小。因此首次用到某个大尺寸操作时会触发分配,后续同尺寸操作复用已分配内存。

get_aux_streams() — 辅助流

std::vector<cudaStream_t> ExecutionContext::get_aux_streams(
    const std::string& name, size_t num_streams) {
    auto it = aux_streams_storage_.find(name);
    if (it == aux_streams_storage_.end()) {
        std::vector<cudaStream_t> streams(num_streams);
        for (auto& s : streams) cudaStreamCreate(&s);
        aux_streams_storage_.emplace(name, std::move(streams));
    }
    return it->second;
}

wait() — 同步全部工作

virtual void wait() {
    cudaStreamSynchronize(lookup_stream_);
    cudaStreamSynchronize(modify_stream_);
    for (auto& kv : aux_streams_storage_) {
        for (auto& stream : kv.second) {
            cudaStreamSynchronize(stream);
        }
    }
}

使用模式

// 初始化阶段创建固定数量的上下文
std::vector<context_ptr_t> contexts;
for (int i = 0; i < num_threads; i++) {
    cudaStream_t ls, ms;
    cudaStreamCreate(&ls);
    cudaStreamCreate(&ms);
    contexts.push_back(layer->create_execution_context(ls, ms, thread_pool, allocator));
}

// 运行时循环使用
int ctx_idx = 0;
for (auto& batch : batches) {
    auto ctx = contexts[ctx_idx++ % contexts.size()];
    layer->lookup(ctx, batch.num_keys, batch.keys, batch.output, ...);
}

// 销毁前必须先释放所有上下文
contexts.clear();
layer.reset();

二、缓存管理

Modify 操作与 ModifyContext

所有改变缓存内容或驻留状态的操作(insert / update / update_accumulate / erase / clear)统称为 Modify 操作。每个 Modify 操作需要一个 ModifyContext——由 EmbedCacheSAmodify_context_create() 时分配,持有批量操作所需的临时缓冲区。

约束:GPU 端不支持多个 Modify 操作同时进行。用户必须确保 Modify 操作的串行化(例如通过单个 CUDA 流启动所有 Modify 操作)。NVE 的 Layer 层在内部通过 kernel_launch_mutex_ + modify_in_progress_ event + private_modify_stream_ 实现了这个约束。

Invalidate and Commit 范式

为了支持 Lookup 和 Modify 操作并行执行,NVE 采用”失效并提交”范式:

阶段 1 — Invalidate(失效):
  Modify 操作首先启动 CUDA kernel 将相关 cache 条目标记为"失效中"
  → 后续 Lookup 对这些条目的访问会 miss(转到后备表读取)
  ↓
阶段 2 — Wait(等待):
  等待所有正在进行中的 Lookup 操作完成
  → 通过 cudaEvent + cudaStreamWaitEvent 实现异步等待
  → 通过 ContextRegistry 收集所有活跃的 lookup stream
  ↓
阶段 3 — Commit(提交):
  安全地修改缓存内容(写新数据、更新 tag)
  重新启用失效的条目
  → 后续 Lookup 可以命中新写入的数据

Custom Flow API

对于使用多个 CUDA kernel 实现自定义查找流程的用户,Invalidate and Commit 需要额外的协调。如果用户使用超过一个 kernel 的流水线实现查找,需要调用 start_custom_flow() / end_custom_flow() API 来维持所需的原子性视图。

流同步细节

GPUTable 中 update/accumulate 操作的同步序列:

// Step 1: 等待所有进行中的查找完成
sync = contexts_->create_sync_event();
sync->event_record();
sync->event_wait_stream(update_stream);
// 此时所有 lookup stream 上的 work 已完成

// Step 2: 执行修改
UpdateTable<KeyType>(... , update_stream);

// Step 3: 标记完成
cudaEventRecord(uvm_update_event, update_stream);
cudaStreamWaitEvent(modify_stream, uvm_update_event);

三、插入启发式策略(InsertHeuristic)

接口

// include/insert_heuristic.hpp
class InsertHeuristic {
public:
    virtual ~InsertHeuristic() = default;
    virtual bool insert_needed(const float hitrate, const size_t table_id) = 0;
};

hitrate 是当前查找操作的命中率(范围 0.0 ~ 1.0),table_id 是层级中表的索引。返回值决定是否触发一次自动插入。

DefaultInsertHeuristic

默认插入策略,使用阈值比较

class DefaultInsertHeuristic : public InsertHeuristic {
public:
    static constexpr float DEFAULT_THRESHOLD = 0.75f;

    DefaultInsertHeuristic(const std::vector<float> thresholds) : thresholds_(thresholds) {}

    bool insert_needed(const float hitrate, const size_t table_id) override {
        if (hitrate >= thresholds_.at(table_id)) {
            return false;  // 命中率足够高,不需要插入
        }
        // 加入随机抖动,避免多表同步触发插入
        return dis_(gen_) < (thresholds_.at(table_id) - hitrate);
    }
};

关键细节:

  • 随机抖动dis_(gen_) < (threshold - hitrate) 使得插入概率正比于命中率缺口。命中率远低于阈值时几乎必然触发插入,略低于阈值时概率较低。这避免了所有表同时触发插入导致的性能毛刺
  • 层级默认阈值:对于 HierarchicalEmbeddingLayer,最后一张 host 表的阈值设为 0.0(永不触发自动插入,因为该层已有全量数据)

NeverInsertHeuristic

class NeverInsertHeuristic : public InsertHeuristic {
public:
    bool insert_needed(const float, const size_t) override { return false; }
};

完全禁用自动插入。插入操作仅通过用户显式调用 layer->insert() 发生。

FSMInsertHeuristic

有限状态机驱动的插入策略,维护每张表的 prev_hitrate_state_

States: Start → Steady

逻辑(从命名推断):
  Start: 初始状态,连续低命中率时触发插入
  Steady: 稳定状态,根据命中率变化趋势决定是否插入

目前标注为 // TODO: TRTREC-88,可能尚未完成。

StatisticalInsertHeuristic

基于统计概率的插入策略,使用 Chebyshev 不等式判断命中率是否显著偏离期望:

class StatisticalInsertHeuristic : public InsertHeuristic {
    // 维护每个表的命中率滑动窗口(默认 14 个样本)
    std::vector<std::deque<float>> hitrate_window_;
    std::vector<float> hitrate_mean_;
    std::vector<float> hitrate_var_;

    // 使用切比雪夫不等式计算概率
    float getChebyshevProb(const size_t table_id);

    // 状态机: Start → Insert → Steady → Unstable
    enum class State { Start, Insert, Steady, Unstable };
};

工作机制

  1. 在每个查找操作后更新命中率的滑动窗口均值 mean 和方差 var
  2. 使用切比雪夫不等式计算当前命中率偏离均值的概率上限:P(|X - μ| ≥ kσ) ≤ 1/k²
  3. 如果概率超过阈值且偏离方向为负(命中率下降),触发插入
  4. 状态机管理插入后的稳定期,避免频繁触发

配置参数:

参数 默认值 说明
k_factor 2.6 标准差倍数因子。越大越不敏感,越小越容易触发插入
window_size 14 滑动窗口大小,用于计算均值和方差
num_inserts_needed 50 需要连续插入多少次才认为状态稳定
max_unsteady_samples 3 允许的最大连续不稳定样本数

四、多设备与分布式支持

多 GPU 共享线性表

当嵌入表跨多块 GPU 时,有两种方案:

方案 1:UVM 共享表(单节点多 GPU)

使用 cudaMallocManaged 分配 UVM 内存,通过 cudaMemAdvise 优化访问:

void* linear_table = nullptr;
cudaMallocManaged(&linear_table, linear_table_size);

// 建议 GPU 0 频繁访问此区域
cudaMemAdvise(linear_table, linear_table_size,
              cudaMemAdviseSetAccessedBy, 0);
// 将首选位置设为 CPU,保证所有 GPU 公平访问
cudaMemAdvise(linear_table, linear_table_size,
              cudaMemAdviseSetPreferredLocation, cudaCpuDeviceId);

每个 GPU 创建独立的 GpuTable,共享同一个 uvm_table 指针:

auto gpu_tab1 = std::make_shared<GpuTable<int64_t>>(cfg);  // device 0
cfg.device_id = 1;
auto gpu_tab2 = std::make_shared<GpuTable<int64_t>>(cfg);  // device 1

// 独立 GPU cache,共享 UVM 后备表

UVM 更新冲突通过 disable_uvm_update 控制——只需一个 GpuTable 负责 UVM 写入,其余只读。

CUDADistributedBuffer(多节点)

CUDADistributedBufferinclude/distributed.hpp)在多节点场景下创建跨设备共享的 CUDA 虚拟地址映射缓冲区:

class CUDADistributedBuffer {
    std::byte* ptr();          // 本地映射的缓冲区指针
    uint64_t shard_size();     // 每个分片的大小
    uint64_t num_shards();     // 总分片数 = world_size
};

初始化流程

init_single_host()  — 单节点多 GPU:
  1. 每个 GPU 分配物理内存
  2. 通过 CUDA Virtual Memory Management API
     (cuMemCreate → cuMemMap → cuMemSetAccess)
     将所有分片映射到所有 GPU 的虚拟地址空间
  3. 每个 rank 通过 all_gather 交换分配句柄

init_multi_host()  — 多节点:
  1. 每节点分配本地 GPU 内存
  2. 通过 DistributedEnv(MPI)交换句柄
  3. 使用 NVLink/NVSwitch 或网络实现跨节点访问

DistributedEnv

抽象了分布式环境接口:

方法 作用
rank() 当前进程编号
world_size() 总进程数
device_count() 每节点的 GPU 数
local_device() 当前进程的本地 GPU 编号
single_host() 是否单节点
barrier() 同步屏障
broadcast() 广播
all_gather() 全收集

Python 端通过 pynve.nve.MPIMemBlock 封装,使用 mpi4py 进行进程间通信。

实现细节:CUDA Virtual Memory

CUDADistributedBuffer::init_single_host() 使用 CUDA Virtual Memory Management API:

// 1. 创建物理内存块
CUmemAllocationProp prop = {};
prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
prop.location.id = device_id;
cuMemCreate(&alloc_handle_, shard_size_, &prop, 0);

// 2. 映射到虚拟地址空间
cuMemAddressReserve(&ptr_, total_size_, 0, 0, 0);
cuMemMap(ptr_ + i * shard_size_, shard_size_, 0, alloc_handle_, 0);

// 3. 设置访问权限
CUmemAccessDesc access = {};
access.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
access.location.id = target_device_id;
access.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;
cuMemSetAccess(ptr_ + i * shard_size_, shard_size_, &access, 1);

这种方式实现的是真正的跨 GPU 零拷贝共享——每块 GPU 通过自己的物理内存访问远端 GPU 的缓冲区,由 NVLink/NVSwitch 提供硬件转发。


五、线程池(ThreadPool)

接口

// include/thread_pool_base.hpp
class ThreadPool {
public:
    using task_type = std::function<void()>;
    using result_type = std::future<void>;

    virtual result_type submit(task_type task) = 0;      // 提交单个任务
    virtual void execute_n(int64_t workgroup, int64_t num_tasks,
                           const std::function<void(int64_t)>& task) = 0;  // 批量并行
    virtual int64_t num_workers() const = 0;
};

SimpleThreadPool

SimpleThreadPoolsrc/thread_pool.cpp)是默认实现:

  • 创建时启动固定数量的 worker 线程
  • submit() 将任务加入工作队列,线程池竞争获取
  • execute_n() 批量提交索引化任务并等待全部完成,通过 std::promise/std::future 同步

核心用途:CPU 端 HostTable 的并行查找、默认嵌入填充、CPU UVM accumulate。

使用模式

thread_pool_ptr_t pool = create_thread_pool({{"num_workers", 16}});

const int64_t num_tasks = (n + chunk_size - 1) / chunk_size;
pool->execute_n(0, num_tasks, [&](int64_t idx) {
    int64_t start = idx * chunk_size;
    int64_t end = std::min(start + chunk_size, n);
    for (int64_t i = start; i < end; i++) {
        process(i);
    }
});
// execute_n 在此处阻塞直到所有任务完成

六、分配器(Allocator)

接口

class Allocator {
public:
    virtual cudaError_t device_allocate(void** ptr, size_t sz, int device_id = -1) noexcept = 0;
    virtual cudaError_t device_free(void* ptr, int device_id = -1) noexcept = 0;
    virtual cudaError_t host_allocate(void** ptr, size_t sz) noexcept = 0;
    virtual cudaError_t host_free(void* ptr) noexcept = 0;
    virtual cudaError_t set_device(int device_id) noexcept = 0;
};

DefaultAllocator

DefaultAllocator 使用 cudaMalloc / cudaFree 分配 GPU 内存,使用 cudaMallocHost / cudaFreeHost 分配主机固定内存(page-locked memory,支持 GPU 直接访问)。

大页支持:对于大块主机内存分配(≥2MB),DefaultAllocator 会尝试使用 mmap 分配透明大页(Transparent Huge Pages),减少 TLB miss:

size_t get_largest_hugepage_bits(size_t alloc_size) {
    // 检查 1GB 和 2MB 大页的可用数量
    // 返回足够支撑分配大小的最大页大小(bit 数)
    // 如 2MB = 21 bits, 1GB = 30 bits
}

七、AOTInductor C++ 部署

完整流程

NVE 支持将包含 NVEmbedding / NVEmbeddingBag 的 PyTorch 模型导出为无 Python 运行时的 C++ 可执行程序。流程分为两步:

Step 1 — Python 导出

from pynve.torch.nve_export import export_aot

model = MyModel()
export_aot(model, (example_keys,), "save_dir/")

生成的文件结构:

save_dir/
├── model.pt2              # AOTInductor 编译后的计算图
├── metadata.json          # 每层配置(行数、维度、数据类型、cache 类型等)
└── weights/
    ├── embedding_0.nve    # 嵌入权重二进制文件
    └── embedding_1.nve

Step 2 — C++ 加载与推理

#include <torch/torch.h>
#include <torch/csrc/inductor/aoti_package/model_package_loader.h>
#include "python/pynve/torch_bindings/nve_loader.hpp"

// LayerDirectory 读取 metadata.json,创建嵌入层,加载权重
nve::LayerDirectory dir("save_dir/");

// 加载 AOT 编译后的模型
torch::inductor::AOTIModelPackageLoader loader("save_dir/model.pt2");

// 推理(无 Python 运行时)
auto keys = torch::tensor({0L, 1L, 5L, 10L},
    torch::TensorOptions().dtype(torch::kInt64).device(torch::kCUDA));
c10::InferenceMode mode;
auto outputs = loader.run({keys});

架构

Python 导出阶段:
  PyTorch 模型 (NVEmbedding)
    ↓ torch.export
  ExportedProgram
    ↓ AOTInductor compile
  model.pt2 (so 文件 + nve_ops::embedding_lookup 算子)
    + metadata.json + .nve weights

C++ 推理阶段:
  AOTIModelPackageLoader::run({keys})
    ↓ C10 Dispatcher
  nve_ops::embedding_lookup (STABLE_TORCH_LIBRARY)
    ↓ NVELayerRegistry (单例)
  LinearUVMEmbeddingLayer::lookup()
    ↓
  GPU Embedding Cache lookup

关键组件

  • libnve-torch-ops.so:注册自定义算子 nve_ops::embedding_lookup 到 C10 dispatcher,使用 STABLE_TORCH_LIBRARY 宏实现 LibTorch Stable ABI 兼容
  • NVELayerRegistry:全局单例,在 LayerDirectory 构造时将层 ID → 层指针的映射注册到 registry,AOT 模型通过层 ID 查找
  • LayerDirectory:RAII 类,构造函数创建层、加载权重、注册到 registry;析构函数注销并销毁
  • .nve 权重格式:NVE 自定义二进制格式,包含层元数据头 + 嵌入向量数据,支持快速内存映射加载

链接要求

g++ inference.cpp -o inference \
    -L/path/to/nve/build/lib -lnve-torch-ops -lnve-common \
    -L$(python -c "import torch; print(torch.utils.cmake_prefix_path)")/../lib \
    -ltorch -ltorch_cpu -ltorch_cuda -lc10 \
    -Wl,-rpath,/path/to/nve/build/lib

libnve-torch-ops.so 构建条件:必须启用 torch bindings(默认),且 PyTorch ≥ 2.10(Stable ABI 兼容性通过 cmake/check_torch_stable_abi.cpp 验证)。


八、可插拔组件总结

NVE 的三大可插拔基础设施:

组件 接口 默认实现 插件化方式
ThreadPool ThreadPool SimpleThreadPool create_thread_pool(json) + configure_default_thread_pool(json)
Allocator Allocator DefaultAllocator(cudaMalloc + 大页 mmap) 构造函数参数传入
InsertHeuristic InsertHeuristic DefaultInsertHeuristic(阈值 0.75) Layer Config 传入

使用示例

// 自定义分配器
class MyAllocator : public Allocator { ... };
auto allocator = std::make_shared<MyAllocator>();
auto layer = std::make_shared<GPUEmbeddingLayer<int64_t>>(config, allocator);

// 自定义线程池
auto pool = create_thread_pool({ {"num_workers", 32} });
configure_default_thread_pool({ {"num_workers", 32} });

// 自定义插入策略
auto heuristic = std::make_shared<StatisticalInsertHeuristic>(
    num_keys, std::vector<float>{2.6f});
HierarchicalEmbeddingLayer<int64_t>::Config cfg;
cfg.insert_heuristic = heuristic;