advanced-usage
高级主题深度解析
本文档对 NV Embedding Cache SDK 中的高级主题进行深度分析,涵盖执行上下文、缓存管理、插入启发式策略、多设备部署、线程池与分配器、AOTInductor C++ 部署等内容。每个主题先介绍概念,再结合源码分析实现细节。
一、执行上下文(ExecutionContext)
概念
执行上下文(ExecutionContext)是 NVE 中管理 GPU 操作所需状态和资源的容器。每个上下文代表一个可重用的并行执行环境——持有独立的 CUDA 流对(lookup stream + modify stream)、临时缓冲区、线程池和分配器引用。应用应创建固定数量的上下文,在初始化时完成分配,然后在运行时以轮询等方式循环使用。
类层次
ExecutionContext (基类, include/execution_context.hpp)
├── GPUEmbeddingTableExecutionContext (GPUEmbeddingLayer 专用)
├── GPUTableExecutionContext (GpuTable 专用)
└── HostTableExecutionContext (HostTable 专用)
核心成员
class ExecutionContext {
protected:
cudaStream_t lookup_stream_; // 查找操作流
cudaStream_t modify_stream_; // 修改操作流
thread_pool_ptr_t thread_pool_; // CPU 线程池引用
allocator_ptr_t allocator_; // 内存分配器引用
std::unordered_map<std::string, std::shared_ptr<ResizeableBuffer>> buffer_storage_;
// 按名称索引的临时缓冲区池
std::unordered_map<std::string, std::vector<cudaStream_t>> aux_streams_storage_;
// 辅助 CUDA 流
};
关键实现细节
get_buffer() — 懒分配临时缓冲区
void* ExecutionContext::get_buffer(const std::string& name, size_t size, bool host_alloc) {
auto internal_name = internal_name(name, host_alloc);
auto it = buffer_storage_.find(internal_name);
if (it == buffer_storage_.end()) {
// 按需分配,下次同名的 get_buffer 直接复用
auto buf = std::make_shared<ResizeableBuffer>(size, host_alloc, allocator_);
buffer_storage_.emplace(internal_name, buf);
return buf->data();
}
// 已存在则确保容量足够
it->second->reserve(size);
return it->second->data();
}
ResizeableBuffer 按需增长,但不会自动缩小。因此首次用到某个大尺寸操作时会触发分配,后续同尺寸操作复用已分配内存。
get_aux_streams() — 辅助流
std::vector<cudaStream_t> ExecutionContext::get_aux_streams(
const std::string& name, size_t num_streams) {
auto it = aux_streams_storage_.find(name);
if (it == aux_streams_storage_.end()) {
std::vector<cudaStream_t> streams(num_streams);
for (auto& s : streams) cudaStreamCreate(&s);
aux_streams_storage_.emplace(name, std::move(streams));
}
return it->second;
}
wait() — 同步全部工作
virtual void wait() {
cudaStreamSynchronize(lookup_stream_);
cudaStreamSynchronize(modify_stream_);
for (auto& kv : aux_streams_storage_) {
for (auto& stream : kv.second) {
cudaStreamSynchronize(stream);
}
}
}
使用模式
// 初始化阶段创建固定数量的上下文
std::vector<context_ptr_t> contexts;
for (int i = 0; i < num_threads; i++) {
cudaStream_t ls, ms;
cudaStreamCreate(&ls);
cudaStreamCreate(&ms);
contexts.push_back(layer->create_execution_context(ls, ms, thread_pool, allocator));
}
// 运行时循环使用
int ctx_idx = 0;
for (auto& batch : batches) {
auto ctx = contexts[ctx_idx++ % contexts.size()];
layer->lookup(ctx, batch.num_keys, batch.keys, batch.output, ...);
}
// 销毁前必须先释放所有上下文
contexts.clear();
layer.reset();
二、缓存管理
Modify 操作与 ModifyContext
所有改变缓存内容或驻留状态的操作(insert / update / update_accumulate / erase / clear)统称为 Modify 操作。每个 Modify 操作需要一个 ModifyContext——由 EmbedCacheSA 在 modify_context_create() 时分配,持有批量操作所需的临时缓冲区。
约束:GPU 端不支持多个 Modify 操作同时进行。用户必须确保 Modify 操作的串行化(例如通过单个 CUDA 流启动所有 Modify 操作)。NVE 的 Layer 层在内部通过 kernel_launch_mutex_ + modify_in_progress_ event + private_modify_stream_ 实现了这个约束。
Invalidate and Commit 范式
为了支持 Lookup 和 Modify 操作并行执行,NVE 采用”失效并提交”范式:
阶段 1 — Invalidate(失效):
Modify 操作首先启动 CUDA kernel 将相关 cache 条目标记为"失效中"
→ 后续 Lookup 对这些条目的访问会 miss(转到后备表读取)
↓
阶段 2 — Wait(等待):
等待所有正在进行中的 Lookup 操作完成
→ 通过 cudaEvent + cudaStreamWaitEvent 实现异步等待
→ 通过 ContextRegistry 收集所有活跃的 lookup stream
↓
阶段 3 — Commit(提交):
安全地修改缓存内容(写新数据、更新 tag)
重新启用失效的条目
→ 后续 Lookup 可以命中新写入的数据
Custom Flow API
对于使用多个 CUDA kernel 实现自定义查找流程的用户,Invalidate and Commit 需要额外的协调。如果用户使用超过一个 kernel 的流水线实现查找,需要调用 start_custom_flow() / end_custom_flow() API 来维持所需的原子性视图。
流同步细节
GPUTable 中 update/accumulate 操作的同步序列:
// Step 1: 等待所有进行中的查找完成
sync = contexts_->create_sync_event();
sync->event_record();
sync->event_wait_stream(update_stream);
// 此时所有 lookup stream 上的 work 已完成
// Step 2: 执行修改
UpdateTable<KeyType>(... , update_stream);
// Step 3: 标记完成
cudaEventRecord(uvm_update_event, update_stream);
cudaStreamWaitEvent(modify_stream, uvm_update_event);
三、插入启发式策略(InsertHeuristic)
接口
// include/insert_heuristic.hpp
class InsertHeuristic {
public:
virtual ~InsertHeuristic() = default;
virtual bool insert_needed(const float hitrate, const size_t table_id) = 0;
};
hitrate 是当前查找操作的命中率(范围 0.0 ~ 1.0),table_id 是层级中表的索引。返回值决定是否触发一次自动插入。
DefaultInsertHeuristic
默认插入策略,使用阈值比较:
class DefaultInsertHeuristic : public InsertHeuristic {
public:
static constexpr float DEFAULT_THRESHOLD = 0.75f;
DefaultInsertHeuristic(const std::vector<float> thresholds) : thresholds_(thresholds) {}
bool insert_needed(const float hitrate, const size_t table_id) override {
if (hitrate >= thresholds_.at(table_id)) {
return false; // 命中率足够高,不需要插入
}
// 加入随机抖动,避免多表同步触发插入
return dis_(gen_) < (thresholds_.at(table_id) - hitrate);
}
};
关键细节:
- 随机抖动:
dis_(gen_) < (threshold - hitrate)使得插入概率正比于命中率缺口。命中率远低于阈值时几乎必然触发插入,略低于阈值时概率较低。这避免了所有表同时触发插入导致的性能毛刺 - 层级默认阈值:对于
HierarchicalEmbeddingLayer,最后一张 host 表的阈值设为 0.0(永不触发自动插入,因为该层已有全量数据)
NeverInsertHeuristic
class NeverInsertHeuristic : public InsertHeuristic {
public:
bool insert_needed(const float, const size_t) override { return false; }
};
完全禁用自动插入。插入操作仅通过用户显式调用 layer->insert() 发生。
FSMInsertHeuristic
有限状态机驱动的插入策略,维护每张表的 prev_hitrate_ 和 state_:
States: Start → Steady
逻辑(从命名推断):
Start: 初始状态,连续低命中率时触发插入
Steady: 稳定状态,根据命中率变化趋势决定是否插入
目前标注为 // TODO: TRTREC-88,可能尚未完成。
StatisticalInsertHeuristic
基于统计概率的插入策略,使用 Chebyshev 不等式判断命中率是否显著偏离期望:
class StatisticalInsertHeuristic : public InsertHeuristic {
// 维护每个表的命中率滑动窗口(默认 14 个样本)
std::vector<std::deque<float>> hitrate_window_;
std::vector<float> hitrate_mean_;
std::vector<float> hitrate_var_;
// 使用切比雪夫不等式计算概率
float getChebyshevProb(const size_t table_id);
// 状态机: Start → Insert → Steady → Unstable
enum class State { Start, Insert, Steady, Unstable };
};
工作机制:
- 在每个查找操作后更新命中率的滑动窗口均值
mean和方差var - 使用切比雪夫不等式计算当前命中率偏离均值的概率上限:
P(|X - μ| ≥ kσ) ≤ 1/k² - 如果概率超过阈值且偏离方向为负(命中率下降),触发插入
- 状态机管理插入后的稳定期,避免频繁触发
配置参数:
| 参数 | 默认值 | 说明 |
|---|---|---|
k_factor |
2.6 | 标准差倍数因子。越大越不敏感,越小越容易触发插入 |
window_size |
14 | 滑动窗口大小,用于计算均值和方差 |
num_inserts_needed |
50 | 需要连续插入多少次才认为状态稳定 |
max_unsteady_samples |
3 | 允许的最大连续不稳定样本数 |
四、多设备与分布式支持
多 GPU 共享线性表
当嵌入表跨多块 GPU 时,有两种方案:
方案 1:UVM 共享表(单节点多 GPU)
使用 cudaMallocManaged 分配 UVM 内存,通过 cudaMemAdvise 优化访问:
void* linear_table = nullptr;
cudaMallocManaged(&linear_table, linear_table_size);
// 建议 GPU 0 频繁访问此区域
cudaMemAdvise(linear_table, linear_table_size,
cudaMemAdviseSetAccessedBy, 0);
// 将首选位置设为 CPU,保证所有 GPU 公平访问
cudaMemAdvise(linear_table, linear_table_size,
cudaMemAdviseSetPreferredLocation, cudaCpuDeviceId);
每个 GPU 创建独立的 GpuTable,共享同一个 uvm_table 指针:
auto gpu_tab1 = std::make_shared<GpuTable<int64_t>>(cfg); // device 0
cfg.device_id = 1;
auto gpu_tab2 = std::make_shared<GpuTable<int64_t>>(cfg); // device 1
// 独立 GPU cache,共享 UVM 后备表
UVM 更新冲突通过 disable_uvm_update 控制——只需一个 GpuTable 负责 UVM 写入,其余只读。
CUDADistributedBuffer(多节点)
CUDADistributedBuffer(include/distributed.hpp)在多节点场景下创建跨设备共享的 CUDA 虚拟地址映射缓冲区:
class CUDADistributedBuffer {
std::byte* ptr(); // 本地映射的缓冲区指针
uint64_t shard_size(); // 每个分片的大小
uint64_t num_shards(); // 总分片数 = world_size
};
初始化流程:
init_single_host() — 单节点多 GPU:
1. 每个 GPU 分配物理内存
2. 通过 CUDA Virtual Memory Management API
(cuMemCreate → cuMemMap → cuMemSetAccess)
将所有分片映射到所有 GPU 的虚拟地址空间
3. 每个 rank 通过 all_gather 交换分配句柄
init_multi_host() — 多节点:
1. 每节点分配本地 GPU 内存
2. 通过 DistributedEnv(MPI)交换句柄
3. 使用 NVLink/NVSwitch 或网络实现跨节点访问
DistributedEnv
抽象了分布式环境接口:
| 方法 | 作用 |
|---|---|
rank() |
当前进程编号 |
world_size() |
总进程数 |
device_count() |
每节点的 GPU 数 |
local_device() |
当前进程的本地 GPU 编号 |
single_host() |
是否单节点 |
barrier() |
同步屏障 |
broadcast() |
广播 |
all_gather() |
全收集 |
Python 端通过 pynve.nve.MPIMemBlock 封装,使用 mpi4py 进行进程间通信。
实现细节:CUDA Virtual Memory
CUDADistributedBuffer::init_single_host() 使用 CUDA Virtual Memory Management API:
// 1. 创建物理内存块
CUmemAllocationProp prop = {};
prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
prop.location.id = device_id;
cuMemCreate(&alloc_handle_, shard_size_, &prop, 0);
// 2. 映射到虚拟地址空间
cuMemAddressReserve(&ptr_, total_size_, 0, 0, 0);
cuMemMap(ptr_ + i * shard_size_, shard_size_, 0, alloc_handle_, 0);
// 3. 设置访问权限
CUmemAccessDesc access = {};
access.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
access.location.id = target_device_id;
access.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;
cuMemSetAccess(ptr_ + i * shard_size_, shard_size_, &access, 1);
这种方式实现的是真正的跨 GPU 零拷贝共享——每块 GPU 通过自己的物理内存访问远端 GPU 的缓冲区,由 NVLink/NVSwitch 提供硬件转发。
五、线程池(ThreadPool)
接口
// include/thread_pool_base.hpp
class ThreadPool {
public:
using task_type = std::function<void()>;
using result_type = std::future<void>;
virtual result_type submit(task_type task) = 0; // 提交单个任务
virtual void execute_n(int64_t workgroup, int64_t num_tasks,
const std::function<void(int64_t)>& task) = 0; // 批量并行
virtual int64_t num_workers() const = 0;
};
SimpleThreadPool
SimpleThreadPool(src/thread_pool.cpp)是默认实现:
- 创建时启动固定数量的 worker 线程
submit()将任务加入工作队列,线程池竞争获取execute_n()批量提交索引化任务并等待全部完成,通过std::promise/std::future同步
核心用途:CPU 端 HostTable 的并行查找、默认嵌入填充、CPU UVM accumulate。
使用模式
thread_pool_ptr_t pool = create_thread_pool({{"num_workers", 16}});
const int64_t num_tasks = (n + chunk_size - 1) / chunk_size;
pool->execute_n(0, num_tasks, [&](int64_t idx) {
int64_t start = idx * chunk_size;
int64_t end = std::min(start + chunk_size, n);
for (int64_t i = start; i < end; i++) {
process(i);
}
});
// execute_n 在此处阻塞直到所有任务完成
六、分配器(Allocator)
接口
class Allocator {
public:
virtual cudaError_t device_allocate(void** ptr, size_t sz, int device_id = -1) noexcept = 0;
virtual cudaError_t device_free(void* ptr, int device_id = -1) noexcept = 0;
virtual cudaError_t host_allocate(void** ptr, size_t sz) noexcept = 0;
virtual cudaError_t host_free(void* ptr) noexcept = 0;
virtual cudaError_t set_device(int device_id) noexcept = 0;
};
DefaultAllocator
DefaultAllocator 使用 cudaMalloc / cudaFree 分配 GPU 内存,使用 cudaMallocHost / cudaFreeHost 分配主机固定内存(page-locked memory,支持 GPU 直接访问)。
大页支持:对于大块主机内存分配(≥2MB),DefaultAllocator 会尝试使用 mmap 分配透明大页(Transparent Huge Pages),减少 TLB miss:
size_t get_largest_hugepage_bits(size_t alloc_size) {
// 检查 1GB 和 2MB 大页的可用数量
// 返回足够支撑分配大小的最大页大小(bit 数)
// 如 2MB = 21 bits, 1GB = 30 bits
}
七、AOTInductor C++ 部署
完整流程
NVE 支持将包含 NVEmbedding / NVEmbeddingBag 的 PyTorch 模型导出为无 Python 运行时的 C++ 可执行程序。流程分为两步:
Step 1 — Python 导出
from pynve.torch.nve_export import export_aot
model = MyModel()
export_aot(model, (example_keys,), "save_dir/")
生成的文件结构:
save_dir/
├── model.pt2 # AOTInductor 编译后的计算图
├── metadata.json # 每层配置(行数、维度、数据类型、cache 类型等)
└── weights/
├── embedding_0.nve # 嵌入权重二进制文件
└── embedding_1.nve
Step 2 — C++ 加载与推理
#include <torch/torch.h>
#include <torch/csrc/inductor/aoti_package/model_package_loader.h>
#include "python/pynve/torch_bindings/nve_loader.hpp"
// LayerDirectory 读取 metadata.json,创建嵌入层,加载权重
nve::LayerDirectory dir("save_dir/");
// 加载 AOT 编译后的模型
torch::inductor::AOTIModelPackageLoader loader("save_dir/model.pt2");
// 推理(无 Python 运行时)
auto keys = torch::tensor({0L, 1L, 5L, 10L},
torch::TensorOptions().dtype(torch::kInt64).device(torch::kCUDA));
c10::InferenceMode mode;
auto outputs = loader.run({keys});
架构
Python 导出阶段:
PyTorch 模型 (NVEmbedding)
↓ torch.export
ExportedProgram
↓ AOTInductor compile
model.pt2 (so 文件 + nve_ops::embedding_lookup 算子)
+ metadata.json + .nve weights
C++ 推理阶段:
AOTIModelPackageLoader::run({keys})
↓ C10 Dispatcher
nve_ops::embedding_lookup (STABLE_TORCH_LIBRARY)
↓ NVELayerRegistry (单例)
LinearUVMEmbeddingLayer::lookup()
↓
GPU Embedding Cache lookup
关键组件
libnve-torch-ops.so:注册自定义算子nve_ops::embedding_lookup到 C10 dispatcher,使用STABLE_TORCH_LIBRARY宏实现 LibTorch Stable ABI 兼容NVELayerRegistry:全局单例,在 LayerDirectory 构造时将层 ID → 层指针的映射注册到 registry,AOT 模型通过层 ID 查找LayerDirectory:RAII 类,构造函数创建层、加载权重、注册到 registry;析构函数注销并销毁.nve权重格式:NVE 自定义二进制格式,包含层元数据头 + 嵌入向量数据,支持快速内存映射加载
链接要求
g++ inference.cpp -o inference \
-L/path/to/nve/build/lib -lnve-torch-ops -lnve-common \
-L$(python -c "import torch; print(torch.utils.cmake_prefix_path)")/../lib \
-ltorch -ltorch_cpu -ltorch_cuda -lc10 \
-Wl,-rpath,/path/to/nve/build/lib
libnve-torch-ops.so 构建条件:必须启用 torch bindings(默认),且 PyTorch ≥ 2.10(Stable ABI 兼容性通过 cmake/check_torch_stable_abi.cpp 验证)。
八、可插拔组件总结
NVE 的三大可插拔基础设施:
| 组件 | 接口 | 默认实现 | 插件化方式 |
|---|---|---|---|
| ThreadPool | ThreadPool |
SimpleThreadPool |
create_thread_pool(json) + configure_default_thread_pool(json) |
| Allocator | Allocator |
DefaultAllocator(cudaMalloc + 大页 mmap) |
构造函数参数传入 |
| InsertHeuristic | InsertHeuristic |
DefaultInsertHeuristic(阈值 0.75) |
Layer Config 传入 |
使用示例:
// 自定义分配器
class MyAllocator : public Allocator { ... };
auto allocator = std::make_shared<MyAllocator>();
auto layer = std::make_shared<GPUEmbeddingLayer<int64_t>>(config, allocator);
// 自定义线程池
auto pool = create_thread_pool({ {"num_workers", 32} });
configure_default_thread_pool({ {"num_workers", 32} });
// 自定义插入策略
auto heuristic = std::make_shared<StatisticalInsertHeuristic>(
num_keys, std::vector<float>{2.6f});
HierarchicalEmbeddingLayer<int64_t>::Config cfg;
cfg.insert_heuristic = heuristic;