Rust

2025年10月15日更新 8 人订阅
原价: ¥ 6 限时优惠
专栏简介 Rust编程语言之错误处理 Rust 语言之 flod Rust编程语言之Cargo、Crates.io详解 Rust编程语言之枚举与模式匹配 Rust语言 - 接口设计的建议之受约束(Constrained) Rust编程语言之无畏并发 Rust语言 - 接口设计的建议之灵活(flexible) Rust语言 - 接口设计的建议之显而易见(Obvious) Rust语言 - 接口设计的建议之不意外(unsurprising) Rust 实战:构建实用的 CLI 工具 HTTPie Rust编程语言学习之高级特性 Rust内存管理揭秘:深度剖析指针与智能指针 解决Rust中数组和切片的编译时大小问题 《Rust编程之道》学习笔记一 Rust Async 异步编程 简易教程 使用 Async Rust 构建简单的 P2P 节点 Rust编程语言入门之模式匹配 Rust async 编程 Rust编程语言之编写自动化测试 Rust编程语言之函数式语言特性:迭代器和闭包 《Rust编程之道》学习笔记二 Rust Tips 比较数值 使用 Rust 开发一个微型游戏 Rust编程初探:深入理解Struct结构体 深入理解Rust中的内存管理:栈、堆与静态内存详解 深入理解 Rust 结构体:经典结构体、元组结构体和单元结构体的实现 深入掌握 Rust 结构体:从模板到实例化的完整指南 深入理解Rust中的结构体:逻辑与数据结合的实战示例 深入理解 Rust 枚举:从基础到实践 掌握Rust字符串的精髓:String与&str的最佳实践 全面解析 Rust 模块系统:实战案例与应用技巧 Rust 中的 HashMap 实战指南:理解与优化技巧 掌握Rust模式匹配:从基础语法到实际应用 Rust 中的面向对象编程:特性与实现指南 深入理解 Rust 的 Pin 和 Unpin:理论与实践解析 Rust Trait 与 Go Interface:从设计到实战的深度对比 从零开始:用 Rust 和 Axum 打造高效 Web 应用 Rust 错误处理详解:掌握 anyhow、thiserror 和 snafu Rust 如何优雅实现冒泡排序 链表倒数 K 节点怎么删?Python/Go/Rust 实战 用 Rust 玩转数据存储:JSON 文件持久化实战 Rust实战:打造高效字符串分割函数 如何高效学习一门技术:从知到行的飞轮效应 Rust 编程入门:Struct 让代码更优雅 Rust 编程:零基础入门高性能开发 用 Rust 写个猜数游戏,编程小白也能上手! Rust 入门教程:变量到数据类型,轻松掌握! 深入浅出 Rust:函数、控制流与所有权核心特性解析 从零开始:用 Rust 和 Axum 打造高效 Web 服务 Rust 集合类型解析:Vector、String、HashMap 深入浅出Rust:泛型、Trait与生命周期的硬核指南 Rust实战:博物馆门票限流系统设计与实现 用 Rust 打造高性能图片处理服务器:从零开始实现类似 Thumbor 的功能 Rust 编程入门实战:从零开始抓取网页并转换为 Markdown 深入浅出 Rust:高效处理二进制数据的 Bytes 与 BytesMut 实战 Rust智能指针:解锁内存管理的进阶之道 用 Rust 打造命令行利器:从零到一实现 mini-grep 解锁Rust代码组织:轻松掌握Package、Crate与Module Rust 所有权:从内存管理到生产力释放 深入解析 Rust 的面向对象编程:特性、实现与设计模式 Rust + Protobuf:从零打造高效键值存储项目 bacon 点燃 Rust:比 cargo-watch 更爽的开发体验 用 Rust 打造微型游戏:从零开始的 Flappy Dragon 开发之旅 函数式编程的Rust之旅:闭包与迭代器的深入解析与实践 探索Rust编程之道:从设计哲学到内存安全的学习笔记 精读《Rust编程之道》:吃透语言精要,彻底搞懂所有权与借用 Rust 避坑指南:搞定数值比较,别再让 0.1 + 0.2 != 0.3 困扰你! 告别 Vec!掌握 Rust bytes 库,解锁零拷贝的真正威力 告别竞态条件:基于 Axum 和 Serde 的 Rust 并发状态管理最佳实践 Rust 异步编程实践:从 Tokio 基础到阻塞任务处理模式 Rust 网络编程实战:用 Tokio 手写一个迷你 TCP 反向代理 (minginx) 保姆级教程:Zsh + Oh My Zsh 终极配置,让你的 Ubuntu 终端效率倍增 不止于后端:Rust 在 Web 开发中的崛起之路 (2024数据解读) Rust核心利器:枚举(Enum)与模式匹配(Match),告别空指针,写出优雅健壮的代码 Rust 错误处理终极指南:从 panic! 到 Result 的优雅之道 想用 Rust 开发游戏?这份超详细的入门教程请收好! 用 Rust 实现 HTTPie:一个现代 CLI 工具的构建过程 Rust 异步实战:从0到1,用 Tokio 打造一个高性能并发聊天室 深入 Rust 核心:彻底搞懂指针、引用与智能指针 Rust 生产级后端实战:用 Axum + sqlx 打造高性能短链接服务 深入 Rust 内存模型:栈、堆、所有权与底层原理 Rust 核心概念解析:引用、借用与内部可变性 掌握 Rust 核心:生命周期与借用检查全解析 Rust 内存布局深度解析:从对齐、填充到 repr 属性 Rust Trait 分派机制:静态与动态的抉择与权衡 Rust Thread::Builder 用法详解:线程命名与栈大小设置 Rust 泛型 Trait:关联类型与泛型参数的核心区别 Rust Scoped Threads 实战:更安全、更简洁的并发编程 Rust 核心设计:孤儿规则与代码一致性解析 Rust 实战:从零构建一个多线程 Web 服务器 Rust Web 开发实战:构建教师管理 API 硬核实战:从零到一,用 Rust 和 Axum 构建高性能聊天服务后端 Rust Web 开发实战:使用 SQLx 连接 PostgreSQL 数据库 硬核入门:从零开始,用 Actix Web 构建你的第一个 Rust REST API (推荐 🔥) Rust 并发编程:详解线程间数据共享的几种核心方法 Rust并发安全基石:Mutex与RwLock深度解析 Rust Web实战:构建优雅的 Actix Web 统一错误处理 煮咖啡里的大学问:用 Rust Async/Await 告诉你如何边烧水边磨豆 深入浅出:Rust 原子类型与多线程编程实践 Rust 并发编程利器:OnceCell 与 OnceLock 深度解析 Rust 懒人编程:LazyCell 与 LazyLock 的惰性哲学 Rust 入门精髓:详解 Enum 的三种魔法,从模式匹配到状态管理 Rust 字符串魔法:String 与 &str 的深度解析与实践 Rust 模块化编程:驾驭代码结构与可见性的三大法则 Rust 实用进阶:深度剖析 Rust 生命周期的奥秘 Rust 智能指针大揭秘:Box、Rc、Arc、Cow 深度剖析与应用实践 Rust 并发编程三步曲:Join、Arc<Mutex> 与 mpsc 通道同步实战 Rust 声明宏实战进阶:从基础定义到 #[macro_export] 与多规则重载 Rust 类型转换实战:利用 From/Into Trait 实现带 Default 容错的安全转换 Rust 实战:实现 FromStr Trait,定制化字符串 parse() 与精确错误报告 Rust 实战:TryFrom Trait——如何在类型转换中强制执行业务逻辑检查 Rust 泛型编程基石:AsRef 和 AsMut 的核心作用与实战应用 揭秘 Rust Unsafe 编程:程序员接管内存安全的契约与实践 Rust FFI 入门:extern、ABI 与底层符号链接解析 Rust性能优化:零内存拷贝的链表合并技术实战 Rust 进阶:用 NonNull 裸指针实现高性能双向链表 O(N) 反转实战 Rust实战:如何用泛型和特性实现一个高性能、通用的插入排序 Rust实战:深度解析二叉搜索树(BST)的实现,掌握泛型与内存安全 用 Rust 优雅实现图搜索核心算法:广度优先搜索 (BFS) 实战 Rust 多线程的高效等待术:park() 与 unpark() 信号通信实战 Rust 并发加速器:用 Condvar 实现线程间“精确握手”与高效等待 Rust 算法精讲:用 DFS 玩转图遍历,从起点“一走到底”的秘密 Rust 并发实战:用 MPSC 通道构建线程安全的“任务指挥中心”

Rust 并发实战:用 MPSC 通道构建线程安全的“任务指挥中心”

Rust并发实战:用MPSC通道构建线程安全的“任务指挥中心”在Rust的并发世界中,消息传递(MessagePassing)是实现线程间安全通信和数据共享的首选方式,它完美契合了Rust“无数据竞争”的设计哲学。其中,MPSC(多生产者,单消费者)通道是构建异步任务处理和线程

Rust 并发实战:用 MPSC 通道构建线程安全的“任务指挥中心”

在 Rust 的并发世界中,消息传递(Message Passing) 是实现线程间安全通信和数据共享的首选方式,它完美契合了 Rust “无数据竞争”的设计哲学。其中,MPSC(多生产者,单消费者) 通道是构建异步任务处理和线程池的基石。

本文将通过两个实战示例,展示如何利用 mpsc::channel 创建一个单工作线程的任务队列。更重要的是,我们将重点升级这个队列,通过引入一个统一的 enum 消息类型,不仅能传递任务执行指令,还能发送明确的 Quit 信号,从而实现对工作线程的优雅、受控的终止,这是生产级并发代码中至关重要的一步。

Rust 多线程 - MPSC 消息传递

消息传递 - MPSC

MPSC

Muti-producer, single-consumer FIFO 队列通信原语

  • 基于消息的通道(channel)通信机制
  • 三种类型
    • Sender
    • SyncSender
    • Receiver

异步通道(asynchronous channel),无限缓冲区的通道

  • channel 函数返回一个 (Sender,Receiver)
  • 发送操作是异步的
  • 概念上拥有无限容量的缓冲区

同步通道(synchronous channel),有界缓冲区的通道

  • sync_channel 函数返回一个(SyncSender, Receiver)
  • 其内部为预先分配好的固定大小的缓冲区
  • 发送操作都是同步的,除非缓冲区还有容量,否则就会阻塞
  • 注意:缓冲区大小允许设为0,通道会变为一种“会合(rendezvous)通道”。就是说每个发送者都会原子地将消息直接交给接收者,就是发送和接收相当于同时发生了。

断开链接(Disconnection)

  • 对通道的 send 和 receive 操作都会返回一个 Result,用来指示操作是否成功。如果操作失败,通常表示通道的另一端已经“挂起”或被释放(drop)。
  • 一旦通道的一半被释放,大多数操作将无法继续进行,因此会返回 ERR。在许多应用中,开发者会继续对这些 Result 调用 unwrap,从而在某个线程意外终止时,引发错误在其它线程中传播。

实操

示例一

Rust 中实现任务队列(Task Queue)单工作线程池的经典示例

利用 mpsc 消息通道实现了主线程与工作线程之间的异步通信和任务分发。

use std::{sync::mpsc, thread};

type Task = Box&lt;dyn FnOnce() + Send + 'static>;

fn hello() {
    println!("Hello, world!");
}

fn main() {
    let (tx, rx) = mpsc::channel::&lt;Task>();
    let handle = thread::spawn(move || {
        while let Ok(task) = rx.recv() {
            task();
        }
    });

    let closure = || println!("Hello, closure!");

    tx.send(Box::new(hello)).unwrap();
    tx.send(Box::new(closure)).unwrap();
    tx.send(Box::new(|| println!("Hello, thread!"))).unwrap();

    handle.join().unwrap();
}

Rust 任务队列代码详细解析

这段代码的核心在于:主线程作为生产者创建任务并发送,而工作线程作为消费者接收并执行这些任务。这种基于消息传递的模式是 Rust 并发编程的标准做法。

1. 任务类型定义 (Task Type)

  • type Task = Box&lt;dyn FnOnce() + Send + 'static>;
    • 这定义了一个类型别名 Task,表示任何可以作为任务被执行的项。
    • dyn FnOnce(): 表示一个只被调用一次的函数或闭包。
    • + Send: 保证这个闭包或函数可以安全地在线程间转移(从主线程发送到工作线程)。
    • + 'static: 意味着闭包捕获的任何数据必须拥有静态生命周期,或已被移动到闭包内部。
    • Box&lt;...>: 将任务动态地分配到堆上,使得所有任务(无论是普通函数还是捕获了不同数据的闭包)都能拥有统一的类型和大小,方便通过通道传输。

2. 通道设置与工作线程(消费者)

  • let (tx, rx) = mpsc::channel::&lt;Task>();: 创建了一个 mpsc (多生产者,单消费者) 通道。tx 是发送端(Transmitter),rx 是接收端(Receiver)。
  • let handle = thread::spawn(move || { ... });: 启动了一个新的工作线程。move 关键字将接收端 rx 的所有权转移给这个新线程。
  • while let Ok(task) = rx.recv() { task(); }: 这是工作线程的核心循环。
    • rx.recv(): 线程会在这里阻塞并等待新任务。只要发送端 tx 仍然存在,线程就不会消耗 CPU 资源(非忙等)。
    • 当成功接收到任务 task 后,立即通过 task() 执行。
    • 退出机制: 当主线程(或所有 tx 克隆)被丢弃时,通道关闭,rx.recv() 将返回 Err,循环结束,工作线程随后退出。

3. 主线程与任务发送(生产者)

  • tx.send(Box::new(hello)).unwrap();: 主线程通过 tx 发送了三种不同类型的任务:
    1. 普通函数hello
    2. 命名闭包closure
    3. 匿名闭包|| println!("Hello, thread!")
  • 它们都被 Box::new() 包装成统一的 Task 类型,安全地发送给了工作线程。

4. 线程同步与清理

  • handle.join().unwrap();: 这是确保程序正确执行的关键。主线程会在这里阻塞,等待工作线程(handle)运行结束。由于工作线程只有在通道关闭且所有任务执行完毕后才会退出,因此 join() 保证了所有任务都会被执行,程序不会提前终止。

这段代码是构建 Rust 中线程池和并发服务的基础,它优雅地展示了如何利用 消息通道 来解耦任务的创建和执行过程。

运行

➜ cargo run     
   Compiling mpsc v0.1.0 (/Users/qiaopengjun/Code/Rust/RustJourney/mpsc)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.72s
     Running `target/debug/mpsc`
Hello, world!
Hello, closure!
Hello, thread!

运行输出 (Output) 解释

输出 Hello, world!Hello, closure!Hello, thread! 这三行文本,证明了以下几个关键步骤的顺利完成:

  1. 任务发送成功(生产者): 主线程成功地将三个不同来源的任务(一个函数、一个命名闭包、一个匿名闭包)封装成统一的 Task 类型,并通过发送端 tx 依次传递到了通道中。
  2. 工作线程接收并执行(消费者): 子线程(工作线程)在 while let Ok(task) = rx.recv() 循环中被唤醒三次。每接收到一个任务,它就立即调用 task() 执行。
  3. 任务队列执行顺序: 在这个简单的示例中,任务是按照它们被发送到通道的顺序(FIFO,先入先出)被工作线程接收和执行的,因此输出的顺序与发送顺序一致。
  4. 优雅退出: 所有任务执行完毕后,主线程(在 main 函数末尾)没有显式地丢弃 tx,但程序运行到 main 结束时,所有局部变量(包括 tx)都会被自动销毁。这导致通道关闭,工作线程中的 rx.recv() 返回 Err,循环终止,工作线程退出。
  5. 同步等待 (join) 的重要性: handle.join().unwrap() 确保了主线程会一直等待,直到工作线程处理完所有任务并自然终止,从而保证了所有的 println! 语句都能在程序结束前被执行。

这个结果有力地证明了 mpsc 通道在 Rust 中作为线程安全、高效的任务分发机制的有效性。它实现了生产者和消费者之间清晰的职责分离,并且避免了资源浪费(工作线程在无任务时休眠)。

示例二

use std::{sync::mpsc, thread};

type Task = Box&lt;dyn FnOnce() + Send + 'static>;

enum Msg {
    Call(Task),
    Quit,
}

fn hello() {
    println!("Hello, world!");
}

fn main() {
    let (tx, rx) = mpsc::channel::&lt;Msg>();
    let handle = thread::spawn(move || {
        while let Ok(msg) = rx.recv() {
            match msg {
                Msg::Call(task) => task(),
                Msg::Quit => break,
            }
        }
    });

    let closure = || println!("Hello, closure!");

    // tx.send(Box::new(hello)).unwrap();
    // tx.send(Box::new(closure)).unwrap();
    // tx.send(Box::new(|| println!("Hello, thread!"))).unwrap();

    tx.send(Msg::Call(Box::new(hello))).unwrap();
    tx.send(Msg::Call(Box::new(closure))).unwrap();
    tx.send(Msg::Call(Box::new(|| println!("Hello, thread!"))))
        .unwrap();
    tx.send(Msg::Quit).unwrap();

    handle.join().unwrap();
}

这段 Rust 代码是对先前任务队列模式的升级版本,它引入了自定义的 enum 消息类型 (Msg),从而实现了对工作线程的显式、受控的终止

显式终止的任务队列代码详细解析

这段代码的核心目标是利用 mpsc 通道在主线程和工作线程之间传递两种指令:执行任务或安全退出。

1. 任务和指令的统一封装 (Msg Enum)

  • type Task = Box&lt;dyn FnOnce() + Send + 'static>;: 任务类型定义与之前相同,代表一个可跨线程发送、只执行一次的函数或闭包。
  • enum Msg { Call(Task), Quit }: 这是关键改进。它定义了一个联合体 Msg,将所有可能的通道传输内容封装起来:
    • Msg::Call(Task): 携带实际要执行的任务。
    • Msg::Quit: 一个明确的终止信号

2. 工作线程(消费者)的受控退出

  • 工作线程仍通过 thread::spawn 启动,并拥有接收端 rx
  • while let Ok(msg) = rx.recv() { ... }: 循环阻塞等待消息。
  • match msg { ... }: 接收到消息后,线程会根据消息类型进行分支处理:
    • 如果是 Msg::Call(task),则执行任务 task()
    • 如果是 Msg::Quit,则执行 break,跳出 while 循环,工作线程优雅地终止。

3. 主线程(生产者)的发送与控制

  • 主线程将三个任务(hello 函数、closure 命名闭包、匿名闭包)封装在 Msg::Call(...) 中并发送。
  • tx.send(Msg::Quit).unwrap();: 在发送完所有任务后,主线程显式地发送了一个 Quit 信号
    • 重要区别: 在第一个版本中,工作线程的退出依赖于 tx 被丢弃导致的通道关闭(隐式退出)。在这个版本中,工作线程的退出是由主线程发出的 Msg::Quit 信号(显式退出)驱动的,这提供了更强的控制力,尤其是在多发送者场景下,能确保线程在恰当的时机停止。

4. 线程同步

  • handle.join().unwrap();: 主线程等待工作线程处理完所有任务和 Quit 信号后安全关闭,确保程序在所有任务执行完毕后才结束。

总结来说,这段代码通过引入 Msg 枚举,将任务执行指令和线程控制指令(退出)整合到一个通道中,实现了比隐式通道关闭更健壮、更可控的任务队列和工作线程管理模式。

运行

➜ cargo run
   Compiling mpsc v0.1.0 (/Users/qiaopengjun/Code/Rust/RustJourney/mpsc)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.62s
     Running `target/debug/mpsc`
Hello, world!
Hello, closure!
Hello, thread!

这段运行输出的结果 Hello, world!Hello, closure!Hello, thread!,完美展示了受控退出(Graceful Shutdown)的任务队列机制成功执行了所有步骤。

运行输出 (Output) 解释

这个结果表明主线程和工作线程通过 Msg 枚举实现了高效且有明确控制的通信:

  1. 任务执行成功: 工作线程(消费者)从通道中依次接收到了三个 Msg::Call(Task) 消息,并按照发送顺序,成功执行了函数和闭包中的 println! 语句,完成了所有任务。
  2. 显式安全退出: 在所有任务发送完毕后,主线程发送了第四条消息 Msg::Quit。当工作线程接收到此信号后,match 表达式中的分支触发了 break 语句,使工作线程跳出了 while 循环并自然结束。
  3. 最终同步: 主线程的 handle.join().unwrap() 机制等待工作线程完成退出流程后才允许 main 函数结束。

与依赖于通道关闭的隐式退出机制相比,这个结果证明了通过 Msg::Quit 信号实现了更健壮、可预测的线程管理,这是构建更复杂、多发送者线程池时必须采用的同步模式。

总结

通过本次对 mpsc 通道两个示例的实战分析,我们掌握了 Rust 任务队列的核心模式。

示例一展示了 mpsc 的基础用法,其退出依赖于通道在所有发送端被丢弃时的隐式关闭。而 示例二通过引入 enum Msg { Call(Task), Quit } 结构,将任务和控制指令统一封装,实现了显式退出。这种模式的价值在于:即使在多个发送者(tx clone)场景下,我们依然可以精确控制工作线程何时停止,这是构建健壮、高性能线程池的关键。掌握这种基于消息的并发控制,你就掌握了 Rust 应对复杂多线程场景的利器。

参考

点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论