Rust并行加速:4个实操案例,深度解析Rayon线程池的Fork-Join与广播机制在现代软件开发中,充分利用多核CPU的并行计算能力是提升应用性能的关键。Rust语言通过其零成本抽象和所有权系统,在并发编程方面提供了卓越的安全保障。而Rayon库,作为Rust生态中最
在现代软件开发中,充分利用多核 CPU 的并行计算能力是提升应用性能的关键。Rust 语言通过其零成本抽象和所有权系统,在并发编程方面提供了卓越的安全保障。而 Rayon 库,作为 Rust 生态中最受欢迎的并行数据处理工具,更是将复杂的多线程编程简化为几行代码。
本文将通过 4 个精选的 Rayon 线程池实操案例,从最基本的矩阵求和到高级的线程广播和 join
模式,深入浅出地解释 Rayon 如何在底层实现高效的任务并行(Task Parallelism)、保证结构化同步(Structured Concurrency),并揭示其并行输出的非确定性特征。无论您是 Rust 初学者还是希望进一步优化 CPU 密集型任务的开发者,都将从这些实战示例中获益。
Rust 多线程 - Rayon
fn main() {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(4)
.build()
.unwrap();
let matrix = [
vec![1, 2, 3],
vec![4, 5, 6],
vec![7, 8, 9],
vec![10, 11, 12],
];
pool.scope(|scope| {
for (i, row) in matrix.iter().enumerate() {
scope.spawn(move |_| {
let sum: i32 = row.iter().sum();
println!("Row {i} sum = {sum}");
});
}
});
println!("Main thread finished");
}
这段 Rust 代码使用了 Rayon 库来创建一个自定义的线程池并执行并行任务。
代码的详细解释如下:
创建线程池:
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(4)
.build()
.unwrap();
这行代码使用 rayon::ThreadPoolBuilder
构建了一个名为 pool
的自定义线程池。
.num_threads(4)
指定线程池中包含 4 个工作线程。.build()
尝试创建线程池。.unwrap()
处理可能出现的错误(例如线程创建失败),如果成功则返回 ThreadPool
实例。定义数据:
let matrix = [
vec![1, 2, 3],
vec![4, 5, 6],
vec![7, 8, 9],
vec![10, 11, 12],
];
定义了一个包含 4 个向量的数组 matrix
,可以将其视为一个 4 X 3 的矩阵。
使用作用域(Scoped Task)执行并行任务:
pool.scope(|scope| {
// ... 任务定义 ...
});
pool.scope(|scope| { ... })
创建了一个 "fork-join" 作用域。在这个作用域内部(即闭包 { ... }
内部)可以安全地启动并发任务,这些任务可以借用外部栈上的局部变量(例如 matrix
)。关键点在于,当 scope
闭包返回时,程序会阻塞,直到所有通过 scope.spawn()
启动的任务都完成。
分发任务:
for (i, row) in matrix.iter().enumerate() {
scope.spawn(move |_| {
let sum: i32 = row.iter().sum();
println!("Row {i} sum = {sum}");
});
}
matrix.iter().enumerate()
遍历 matrix
数组,同时获取行索引 i
和行数据 row
(一个向量的引用 &Vec<i32>
)。scope.spawn(move |_| { ... })
在线程池中创建一个新的异步任务。
move
关键字确保闭包获得了它所使用的变量(这里是 i
和 row
)的所有权或所有必要的拷贝。由于 row
是对 matrix
元素的引用,Rayon 的 scoped task 机制保证了在任务完成之前 matrix
不会被释放,从而使这个引用是安全的。row.iter().sum()
),并打印结果。主线程继续执行:
println!("Main thread finished");
这行代码会在 pool.scope(|...| { ... })
完成(即所有并行任务都执行完毕)之后,由主线程执行并打印出来,这证明了 scoped task 机制确保了任务的完成性。
总结:
这段代码利用 Rayon 库创建了一个 4 线程的线程池,然后使用一个 scoped task 机制将一个矩阵的 4 行数据的求和任务 分发给线程池中的线程进行 并行计算。程序保证在打印 "Main thread finished" 之前,所有行的求和任务都已经完成并打印了各自的结果。这是一种典型的 Fork-Join 并行模式的实现。
RustJourney/rayon_examples on main [?] is 📦 0.1.0 via 🦀 1.90.0 took 2.7s
➜ cargo run
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.00s
Running `target/debug/rayon_examples`
Row 3 sum = 33
Row 1 sum = 15
Row 2 sum = 24
Row 0 sum = 6
Main thread finished
RustJourney/rayon_examples on main [?] is 📦 0.1.0 via 🦀 1.90.0
➜ cargo run
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.01s
Running `target/debug/rayon_examples`
Row 3 sum = 33
Row 0 sum = 6
Row 1 sum = 15
Row 2 sum = 24
Main thread finished
这段运行结果清晰地展示了 Rayon 库进行并行计算的两个关键特性:并行性(Parallelism) 和 确定性同步(Deterministic Synchronization)。
Row 0
到 Row 3
的和。Row 0 sum
到 Row 3 sum
的输出顺序是不同的(第一次是 3, 1, 2, 0;第二次是 3, 0, 1, 2)。这正是使用 Rayon 线程池 进行并行计算的直接体现。由于这四个求和任务是并发执行的,它们完成的顺序取决于操作系统对线程的调度以及线程池中 4 个工作线程的可用性,因此 输出顺序是不确定的。Main thread finished
总是最后打印。pool.scope(|...| { ... })
机制的有效性:主线程会阻塞并等待 scope
闭包内所有通过 scope.spawn()
启动的并行任务(即所有行的求和任务)全部完成后,才会继续执行 scope
之后的代码 (println!("Main thread finished")
)。这确保了主程序的正确同步,即所有并行工作都已完成。fn main() {
let outer_pool = rayon::ThreadPoolBuilder::new()
.num_threads(2)
.build()
.unwrap();
outer_pool.scope(|scope| {
for stage in 0..2 {
scope.spawn(move |_scope| {
println!("Stage {stage} started.");
let inner_pool = rayon::ThreadPoolBuilder::new()
.num_threads(2)
.build()
.unwrap();
inner_pool.scope(|inner_scope| {
for task in 0..2 {
inner_scope.spawn(move |_inner_scope| {
println!("\t-> Inner task {task} of stage {stage} started.");
});
}
});
println!("\t-> Stage {stage} completed.");
});
}
});
println!("-> All stages completed.");
}
这段 Rust 代码演示了 Rayon 线程池的嵌套使用,但其实现方式在性能上是低效且不推荐的,因为它在并行任务内部反复创建新的线程池。
外部线程池创建:
代码首先创建了一个名为 outer_pool 的 2 线程 Rayon 线程池。这个线程池用于执行顶层的并行任务。
外部作用域(Outer Scope):
outer_pool.scope(|scope| { ... }) 创建了一个外部 "fork-join" 作用域。在这个作用域内,代码通过循环执行了两次 scope.spawn(),启动了 2 个并行任务,分别对应 stage 0 和 stage 1。
内部任务逻辑(低效部分):
在每个 stage 的任务内部,代码执行了以下操作:
Stage {stage} started.
。inner_pool
的 新的 2 线程 Rayon 线程池。inner_pool.scope(|inner_scope| { ... })
再次创建了一个作用域,并在其中启动了 2 个更小的并行任务(inner task 0
和 inner task 1
)。inner_pool.scope
会阻塞,直到这两个内部任务完成并打印 -> Inner task ... started.
。inner_pool
被销毁(当 stage
任务结束时),然后打印 -> Stage {stage} completed.
。同步机制:
最外层的 outer_pool.scope 会阻塞主线程,直到 stage 0 和 stage 1 这两个并行任务全部完成。当所有工作完成后,主线程才会打印 -> All stages completed.。
这段代码的核心问题在于它没有利用 Rayon 的工作窃取(Work Stealing)机制。Rayon 的设计宗旨是使用一个全局线程池,通过 rayon::scope
或并行迭代器 (par_iter
) 在这个单一线程池内高效地调度任务。
然而,这段代码的实现方式是:
Stage 0
任务启动后,它会在 outer_pool
的一个线程上运行。inner_pool
)来处理内部任务。outer_pool
2 个 + 两个 inner_pool
各 2 个),这造成了额外的线程创建和销毁开销,浪费了资源。正确的 Rayon 实践是在一个线程池内部,直接使用 rayon::scope
或 rayon::spawn
来分发任务,而不是在任务内部创建新的线程池。
RustJourney/rayon_examples on main [?] is 📦 0.1.0 via 🦀 1.90.0
➜ cargo run
Compiling rayon_examples v0.1.0 (/Users/qiaopengjun/Code/Rust/RustJourney/rayon_examples)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.63s
Running `target/debug/rayon_examples`
Stage 1 started.
Stage 0 started.
-> Inner task 1 of stage 1 started.
-> Inner task 0 of stage 1 started.
-> Inner task 1 of stage 0 started.
-> Inner task 0 of stage 0 started.
-> Stage 0 completed.
-> Stage 1 completed.
-> All stages completed.
RustJourney/rayon_examples on main [?] is 📦 0.1.0 via 🦀 1.90.0
➜ cargo run
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.06s
Running `target/debug/rayon_examples`
Stage 1 started.
Stage 0 started.
-> Inner task 1 of stage 1 started.
-> Inner task 0 of stage 1 started.
-> Inner task 1 of stage 0 started.
-> Inner task 0 of stage 0 started.
-> Stage 0 completed.
-> Stage 1 completed.
-> All stages completed.
这段运行结果清晰地展示了 Rayon 线程池的并行和同步机制,即使是在这种不推荐的线程池嵌套场景中。
顶层任务并行性(不确定顺序):
两次运行的输出都以 Stage 1 started. 和 Stage 0 started. 交替开始,例如第一次运行是 Stage 1 先于 Stage 0 启动。这表明最外层 outer_pool 将 stage 0 和 stage 1 任务作为并行任务分发给了其两个工作线程,它们的启动顺序是不确定的,体现了并发执行。
内部任务并行性:
一旦某个 stage 启动,它会立即在其内部创建并激活一个新的 inner_pool,然后并行地启动 inner task 0 和 inner task 1。因此,可以看到来自不同 Stage 的内部任务(如 Inner task 1 of stage 1 和 Inner task 1 of stage 0)的启动信息是混合交错在一起的,证实了它们也是并发执行的。
Scoped Task 的同步保证(确定性完成):
尽管所有的 started 消息都是不确定的交错输出,但程序的完成顺序是严格确定的:
inner_pool.scope
保证其内部的两个 Inner task
结束后,才能打印相应的 -> Stage X completed.
。outer_pool.scope
保证所有 Stage
任务(Stage 0
和 Stage 1
)都打印了 completed
消息后,主线程才会继续执行,最终打印 -> All stages completed.
。因此,结果表明:并行任务的执行顺序是随机的,但 Rayon 的 "fork-join" 作用域机制保证了程序会等待所有子任务完成后,才允许流程进入下一阶段,从而实现正确的程序同步。
线程广播
fn main() {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(4)
.build()
.unwrap();
pool.scope(|scope| {
scope.spawn_broadcast(|_scope, ctx| {
let id = ctx.index();
println!("Thread {id}.");
});
});
}
这段 Rust 代码利用 Rayon 库展示了线程广播(Thread Broadcast)这一高级功能,它用于在自定义线程池的所有工作线程上运行相同的任务,通常用于线程本地初始化或状态同步。
线程池创建:
代码首先使用 rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap()
创建了一个包含 4 个 工作线程的自定义线程池 pool。
Scoped Task(作用域):
pool.scope(|scope| { ... })
创建了一个 "fork-join" 作用域,确保在主线程继续执行之前,作用域内所有派生的并行任务都将完成。
线程广播任务:
核心是 scope.spawn_broadcast(|_scope, ctx| { ... })
。这个方法不是像 spawn 那样创建一个任务让任一空闲线程去执行,而是特意为线程池中的 每一个 工作线程都安排一个相同的任务去执行。
BroadcastContext
结构体 ctx
。ctx.index()
方法会返回当前正在执行此广播任务的线程在线程池中的唯一索引(从 0 到线程数减 1)。总结:
这段代码的功能是:在创建的 4 线程线程池的每个线程上运行一个任务,每个任务打印出自己线程的索引。因此,代码的运行结果会打印出 4 行消息,内容是 Thread 0.
、Thread 1.
、Thread 2.
、Thread 3.
,但由于并行执行,这 4 行的输出顺序是不确定的。这个模式非常适合用于在并行工作开始前,对每个 Rayon 工作线程的本地状态进行设置或进行一次性操作。
➜ cargo run
Compiling rayon_examples v0.1.0 (/Users/qiaopengjun/Code/Rust/RustJourney/rayon_examples)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.57s
Running `target/debug/rayon_examples`
Thread 0.
Thread 1.
Thread 3.
Thread 2.
这段运行结果完美地证实了 scope.spawn_broadcast()
线程广播功能的行为以及 Rayon 的并行特性。
spawn_broadcast
精确地在线程池的每个工作线程上执行了一次任务。因此,程序输出了 4 条 Thread X.
消息,分别对应线程索引 0 到 3,这证明了广播任务确实在所有线程上运行了。0, 1, 3, 2
,而不是严格的升序 0, 1, 2, 3
,这充分体现了 Rayon 在多个核心上并行执行任务时,任务完成顺序的非确定性。结论: 运行结果表明,线程广播任务在线程池的所有 4 个线程上都成功执行,并且输出顺序的不确定性是并发编程的典型特征。
线程池 JOIN
fn main() {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(4)
.build()
.unwrap();
let func = || println!("Hello, world!");
pool.join(func, func);
}
这段 Rust 代码利用 Rayon 库展示了结构化并行中的 "分叉-汇合" (Fork-Join) 模式,旨在高效地并行执行两个独立的任务。
线程池初始化:
代码首先创建了一个名为 pool 的自定义 Rayon 线程池,并使用 .num_threads(4) 明确指定该线程池拥有 4 个 工作线程。这确保了后续的并行任务会在这个受控的环境中执行。
定义任务:
定义了一个简单的闭包 func,其唯一的副作用是打印 "Hello, world!"。
分叉-汇合操作:
核心操作是 pool.join(func, func)
。join 是 Rayon 提供的最基本且最高效的并行操作之一,它将两个闭包(oper_a 和 oper_b)作为参数:
join
函数会尝试同时启动这两个任务。具体来说,当前调用 join
的线程会立即执行第一个闭包 (func
),同时将第二个闭包 (func
) 作为一个新的并行任务提交给线程池。join
是一个阻塞式调用。它会一直等待,直到这两个闭包(无论它们在哪个线程上执行)都彻底完成并返回结果后,join
才会返回一个包含两个闭包返回值的元组。总结:
这段代码通过自定义的 4 线程 Rayon 线程池,并行执行了两次打印 "Hello, world!" 的操作。由于这两个任务是并发运行的,它们在控制台输出的顺序将是不确定和交错的(但最终会输出两次 "Hello, world!")。pool.join() 确保了主线程会等待这两个并行任务完成后,程序才继续向下或结束。这种模式常用于递归算法(如快速排序)的分治并行化,具有极高的效率,因为它主要利用栈分配来管理任务,避免了复杂的堆分配开销。
➜ cargo run
Compiling rayon_examples v0.1.0 (/Users/qiaopengjun/Code/Rust/RustJourney/rayon_examples)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.45s
Running `target/debug/rayon_examples`
Hello, world!
Hello, world!
这段运行结果是前述 Rust 代码执行 pool.join(func, func)
的直接体现,结果是程序输出了两行 "Hello, world!"
。
pool.join(func, func)
机制启动了两个完全相同的任务(即两次调用打印 "Hello, world!"
的闭包 func
),并在创建的 4 线程 Rayon 线程池中并行执行。join
操作保证了主程序会等待这两个并行任务都完成,因此确保了两次 "Hello, world!"
消息都成功输出。"Hello, world!"
。结论: 运行结果证实了 rayon::join
成功地在线程池中并行执行了两个任务,并且主线程在所有并行工作完成后才结束。
通过对这四个 Rayon 核心 API 的实操和结果分析,我们深刻理解了 Rayon 在 Rust 多线程编程中的强大和优雅:
scope
和 join
): Rayon 的 scope
和 join
API 实现了经典的 Fork-Join 模式。它保证了父任务在所有子并行任务彻底完成之前不会结束,从而消除了传统线程中常见的生命周期和数据安全问题。scope.spawn()
还是 spawn_broadcast
,任务在线程池中的执行顺序都由操作系统调度和 Rayon 的工作窃取机制决定,因此输出顺序是随机的,这是并行编程的固有特征。ThreadPoolBuilder
允许我们精确控制线程池大小;spawn_broadcast
则提供了在每个工作线程上执行一次任务的独特能力,非常适合复杂的线程本地状态初始化。Rayon 使得 Rust 开发者能够以安全、高效且易于理解的方式,释放现代多核 CPU 的全部潜能,是进行 CPU 密集型任务优化的首选工具。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!