前言现在开始讲内存和VM相关的内容,开始深入Solana底层字节码的执行.这篇文章会要求一点操作系统相关的内容,会难一点.内存对齐先讲下什么是内存对齐,以及为什么要内存对齐.是啥内存对齐指的是数据在内存中的存放地址(起始地址)必须是其自身大小的整数倍.例如一个4字节的int,要
现在开始讲内存和VM相关的内容,开始深入 Solana 底层字节码的执行.
这篇文章会要求一点操作系统相关的内容,会难一点.
先讲下什么是内存对齐,以及为什么要内存对齐.
内存对齐指的是数据在内存中的存放地址(起始地址)必须是其自身大小的整数倍.
例如一个 4 字节的int,要放在 4 的倍数的地址上, 0x1000, 0x1004, 0x1008.
8 字节的double放在 8 的倍数地址上, 0x1000, 0x1008.
数组类型,例如 int[], 是按 int 也就是 4 个字节,而不是整个数组的大小.
Struct类型,则按字节数最高的那个.
如果存放地址不满足这个条件,就叫未对齐访问.
这个模块就负责对齐内存的分配.Github
其实知道了内存对齐是啥以后, 这个模块就没啥讲的了, 按要求分配空间就完事.
在文件开头就能看到这堆很奇怪的东西,奇怪在于好像啥都没做.
这是因为后面的函数会限制参数类型要求, 要求必须实现 Pod Trait,所以在这里实现.
你只要记住, 他们限制哪些类型可以被当作原始字节在这块对齐内存里读写.
/// Scalar types, aka "plain old data"
pub trait Pod: Copy {}
impl Pod for u8 {}
impl Pod for u16 {}
impl Pod for u32 {}
impl Pod for u64 {}
impl Pod for i8 {}
impl Pod for i16 {}
impl Pod for i32 {}
impl Pod for i64 {}
先看下 AlignedVec, 他是后面 AlignedMemory 的底层实现.
内容及其简单,就三个字段,在哪,多长,用了多长.
struct AlignedVec<const ALIGN: usize> {
/// 指向对齐分配块起始地址的非空指针(分配时保证非空)
ptr: NonNull<u8>,
/// 当前已写入/初始化的字节数(逻辑长度)
length: usize,
/// 分配块的总字节数(物理容量)
capacity: usize,
}
看下分配的过程.
实际调用的 Rust 的 Layout::from_size_align(max_len, ALIGN) 进行分配, 只是多了一些检测.
ALIGN 是对齐的长度.
fn new(max_len: usize, zeroed: bool) -> Self {
assert!(ALIGN != 0, "Alignment must not be zero");
if max_len == 0 {
return Self::empty(); // 零容量,返回持有悬垂指针的空 Vec
}
unsafe {
// 构造 Layout:size=max_len, align=ALIGN
let layout = Layout::from_size_align(max_len, ALIGN).expect("invalid layout");
// 根据 zeroed 标志选择分配函数
let ptr = if zeroed {
alloc_zeroed(layout) // 分配并清零
} else {
alloc(layout) // 仅分配,内容未初始化
};
if ptr.is_null() {
handle_alloc_error(layout); // 分配失败 → abort
}
Self {
ptr: NonNull::new(ptr).unwrap_or_else(|| handle_alloc_error(layout)),
length: 0, // 初始已写入长度为 0
capacity: max_len, // 物理容量 = 请求的字节数
}
}
}
其他的部分都挺简单的,感兴趣的自己看下.
对 AlignedVec 的进一层封装.
继续看下 AlignedMemory 结构体.
内容很简单,就两个字段.
#[derive(Debug, PartialEq, Eq)]
pub struct AlignedMemory<const ALIGN: usize> {
/// 底层对齐内存存储, ALIGN 是对齐的长度
mem: AlignedVec<ALIGN>,
/// 若为 true,则 `[0..capacity]` 范围的内存已全部初始化为零
zero_up_to_max_len: bool,
}
继续往下看实现.
在上一篇文中的 elf.rs 中, 读取字节到内存中就是用的
AlignedMemory::<{ HOST_ALIGN }>::from_slice(bytes) 进行字节的加载.
因为还没讲到所以略过.ELF 用的16个字节的对齐, 以满足通用 CPU/SIMD 基本对齐需求.
这里的 copy_nonoverlapping 内部实现等价于 memcopy.
pub fn from_slice(data: &[u8]) -> Self {
let max_len = data.len();
let mut mem = AlignedVec::new(max_len, false); // 分配未初始化的对齐内存
unsafe {
// 拷贝数据到对齐内存中
core::ptr::copy_nonoverlapping(data.as_ptr(), mem.as_mut_ptr(), max_len);
mem.set_len(max_len); // 标记全部内容已初始化
}
Self {
mem,
zero_up_to_max_len: false, // 非零初始化模式
}
}
其他的函数都比较简单, 感兴趣的自己看下.
虚拟地址空间: 在多任务的操作系统中,为了让多个程序能够安全、高效地同时运行,操作系统会为每个进程创建一个私有的、连续的虚拟地址空间.
Solana 就运行在系统给他分配的虚拟地址空间(定为宿主地址)中,然后它的虚拟机又为合约的执行分配了一份虚拟地址空间(定为sBPPF虚拟地址).也就有了上一章中说的内存布局.
这个文件就负责虚拟地址空间的地址分配,地址转换各种.
先看下 MemoryRegion 的部分.MemoryRegion 并不负责内存的申请.
它是用于给已经申请的内存打标签, 做sBPF虚拟地址地址和宿主地址的转换.
在 Solana 中,用到 MemoryRegion 的地方主要是上一章讲过的内存布局.
下面是 cli 中的使用, cli 只是附带的调试工具.
实际使用是在 Agave-vm.rs. Agave 变化很快,下次你刷到这篇文章的时候不知道又重构成啥样了.
// https://github.com/anza-xyz/sbpf/blob/main/cli/src/main.rs
let regions: Vec<MemoryRegion> = vec![
executable.get_ro_region(),
MemoryRegion::new_writable_gapped(
stack.as_slice_mut(),
ebpf::MM_STACK_START,
if sbpf_version.stack_frame_gaps() && config.enable_stack_frame_gaps {
config.stack_frame_size as u64
} else {
0
},
),
MemoryRegion::new_writable(heap.as_slice_mut(), ebpf::MM_HEAP_START),
MemoryRegion::new_writable(&mut mem, ebpf::MM_INPUT_START),
];
#[repr(C, align(32))] 用于控制自定义数据类型(如结构体、枚举)的内存布局和对齐方式.
repr(C):强制使用 C 语言的内存布局。字段按照声明的顺序依次排列,编译器不会为了优化而重排字段。
align(32): 指定该类型的内存对齐要求为 32 字节。这意味着该类型的每个实例在内存中的起始地址必须是 32 的倍数,同时其整体大小也会被填充到 32 的倍数.
vm_gap_shift 用于在 MemoryRegion内部 添加间隙.等于63的时候代表无间隙
vm_addr 虚拟地址指的就是内存布局中的起始.例如代码区(0x100000000)
host_addr 实际在宿主进程(Solana)中给你分配的地址起始.
这里的地址一定要搞清楚,vm_addr是vm,也就是sBPF虚拟机用的,合约是跑在这个地址里面.
但是非合约, 例如你在项目中看到的其他 Rust 写的逻辑, 都是宿主地址中.不是整个sBPF都是虚拟地址,是只有合约的执行是在虚拟地址.
间隙是在虚拟地址中才有,在对应的宿主地址中是没有的.
为什么要有间隙?
是为了防止越界访问.对一块内存按指定字节进行划分并添加间隙.如果你跨块访问,立马报错.
#[derive(Default, Eq, PartialEq, Clone)]
#[repr(C, align(32))] // 32 字节对齐
pub struct MemoryRegion {
/// 宿主进程地址起始
pub host_addr: u64,
/// 对应的虚拟地址起始
pub vm_addr: u64,
/// 该区域的宿主机字节长度
pub len: u64,
/// 间隙大小的位移量
pub vm_gap_shift: u8,
/// 是否允许写入(Store 操作);false 则写入触发访问违规
pub writable: bool,
/// 访问违规处理器的用户自定义载荷(如帧索引),最大 u16
pub access_violation_handler_payload: Option<u16>,
}
看一下新建相关的实现,其他的容易理解,就 vm_gap_shift 的计算麻烦点.
host_addr 直接用传入进来的字节切片的指针转成地址.
std::mem::size_of::<u64>() = 8, vm_gap_shift = 8 * 8 - 1 = 63.
如果vm_gap_size = 0, 那么 vm_gap_shift = 63.
在前面 cli 可以看到只有 stack 区有 vm_gap_size 且等于 stack_frame_size.
stack_frame_size 在 Agave 中等于 4096 字节,也就是 4KB.
vm_gap_size.leading_zeros(): 4096 = 2^12,实际1在右起13位,在64位中有51位前导0
vm_gap_shift.saturating_sub(vm_gap_size.leading_zeros()) = 63-51 = 12.
fn new(slice: &[u8], vm_addr: u64, vm_gap_size: u64, writable: bool) -> Self {
let mut vm_gap_shift = (std::mem::size_of::<u64>() as u8)
.saturating_mul(8)
.saturating_sub(1);
if vm_gap_size > 0 {
vm_gap_shift = vm_gap_shift.saturating_sub(vm_gap_size.leading_zeros() as u8);
debug_assert_eq!(Some(vm_gap_size), 1_u64.checked_shl(vm_gap_shift as u32));
};
MemoryRegion {
host_addr: slice.as_ptr() as u64,
vm_addr,
len: slice.len() as u64,
vm_gap_shift,
writable,
access_violation_handler_payload: None,
}
}
/// 创建只读的连续内存区域。
pub fn new_readonly(slice: &[u8], vm_addr: u64) -> Self {
Self::new(slice, vm_addr, 0, false) // vm_gap_size=0(连续),writable=false
}
/// 创建可写的连续内存区域。
pub fn new_writable(slice: &mut [u8], vm_addr: u64) -> Self {
Self::new(&*slice, vm_addr, 0, true) // vm_gap_size=0(连续),writable=true
}
/// 创建可写的间隙内存区域(用于栈帧)。
/// `vm_gap_size` 指定每个间隙的字节大小(必须是 2 的幂)。
pub fn new_writable_gapped(slice: &mut [u8], vm_addr: u64, vm_gap_size: u64) -> Self {
Self::new(&*slice, vm_addr, vm_gap_size, true) // writable=true,有间隙
}
这里不理解的话再去看看刚才公式的计算结果和传入的参数.
pub fn vm_addr_range(&self) -> Range<u64> {
if self.vm_gap_shift == 63 {
self.vm_addr..self.vm_addr.saturating_add(self.len)
} else {
self.vm_addr..self.vm_addr.saturating_add(self.len.saturating_mul(2)) // 间隙:虚拟空间翻倍
}
}
看下地址转换的部分.只讲下有间隙部分.
前面说过虚拟地址中间隙在对应的宿主地址中是没有的.也就说对于长度为 4096 字节的块,加上4096字节的间隙一共是8192字节.在宿主地址中对应的是实际4096字节的块.
原始虚拟地址 (含间隙) 折叠后宿主地址 (连续)
+--------+--------+--------+ +--------+--------+
| Block0 | Gap | Block2 | → | Block0 | Block2 |
| (Data) |(跳过) | (Data) | | (Data) | (Data) |
+--------+--------+--------+ +--------+--------+
偏移: 0 4096 8192 宿主偏移: 0 4096
4096的二进制是 1000000000000,
8192的二进制是 10000000000000,
12288的二进制是11000000000000,
16384的二进制是100000000000000,
let is_in_gap = (begin_offset.checked_shr(self.vm_gap_shift as u32)& 1) == 1
之前计算出 vm_gap_shift 是 12 了,这里右移 12 位 相当于除以 4096,得到的是0,1,2,3...
再 &1 就能得到是奇数还是偶数.奇数就是在 Gap, 偶数不是.
-1i64 等于 11111111 11111111 11111111 11111111 11111111 11111111 11111111 11111111
右移12位 11111111 11111111 11111111 11111111 11111111 11111111 11110000 00000000
(begin_offset & gap_mask).checked_shr(1) 找出高12位并右移.
这里相当于裁掉了所有的低位,剩下的数值肯定是4096的倍数.
再右移1, 相当于除2, 两块合并一块.
pub fn vm_to_host(&self, access_type: AccessType, vm_addr: u64, len: u64) -> Option<u64> {
if access_type == AccessType::Store && !self.writable {
return None;
}
if vm_addr < self.vm_addr {
return None;
}
let begin_offset = vm_addr.saturating_sub(self.vm_addr);
if self.vm_gap_shift == 63 {
// fast path for non-gapped regions
if let Some(end_offset) = begin_offset.checked_add(len) {
if end_offset <= self.len {
return Some(self.host_addr.saturating_add(begin_offset));
}
}
return None;
}
let is_in_gap = (begin_offset
.checked_shr(self.vm_gap_shift as u32)
.unwrap_or(0)
& 1)
== 1;
let gap_mask = (-1i64).checked_shl(self.vm_gap_shift as u32).unwrap_or(0) as u64;
let gapped_offset =
(begin_offset & gap_mask).checked_shr(1).unwrap_or(0) | (begin_offset & !gap_mask);
if let Some(end_offset) = gapped_offset.checked_add(len) {
if end_offset <= self.len && !is_in_gap {
return Some(self.host_addr.saturating_add(gapped_offset));
}
}
None
}
虚拟内存映射的公共入口.可以把它当作路由器.
持有所有 MemoryRegion 的集合,负责把 sBPF 虚拟地址路由到正确的那块宿主内存。
pub struct MemoryMapping {
/// 访问违规时的回调处理器
access_violation_handler: AccessViolationHandler,
/// 最大调用深度(用于栈访问违规的错误信息生成)
max_call_depth: i64,
/// 每个栈帧的字节大小
stack_frame_size: i64,
/// 是否禁用地址翻译(true 时 vm_addr == host_addr)
disable_address_translation: bool,
/// 程序的 sBPF 版本(决定使用哪种映射类型)
sbpf_version: SBPFVersion,
/// 是否已经完成 initialize()(防止未初始化时使用)
initialized: bool,
/// 实际映射类型:Aligned(快速)或 Unaligned(灵活)
ty: MemoryMappingType,
}
看一下新建的部分.
实际使用的 new_with_access_violation_handler, new是测试用.
很简单自己看吧,就是会根本sBPF版本选择不同的方案.具体内容一会再讲.
pub fn new_with_access_violation_handler(
regions: Vec<MemoryRegion>,
config: &Config,
sbpf_version: SBPFVersion,
access_violation_handler: AccessViolationHandler,
) -> Result<Self, EbpfError> {
let mut mapping =
Self::new_uninitialized(regions, config, sbpf_version, access_violation_handler);
mapping.initialize()?;
Ok(mapping)
}
/// Creates an unitialized memory mapping
pub fn new_uninitialized(
regions: Vec<MemoryRegion>,
config: &Config,
sbpf_version: SBPFVersion,
access_violation_handler: AccessViolationHandler,
) -> Self {
let ty = if sbpf_version >= SBPFVersion::V4 || config.aligned_memory_mapping {
MemoryMappingType::Aligned(AlignedMemoryMapping::new_uninitialized(regions, config))
} else {
debug_assert!(
sbpf_version <= SBPFVersion::V3,
"SBPFv4 and later versions do not support unaligned memory"
);
MemoryMappingType::Unaligned(UnalignedMemoryMapping::new_uninitialized(regions))
};
Self {
access_violation_handler: Box::new(access_violation_handler),
max_call_depth: config.max_call_depth as i64,
stack_frame_size: config.stack_frame_size as i64,
disable_address_translation: !config.enable_address_translation,
sbpf_version,
initialized: false,
ty,
}
}
pub fn new(
regions: Vec<MemoryRegion>,
config: &Config,
sbpf_version: SBPFVersion,
) -> Result<Self, EbpfError> {
Self::new_with_access_violation_handler(
regions,
config,
sbpf_version,
Box::new(default_access_violation_handler),
)
}
pub fn initialize(&mut self) -> Result<(), EbpfError> {
let result = match &mut self.ty {
MemoryMappingType::Aligned(inner) => inner.initialize(),
MemoryMappingType::Unaligned(inner) => inner.initialize(),
};
self.initialized = result.is_ok(); // 仅成功时标记为已初始化
result
}
继续看下读写部分
Pod Trait 之前讲过了, 这里 load 会要求 Into<u64>, 也就是必须要能转换成 u64.
主要逻辑都在 map_with_access_violation_handler
这里的 find_region 直接调用底层类型的 find_region, 一会再具体讲.
暂时知道是找出 vm_addr 属于那个 MemoryRegion.
map_with_access_violation_handler 中如果转换地址失败会对 region 调用 access_violation_handler(new的时候传入的).
在 Agave 中, 传入的是 新的帐户数据写入访问处理程序(access_violation_handler)
pub fn map_with_access_violation_handler(
&mut self,
access_type: AccessType,
vm_addr: u64,
len: u64,
) -> ProgramResult {
debug_assert!(self.initialized);
if self.disable_address_translation {
return ProgramResult::Ok(vm_addr);
}
if let Some((index, region)) = self.find_region(vm_addr) {
if let Some(host_addr) = region.vm_to_host(access_type, vm_addr, len) {
return ProgramResult::Ok(host_addr); // 正常转换成功
}
// 转换失败,先,克隆区域,后面的handler可能会修改
let mut region = (*region).clone();
// 计算本区域的最大扩展长度,为了保证不重叠,最大只能到下一个region的起始地址.
let max_len = self
.get_regions()
.get(index.saturating_add(1))
.map_or(u64::MAX, |next_region| next_region.vm_addr)
.saturating_sub(region.vm_addr);
// 调用访问违规处理器(可能动态扩展栈帧,修改 region)
(self.access_violation_handler)(&mut region, max_len, access_type, vm_addr, len);
// handler 修改后再次尝试翻译
if let Some(host_addr) = region.vm_to_host(access_type, vm_addr, len) {
// 翻译成功,将修改后的区域写回
if let Err(err) = self.replace_region(index, region) {
return ProgramResult::Err(err);
}
return ProgramResult::Ok(host_addr);
}
}
self.generate_access_violation(access_type, vm_addr, len)
}
pub fn load<T: Pod + Into<u64>>(&mut self, vm_addr: u64) -> ProgramResult {
let len = mem::size_of::<T>() as u64;
debug_assert!(len <= mem::size_of::<u64>() as u64); // 确保 T 不超过 u64
debug_assert!(self.initialized);
match self.map_with_access_violation_handler(AccessType::Load, vm_addr, len) {
ProgramResult::Ok(host_addr) => {
// read_unaligned:即使地址未对齐也可安全读取
ProgramResult::Ok(unsafe { ptr::read_unaligned::<T>(host_addr as *const T) }.into())
}
err => err,
}
}
pub fn store<T: Pod>(&mut self, value: T, vm_addr: u64) -> ProgramResult {
let len = mem::size_of::<T>() as u64;
debug_assert!(len <= mem::size_of::<u64>() as u64);
debug_assert!(self.initialized);
match self.map_with_access_violation_handler(AccessType::Store, vm_addr, len) {
ProgramResult::Ok(host_addr) => {
// write_unaligned:即使地址未对齐也可安全写入
unsafe { ptr::write_unaligned(host_addr as *mut T, value) };
ProgramResult::Ok(host_addr)
}
err => err,
}
}
这是访问出错时的处理.
fn generate_access_violation(
&self,
access_type: AccessType,
vm_addr: u64,
len: u64,
) -> ProgramResult {
// 计算 vm_addr 落在哪个栈帧(负数表示在栈底以下,超出最大深度表示太深)
let stack_frame = (vm_addr as i64)
.saturating_sub(ebpf::MM_STACK_START as i64)
.checked_div(self.stack_frame_size)
.unwrap_or(0);
if !self.sbpf_version.manual_stack_frame_bump()
&& (-1..self.max_call_depth.saturating_add(1)).contains(&stack_frame)
{
// 在合法调用深度的栈区 → 栈访问违规(给出栈帧号方便调试)
ProgramResult::Err(EbpfError::StackAccessViolation(
access_type,
vm_addr,
len,
stack_frame,
))
} else {
// 不在栈区或超出深度 → 普通访问违规(给出区域名称方便调试)
let region_name = match vm_addr & (!ebpf::MM_BYTECODE_START.saturating_sub(1)) {
ebpf::MM_BYTECODE_START => "program",
ebpf::MM_STACK_START => "stack",
ebpf::MM_HEAP_START => "heap",
ebpf::MM_INPUT_START => "input",
_ => "unknown",
};
ProgramResult::Err(EbpfError::AccessViolation(
access_type,
vm_addr,
len,
region_name,
))
}
}
其他几个函数主要调用的底层的 MemoryMappingType 的处理函数.
直接去看 MemoryMappingType 的两个类型.
pub enum MemoryMappingType {
/// 对齐映射:利用地址高位直接定位,速度最快(SBPFv4+ 或配置强制启用)
Aligned(AlignedMemoryMapping),
/// 非对齐映射:使用 Eytzinger 布局二分搜索(仅 SBPFv3 及以下版本支持)
Unaligned(UnalignedMemoryMapping),
}
对齐的内存映射.
利用地址高位直接定位,速度最快(SBPFv4+ 或配置强制启用)
pub struct AlignedMemoryMapping {
/// 按虚拟地址排序的内存区域列表\
regions: Vec<MemoryRegion>,
/// 是否允许虚拟地址空间第 0 块(NULL 保护块)存在映射
allow_memory_region_zero: bool,
}
在内存布局中说过:
只读数据区(0x000000000): 存放硬编码常量.代码区(0x100000000) : 存放程序的指令.栈区(0x200000000) : 用于函数调用和局部变量堆区(0x300000000) : 用于程序的动态内存分配输入数据区(0x400000000) : 存放 Solana 传入的交易输入(如账户信息、指令数据).转成二进制是:
000000000000000000000000000000000
100000000000000000000000000000000
1000000000000000000000000000000000
1100000000000000000000000000000000
10000000000000000000000000000000000
右移 32 位后,变成 0, 1, 10, 11, 100
这是 initialize 中 allow_memory_region_zero 为 true 的部分.
这里其实是填充缺失的块,确保 regions 数组中的 index 恰好等于 vm_addr 右移 32 位后的值.
let mut expected_region_index = 0;
while expected_region_index < self.regions.len() {
// 计算当前区域实际应在哪个块(vm_addr >> 32)
let actual_region_index = self
.regions
.get(expected_region_index)
.unwrap()
.vm_addr
.checked_shr(ebpf::VIRTUAL_ADDRESS_BITS as u32)
.unwrap_or(0) as usize;
if actual_region_index > expected_region_index {
// 空隙:在期望位置插入空只读区域,填充缺失的块
self.regions.insert(
expected_region_index,
MemoryRegion::new_readonly(
&[],
(expected_region_index as u64).saturating_mul(ebpf::MM_REGION_SIZE),
),
);
} else if actual_region_index < expected_region_index {
// 区域块索引比期望小,说明有重叠或顺序错误
return Err(EbpfError::InvalidMemoryRegion(actual_region_index));
}
expected_region_index = expected_region_index.saturating_add(1);
}
allow_memory_region_zero 为 false 的部分.
跟上面的功能一样的,都是确保每个区域的块号等于其数组下标.
不通过的是上面缺失的自动填充,这里要求传入的不能缺失.
为什么要在 index 0 插入 NULL 保护区域.
因为内存布局就是这样规定,然后你去看 Agave 就会发现一开始就没插入这块.
// 在 index 0 插入 NULL 保护区域(vm_addr=0,空 slice)
self.regions.push(MemoryRegion::new_readonly(&[], 0));
self.regions.sort(); // 排序后 NULL 区域应在 index 0
// 验证每个区域的块号等于其数组下标
for (index, region) in self.regions.iter().enumerate() {
if region
.vm_addr
.checked_shr(ebpf::VIRTUAL_ADDRESS_BITS as u32)
.unwrap_or(0)
!= index as u64
{
return Err(EbpfError::InvalidMemoryRegion(index));
}
}
找出虚拟地址位于哪个块. 原理看上面的右移32位.
pub fn find_region(&self, vm_addr: u64) -> Option<(usize, &MemoryRegion)> {
// 高 32 位即为区域索引
let index = vm_addr.wrapping_shr(ebpf::VIRTUAL_ADDRESS_BITS as u32) as usize;
if index < self.regions.len() && (index > 0 || self.allow_memory_region_zero) {
// Safety: 上方已做边界检查
let region = unsafe { self.regions.get_unchecked(index) };
return Some((index, region));
}
None
}
替换指定块.
pub fn replace_region(&mut self, index: usize, region: MemoryRegion) -> Result<(), EbpfError> {
// 新区域起始地址的块号
let begin_index = region
.vm_addr
.checked_shr(ebpf::VIRTUAL_ADDRESS_BITS as u32)
.unwrap_or(0) as usize;
// 新区域末尾地址的块号
let end_index = region
.vm_addr
.saturating_add(region.len.saturating_sub(1))
.checked_shr(ebpf::VIRTUAL_ADDRESS_BITS as u32)
.unwrap_or(0) as usize;
// 起止地址必须在同一块,且等于目标索引
if begin_index != index || end_index != index {
return Err(EbpfError::InvalidMemoryRegion(index));
}
self.regions[index] = region;
Ok(())
}
基于 Eytzinger 布局的非对齐内存映射, 用于 SBPFv3 及以前版本.
Eytzinger 布局是一种将完全二叉树隐式存储为数组的方式.
Eytzinger 布局算法 的内容我就不就不讲了,感兴趣的自己去了解下Eytzinger.
新的版本也不用这个UnalignedMemoryMapping, 不想讲了.
pub struct UnalignedMemoryMapping {
/// 所有内存区域(按 vm_addr 排序)
regions: Box<[MemoryRegion]>,
/// 以 Eytzinger 顺序存储各区域的 vm_addr(用于缓存友好二分搜索)
region_addresses: Box<[u64]>,
/// Eytzinger 顺序 → 原始数组顺序的索引映射(搜索到节点后查找对应 region)
region_index_lookup: Box<[usize]>,
/// 最近 4 次 vm_addr → region_index 查找结果缓存(UnsafeCell 允许不可变引用下修改)
cache: UnsafeCell<MappingCache>,
}
关键在于,构造 eytzinger order.
fn construct_eytzinger_order(&mut self, mut in_index: usize, out_index: usize) -> usize {
if out_index >= self.regions.len() {
return in_index; // 超出数组边界,返回当前 in_index(递归终止)
}
// 先处理左子树(out_index 的左子 = 2*out_index + 1)
in_index =
self.construct_eytzinger_order(in_index, out_index.saturating_mul(2).saturating_add(1));
// 处理当前节点:将有序数组第 in_index 个元素的 vm_addr 放入 Eytzinger 位置
self.region_addresses[out_index] = self.regions[in_index].vm_addr;
self.region_index_lookup[out_index] = in_index; // 记录反向映射
// 再处理右子树(out_index 的右子 = 2*out_index + 2)
self.construct_eytzinger_order(
in_index.saturating_add(1),
out_index.saturating_mul(2).saturating_add(2),
)
}
查找流程
pub fn find_region(&self, vm_addr: u64) -> Option<(usize, &MemoryRegion)> {
// 获取缓存的可变引用(UnsafeCell 允许通过不可变 self 引用修改缓存)
// 安全:MemoryMapping 是 !Sync,无多线程并发访问问题
let cache = unsafe { &mut *self.cache.get() };
if let Some(index) = cache.find(vm_addr) {
// 缓存命中:直接返回缓存的区域(索引已经过验证,可以 unchecked)
Some((index, unsafe { self.regions.get_unchecked(index) }))
} else {
// 缓存未命中:Eytzinger 二分搜索
let mut index = 1; // 从 Eytzinger 根节点(1-indexed)开始
while index <= self.region_addresses.len() {
// Safety: index 从 1 开始并在 <= len 时进入循环,避免边界检查
// 根据当前节点 vm_addr 与目标的大小关系决定走右子(+1)还是左子(+0)
index = (index << 1)
+ unsafe { *self.region_addresses.get_unchecked(index - 1) <= vm_addr }
as usize;
}
// 利用 trailing_zeros 提取最后一个有效的父节点索引
index >>= index.trailing_zeros() + 1;
if index == 0 {
return None; // 未找到
}
// 将 Eytzinger 顺序索引转换为原始区域数组索引
index = unsafe { *self.region_index_lookup.get_unchecked(index - 1) };
let region = unsafe { self.regions.get_unchecked(index) };
cache.insert(region.vm_addr_range(), index); // 将结果插入缓存
Some((index, region))
}
}
sBPF 虚拟机模块, 提供顶层执行入口、VM 配置、上下文对象接口和调用栈帧定义.
关于 JIT 的部分先跳过, 下一章再讲.
这里面大部分都是配置相关的.
pub struct Config {
/// 最大函数调用嵌套深度(超过则抛出 CallDepthExceeded 错误)
pub max_call_depth: usize,
/// 每个栈帧的大小(字节),必须与 LLVM BPF 后端配置一致(默认 4096)
pub stack_frame_size: usize,
/// 启用地址翻译;禁用后 BPF 地址直接作为主机地址使用(不安全,仅测试)
pub enable_address_translation: bool,
/// 在相邻栈帧之间插入保护间隙页(防止栈溢出写入相邻帧)
pub enable_stack_frame_gaps: bool,
/// JIT 输出的指令计数器检查点之间最大 PC 距离(超过则插入 checkpoint)
pub instruction_meter_checkpoint_distance: usize,
/// 启用指令计数器(CU 计量);关闭后 get_remaining/consume 不受限制
pub enable_instruction_meter: bool,
/// 启用寄存器追踪
pub enable_register_tracing: bool,
/// 为 ELF 符号和节分配动态字符串标签(占用更多内存,但错误消息更友好)
pub enable_symbol_and_section_labels: bool,
/// 拒绝在旧版本验证器中漏网的格式异常 ELF 文件
pub reject_broken_elfs: bool,
#[cfg(feature = "jit")]
/// JIT 输出中随机注入 no-op 指令的频率(每 N 条主机指令一个,0=关闭)
pub noop_instruction_rate: u32,
#[cfg(feature = "jit")]
/// JIT 模式下对用户提供的立即数和偏移量进行消毒(防止用于攻击 JIT 代码)
pub sanitize_user_provided_values: bool,
/// 尽量避免复制只读节(减少内存分配,直接引用 ELF 字节)
pub optimize_rodata: bool,
/// 允许对齐内存映射中地址为零的内存区域
pub allow_memory_region_zero: bool,
/// 使用对齐内存映射(O(1) 地址查找,按虚拟地址高位直接索引)
pub aligned_memory_mapping: bool,
/// 允许的 sBPF 版本范围(可执行文件必须在此范围内)
pub enabled_sbpf_versions: std::ops::RangeInclusive<SBPFVersion>,
}
ContextObject 宿主环境上下文接口,BPF 程序通过系统调用与宿主交互时使用。
用户必须实现此 trait 以提供指令预算管理和内存映射访问。
pub trait ContextObject {
/// 消耗指定数量的指令预算(递减计数器)
fn consume(&mut self, amount: u64);
/// 获取剩余可用指令数(预算耗尽时返回 0 或负值的 u64)
fn get_remaining(&self) -> u64;
/// 返回当前活跃的 MemoryMapping 的可变指针
/// JIT 代码直接使用此指针进行内存访问,因此按裸指针传递
fn active_mapping_ptr(&mut self) -> ptr::NonNull<MemoryMapping>;
}
函数调用栈帧,在进入嵌套函数调用时由 push_frame() 填写、EXIT 时恢复。
pub struct CallFrame {
/// 调用者保存寄存器(r1-r5,即 SCRATCH_REGS 个)
pub caller_saved_registers: [u64; ebpf::SCRATCH_REGS],
/// 调用者的帧指针(r10)
pub frame_pointer: u64,
/// 函数调用返回后下一条指令的 PC(调用指令的 PC + 1)
pub target_pc: u64,
}
EbpfVm 结构体各字段在内存中的固定偏移量枚举
pub enum RuntimeEnvironmentSlot {
/// 用于 JIT 代码恢复宿主 C 栈
HostStackPointer = offset_of!(EbpfVm<DummyContextObject>, host_stack_pointer) as isize,
/// 当前调用嵌套深度
CallDepth = offset_of!(EbpfVm<DummyContextObject>, call_depth) as isize,
/// 指向 ContextObject 的裸指针
ContextObjectPointer = offset_of!(EbpfVm<DummyContextObject>, context_object_pointer) as isize,
/// 上次同步时的剩余指令预算
PreviousInstructionMeter =
offset_of!(EbpfVm<DummyContextObject>, previous_instruction_meter) as isize,
/// 待提交给 context.consume() 的指令数
DueInsnCount = offset_of!(EbpfVm<DummyContextObject>, due_insn_count) as isize,
/// 性能秒表的 CPU 周期累积量
StopwatchNumerator = offset_of!(EbpfVm<DummyContextObject>, stopwatch_numerator) as isize,
/// 性能秒表的采样次数
StopwatchDenominator = offset_of!(EbpfVm<DummyContextObject>, stopwatch_denominator) as isize,
/// 通用寄存器数组(r0-r10 + PC=r11)
Registers = offset_of!(EbpfVm<DummyContextObject>, registers) as isize,
/// 程序执行结果(成功值或错误)
ProgramResult = offset_of!(EbpfVm<DummyContextObject>, program_result) as isize,
/// 活跃内存映射的指针
MemoryMapping = offset_of!(EbpfVm<DummyContextObject>, memory_mapping) as isize,
/// 寄存器追踪记录
RegisterTrace = offset_of!(EbpfVm<DummyContextObject>, register_trace) as isize,
}
eBPF 虚拟机主结构体
pub struct EbpfVm<'a, C: ContextObject> {
/// JIT 代码返回宿主时恢复的 C 栈指针(仅 JIT 模式使用)
pub host_stack_pointer: *mut u64,
/// 当前函数调用嵌套深度(调用时加一,EXIT 时减一);
/// 达到 max_call_depth 时触发 CallDepthExceeded 错误;
/// 降到 0 时表示最外层函数退出,程序结束
pub call_depth: u64,
/// 指向 ContextObject 的裸指针(在 VM 生命周期内始终有效)
pub(crate) context_object_pointer: ptr::NonNull<C>,
/// 绑定 ContextObject 生命周期(Rust 借用检查器用,运行时无开销)
context_object_lifetime: PhantomData<&'a mut C>,
/// 上次从 context.get_remaining() 读取的剩余指令预算
pub previous_instruction_meter: u64,
/// 待提交给 context.consume() 的已消耗指令数(解释器每步加一,系统调用前提交)
pub due_insn_count: u64,
/// 性能计时器:累积的 CPU 周期数(用于 JIT 性能调试)
pub stopwatch_numerator: u64,
/// 性能计时器:计时器被触发的次数
pub stopwatch_denominator: u64,
/// 通用寄存器内联数组(r0-r10 + r11=PC),JIT 通过偏移量直接访问
pub registers: [u64; 12],
/// 程序执行结果(成功时为 r0,失败时为 EbpfError)
pub program_result: ProgramResult,
/// 活跃内存映射的裸指针(实际存储在 ContextObject 中,此处仅缓存指针)
pub(crate) memory_mapping: ptr::NonNull<MemoryMapping>,
/// 加载器(提供内置函数注册表和配置)
pub loader: Arc<BuiltinProgram<C>>,
/// 寄存器追踪记录(每条指令执行前的寄存器快照,仅在 enable_register_tracing 时填充)
pub register_trace: Vec<RegisterTraceEntry>,
/// 调试器监听端口(环境变量 VM_DEBUG_PORT 配置)
#[cfg(feature = "debugger")]
pub debug_port: Option<u16>,
/// 调试器元数据(程序名称等)
#[cfg(feature = "debugger")]
pub debug_metadata: Option<String>,
}
初始化部分,只看下帧指针要关注.
每个栈帧的默认大小是4096字节(4KB),所以这里是预留第一帧.
registers[ebpf::FRAME_PTR_REG] 是栈帧指针寄存器.
registers[ebpf::FRAME_PTR_REG] =
ebpf::MM_STACK_START.saturating_add(if !sbpf_version.manual_stack_frame_bump() {
config.stack_frame_size // 旧版:预留第一帧
} else {
stack_len // 新版:指向栈末尾
} as u64);
继续看下虚拟机字节码的执行 execute_program.
这里面初始化的时候会一个 self.registers[11] = executable.get_entrypoint_instruction_offset() as u64,
属于第 12 个寄存器,但是在上一章没有说.
因为这对于程序不可见,属于内部使用.用于保存PC的,指示当前程序执行到哪.
先跳过JIT的部分,下一章再讲.
直接调用的 run_interpreter , run_interpreter的内容更简单.主体在后面要讲的interpreter中.
先继续往下,一会再深入讲interpreter.
*mode = ExecutionMode::Interpreted;
// 构建解释器并进入解释执行循环(run_interpreter 内部循环调用 step())
let interpreter = Interpreter::new(self, executable, self.registers, call_frames);
break 'execute run_interpreter(interpreter);
fn run_interpreter<C: ContextObject>(mut interpreter: Interpreter<C>) {
while interpreter.step() {} // step() 返回 false 表示程序结束或出错
}
提交累计的指令消耗量
let instruction_count = if config.enable_instruction_meter {
let due_insn_count = self.due_insn_count;
let context = self.context();
context.consume(due_insn_count); // 提交待扣除量
// 净消耗 = 初始预算 - 剩余预算
initial_insn_count.saturating_sub(context.get_remaining())
} else {
0
};
// swap 出 program_result 避免 Clone(program_result 重置,结果由调用方处理)
let mut result = ProgramResult::Ok(0);
std::mem::swap(&mut result, &mut self.program_result);
(instruction_count, result)
字节码解释器
这部分的内容比较清晰,就挑几个来讲
push_frame 函数调用前调用,保存调用者上下文并进入新栈帧.
fn push_frame(&mut self, config: &Config) -> bool {
let frame = &mut self.call_frames[self.vm.call_depth as usize];
// 保存调用者保存寄存器(r1-r5)
frame.caller_saved_registers.copy_from_slice(
&self.reg[ebpf::FIRST_SCRATCH_REG..ebpf::FIRST_SCRATCH_REG + ebpf::SCRATCH_REGS],
);
// 保存帧指针(r10)
frame.frame_pointer = self.reg[ebpf::FRAME_PTR_REG];
// 保存返回地址(当前 PC + 1)
frame.target_pc = self.reg[11] + 1;
self.vm.call_depth += 1;
if self.vm.call_depth as usize == config.max_call_depth {
throw_error!(self, EbpfError::CallDepthExceeded);
}
if !self.executable.get_sbpf_version().manual_stack_frame_bump() {
// 旧版本:由解释器自动推进帧指针
// 若版本要求栈帧间距,则 num_frames=2(含隔离页)
let num_frames = if self.executable.get_sbpf_version().stack_frame_gaps()
&& config.enable_stack_frame_gaps
{
2
} else {
1
};
let stack_frame_size = config.stack_frame_size * num_frames;
// r10 向高地址偏移一帧大小,进入新帧的栈空间
self.reg[ebpf::FRAME_PTR_REG] =
self.reg[ebpf::FRAME_PTR_REG].wrapping_add(stack_frame_size as u64);
}
true
}
dispatch_syscall 分发系统调用到 Rust 主机函数
fn dispatch_syscall(&mut self, function: BuiltinFunction<C>) -> &ProgramResult {
// 将计数器转换格式,使系统调用内部不额外消耗 BPF 配额
self.vm.due_insn_count = self.vm.previous_instruction_meter - self.vm.due_insn_count;
// 传递调用参数 r0-r5 到 vm.registers
self.vm.registers[0..6].copy_from_slice(&self.reg[0..6]);
// 调用主机端 Rust 函数(内置函数或用户注册的系统调用)
self.vm.invoke_function(function);
// 系统调用本身不消耗 BPF 指令预算,重置计数
self.vm.due_insn_count = 0;
&self.vm.program_result
}
CALL_REG 间接调用,目标 PC 来自寄存器
ebpf::CALL_REG => {
let target_pc = if self.executable.get_sbpf_version().callx_uses_src_reg() {
self.reg[src] // V3+:目标地址存于 src 寄存器
} else if self.executable.get_sbpf_version().callx_uses_dst_reg() {
self.reg[dst] // V2:目标地址存于 dst 寄存器
} else {
self.reg[insn.imm as usize] // V0/V1:目标地址存于 imm 指定的寄存器
};
if !self.push_frame(config) {
return false;
}
// 将虚拟地址转换为指令计数索引(VM 地址减去基址,除以指令大小)
check_pc!(self, next_pc, target_pc.wrapping_sub(self.program_vm_addr) / ebpf::INSN_SIZE as u64);
},
CALL_IMM 直接调用,分为系统调用和程序内函数调用两类
ebpf::CALL_IMM => {
let mut resolved = false;
// 尝试匹配外部系统调用:查 loader 的函数注册表,找到则分发到 Rust 回调
if !self.executable.get_sbpf_version().static_syscalls() || insn.src == 0 {
if let Some((_, (callback, _))) = self.executable.get_loader().get_function_registry().lookup_by_key(insn.imm as u32) {
self.reg[0] = match self.dispatch_syscall(callback) {
ProgramResult::Ok(value) => *value,
ProgramResult::Err(_err) => return false,
};
resolved = true;
}
}
// 尝试匹配内部函数调用:V3+ 用相对偏移定位,旧版查 executable 注册表
if self.executable.get_sbpf_version().static_syscalls() {
// target_pc = next_pc + imm(相对偏移,已含 +1)
let target_pc = (next_pc as i64).saturating_add(insn.imm);
if ebpf::is_pc_in_program(self.program, target_pc as usize) && insn.src == 1 {
if !self.push_frame(config) {
return false;
}
next_pc = target_pc as u64;
resolved = true;
}
} else if let Some((_, target_pc)) =
self.executable
.get_function_registry()
.lookup_by_key(insn.imm as u32) {
if !self.push_frame(config) {
return false;
}
check_pc!(self, next_pc, target_pc as u64);
resolved = true;
}
if !resolved {
throw_error!(self, EbpfError::UnsupportedInstruction);
}
}
EXIT:从当前函数返回。
call_depth=0 时为程序顶层返回:将 r0 写入 program_result,返回 false 结束循环。call_depth>0 时为内部调用返回:从 call_frames 恢复调用者状态后继续。ebpf::EXIT => {
if self.vm.call_depth == 0 {
// 程序顶层退出:最后检查一次指令预算(可能在循环外未检查的最后一条)
if config.enable_instruction_meter && self.vm.due_insn_count > self.vm.previous_instruction_meter {
throw_error!(self, EbpfError::ExceededMaxInstructions);
}
// 将 r0 作为程序返回值写入 program_result
self.vm.program_result = ProgramResult::Ok(self.reg[0]);
return false;
}
// BPF→BPF 函数返回:弹出调用栈帧,恢复调用者寄存器和 PC
self.vm.call_depth -= 1;
let frame = &self.call_frames[self.vm.call_depth as usize];
// 恢复帧指针(r10)
self.reg[ebpf::FRAME_PTR_REG] = frame.frame_pointer;
// 恢复调用者保存寄存器(r1-r5)
self.reg[ebpf::FIRST_SCRATCH_REG
..ebpf::FIRST_SCRATCH_REG + ebpf::SCRATCH_REGS]
.copy_from_slice(&frame.caller_saved_registers);
// 恢复返回地址(调用前保存的 PC+1)
next_pc = frame.target_pc;
} 如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!