Rust

2025年07月14日更新 7 人订阅
原价: ¥ 6 限时优惠
专栏简介 Rust编程语言之错误处理 Rust 语言之 flod Rust编程语言之Cargo、Crates.io详解 Rust编程语言之枚举与模式匹配 Rust语言 - 接口设计的建议之受约束(Constrained) Rust编程语言之无畏并发 Rust语言 - 接口设计的建议之灵活(flexible) Rust语言 - 接口设计的建议之显而易见(Obvious) Rust语言 - 接口设计的建议之不意外(unsurprising) Rust 实战:构建实用的 CLI 工具 HTTPie Rust编程语言学习之高级特性 Rust内存管理揭秘:深度剖析指针与智能指针 解决Rust中数组和切片的编译时大小问题 《Rust编程之道》学习笔记一 Rust Async 异步编程 简易教程 使用 Async Rust 构建简单的 P2P 节点 Rust编程语言入门之模式匹配 Rust async 编程 Rust编程语言之编写自动化测试 Rust编程语言之函数式语言特性:迭代器和闭包 《Rust编程之道》学习笔记二 Rust Tips 比较数值 使用 Rust 开发一个微型游戏 Rust编程初探:深入理解Struct结构体 深入理解Rust中的内存管理:栈、堆与静态内存详解 深入理解 Rust 结构体:经典结构体、元组结构体和单元结构体的实现 深入掌握 Rust 结构体:从模板到实例化的完整指南 深入理解Rust中的结构体:逻辑与数据结合的实战示例 深入理解 Rust 枚举:从基础到实践 掌握Rust字符串的精髓:String与&str的最佳实践 全面解析 Rust 模块系统:实战案例与应用技巧 Rust 中的 HashMap 实战指南:理解与优化技巧 掌握Rust模式匹配:从基础语法到实际应用 Rust 中的面向对象编程:特性与实现指南 深入理解 Rust 的 Pin 和 Unpin:理论与实践解析 Rust Trait 与 Go Interface:从设计到实战的深度对比 从零开始:用 Rust 和 Axum 打造高效 Web 应用 Rust 错误处理详解:掌握 anyhow、thiserror 和 snafu Rust 如何优雅实现冒泡排序 链表倒数 K 节点怎么删?Python/Go/Rust 实战 用 Rust 玩转数据存储:JSON 文件持久化实战 Rust实战:打造高效字符串分割函数 如何高效学习一门技术:从知到行的飞轮效应 Rust 编程入门:Struct 让代码更优雅 Rust 编程:零基础入门高性能开发 用 Rust 写个猜数游戏,编程小白也能上手! Rust 入门教程:变量到数据类型,轻松掌握! 深入浅出 Rust:函数、控制流与所有权核心特性解析 从零开始:用 Rust 和 Axum 打造高效 Web 服务 Rust 集合类型解析:Vector、String、HashMap 深入浅出Rust:泛型、Trait与生命周期的硬核指南 Rust实战:博物馆门票限流系统设计与实现 用 Rust 打造高性能图片处理服务器:从零开始实现类似 Thumbor 的功能 Rust 编程入门实战:从零开始抓取网页并转换为 Markdown 深入浅出 Rust:高效处理二进制数据的 Bytes 与 BytesMut 实战 Rust智能指针:解锁内存管理的进阶之道 用 Rust 打造命令行利器:从零到一实现 mini-grep 解锁Rust代码组织:轻松掌握Package、Crate与Module Rust 所有权:从内存管理到生产力释放 深入解析 Rust 的面向对象编程:特性、实现与设计模式 Rust + Protobuf:从零打造高效键值存储项目 bacon 点燃 Rust:比 cargo-watch 更爽的开发体验 用 Rust 打造微型游戏:从零开始的 Flappy Dragon 开发之旅 函数式编程的Rust之旅:闭包与迭代器的深入解析与实践 探索Rust编程之道:从设计哲学到内存安全的学习笔记 精读《Rust编程之道》:吃透语言精要,彻底搞懂所有权与借用 Rust 避坑指南:搞定数值比较,别再让 0.1 + 0.2 != 0.3 困扰你! 告别 Vec!掌握 Rust bytes 库,解锁零拷贝的真正威力 告别竞态条件:基于 Axum 和 Serde 的 Rust 并发状态管理最佳实践 Rust 异步编程实践:从 Tokio 基础到阻塞任务处理模式 Rust 网络编程实战:用 Tokio 手写一个迷你 TCP 反向代理 (minginx) 保姆级教程:Zsh + Oh My Zsh 终极配置,让你的 Ubuntu 终端效率倍增 不止于后端:Rust 在 Web 开发中的崛起之路 (2024数据解读) Rust核心利器:枚举(Enum)与模式匹配(Match),告别空指针,写出优雅健壮的代码 Rust 错误处理终极指南:从 panic! 到 Result 的优雅之道 想用 Rust 开发游戏?这份超详细的入门教程请收好! 用 Rust 实现 HTTPie:一个现代 CLI 工具的构建过程 Rust 异步实战:从0到1,用 Tokio 打造一个高性能并发聊天室

Rust 异步实战:从0到1,用 Tokio 打造一个高性能并发聊天室

Rust异步实战:从0到1,用Tokio打造一个高性能并发聊天室你是否曾对Discord、Slack这类高并发即时通讯应用的底层技术感到好奇?或者在学习Rust时,面对强大的Tokio异步运行时,感觉理论知识丰富,却不知如何下手实践?别担心!本文将是一篇极致的实战指南,我们将告

Rust 异步实战:从0到1,用 Tokio 打造一个高性能并发聊天室

你是否曾对 Discord、Slack 这类高并发即时通讯应用的底层技术感到好奇?或者在学习 Rust 时,面对强大的 Tokio 异步运行时,感觉理论知识丰富,却不知如何下手实践?

别担心!本文将是一篇极致的实战指南,我们将告别枯燥的理论。通过从零开始、一步步构建一个功能完善的 TCP 聊天服务器,你不仅能深入理解 Tokio 的核心工作模式,还将学会如何利用 tokio-console 对异步任务进行可视化调试,甚至使用 loom 这一并发测试神器来验证代码的线程安全性。

准备好了吗?让我们一起动手,用代码真正“看见”并征服 Rust 异步世界!

🚀 本文将带你解锁

  • tokio_util
  • tokio_stream
  • 写一个简单的 TCP Chat Server
    • client 连接:添加到全局状态
    • 创建 peer
    • 通知所有小伙伴
    • client 断连:从全局状态删除
    • 通知所有小伙伴
    • client 发消息
    • 广播
  • tokio-console

实操

Chat.rs 文件

use std::{fmt, net::SocketAddr, sync::Arc};

use anyhow::Result;
use dashmap::DashMap;
use futures::{SinkExt, StreamExt, stream::SplitStream};
use tokio::{
    net::{TcpListener, TcpStream},
    sync::mpsc,
};
use tokio_util::codec::{Framed, LinesCodec};
use tracing::{info, level_filters::LevelFilter, warn};
use tracing_subscriber::{Layer as _, fmt::Layer, layer::SubscriberExt, util::SubscriberInitExt};

const MAX_MESSAGES: usize = 128;

#[derive(Debug, Default)]
struct State {
    peers: DashMap<SocketAddr, mpsc::Sender<Arc<Message>>>,
}

#[derive(Debug)]
struct Peer {
    username: String,
    stream: SplitStream<Framed<TcpStream, LinesCodec>>,
}

#[derive(Debug)]
enum Message {
    UserJoined(String),
    UserLeft(String),
    Chat { sender: String, content: String },
}

#[tokio::main]
async fn main() -> Result<()> {
    let layer = Layer::new().with_filter(LevelFilter::INFO);
    tracing_subscriber::registry().with(layer).init();

    let addr = "0.0.0.0:8080";
    let listener = TcpListener::bind(addr).await?;
    info!("Listening on {}", addr);

    let state = Arc::new(State::default());

    loop {
        let (stream, addr) = listener.accept().await?;
        info!("Accepted connection from {}", addr);
        let state_cloned = state.clone();
        tokio::spawn(async move {
            if let Err(e) = handle_client(state_cloned, addr, stream).await {
                warn!("Failed to handle client {}: {}", addr, e);
            }
        });
    }
}

async fn handle_client(state: Arc<State>, addr: SocketAddr, stream: TcpStream) -> Result<()> {
    let mut stream = Framed::new(stream, LinesCodec::new());
    // 按帧发送的, LinesCodec 会在每行末尾加上 \n
    stream
        .send("Welcome to the chat! Please enter your username:")
        .await?;

    let username = match stream.next().await {
        Some(Ok(username)) => username,
        Some(Err(e)) => {
            warn!("Failed to receive username from {}: {}", addr, e);
            return Err(e.into());
        }
        None => {
            warn!("Client {} disconnected before sending username", addr);
            return Ok(());
        }
    };

    let mut peer = state.add(addr, username, stream).await;

    // notify others that a new peer has joined
    let message = Arc::new(Message::user_joined(&peer.username));
    state.broadcast(addr, message).await;

    while let Some(line) = peer.stream.next().await {
        let line = match line {
            Ok(line) => line,
            Err(err) => {
                warn!("Failed to receive message from {}: {}", addr, err);
                break;
            }
        };

        let message = Arc::new(Message::chat(&peer.username, line));
        state.broadcast(addr, message).await;
    }

    // when while loop exit, peer has left the chat or line reading failed
    // remove peer from state
    state.peers.remove(&addr);

    // notify others that peer has left the chat
    let message = Arc::new(Message::user_left(&peer.username));
    state.broadcast(addr, message).await;

    Ok(())
}

impl State {
    async fn broadcast(&self, addr: SocketAddr, message: Arc<Message>) {
        for peer in self.peers.iter() {
            if peer.key() == &addr {
                continue;
            }
            if let Err(e) = peer.value().send(message.clone()).await {
                warn!("Failed to send message to {}: {}", peer.key(), e);
                // Remove the peer from the state if it's no longer reachable
                self.peers.remove(peer.key());
            }
        }
    }

    async fn add(
        &self,
        addr: SocketAddr,
        username: String,
        stream: Framed<TcpStream, LinesCodec>,
    ) -> Peer {
        let (tx, mut rx) = mpsc::channel(MAX_MESSAGES);
        self.peers.insert(addr, tx);

        // split the stream into a sender and a receiver
        let (mut stream_sender, stream_receiver) = stream.split();

        // receive messages from others, and send them to the client
        tokio::spawn(async move {
            while let Some(message) = rx.recv().await {
                if let Err(e) = stream_sender.send(message.to_string()).await {
                    warn!("Failed to send message to {}: {}", addr, e);
                    break;
                }
            }
        });

        // return peer
        Peer {
            username,
            stream: stream_receiver,
        }
    }
}

impl Message {
    fn user_joined(username: &str) -> Self {
        let content = format!("{} has joined the chat", username);
        Self::UserJoined(content)
    }

    fn user_left(username: &str) -> Self {
        let content = format!("{} has left the chat", username);
        Self::UserLeft(content)
    }

    fn chat(sender: impl Into<String>, content: impl Into<String>) -> Self {
        Self::Chat {
            sender: sender.into(),
            content: content.into(),
        }
    }
}

impl fmt::Display for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Message::UserJoined(content) => write!(f, "Server: {}", content),
            Message::UserLeft(content) => write!(f, "Server: {}", content),
            Message::Chat { sender, content } => write!(f, "{}: {}", sender, content),
        }
    }
}

这段 Rust 代码实现了一个基于 Tokio 的异步 TCP 聊天服务器

它的核心逻辑是:

  1. main 函数中,服务器启动并监听 8080 端口,等待客户端连接。
  2. 每当有新客户端连接,服务器会为其创建一个独立的异步任务 (tokio::spawn) 进行处理,这样可以高效地并发管理多个客户端。
  3. handle_client 函数负责与单个客户端的完整交互:首先提示客户端输入用户名,然后将其信息(地址和消息发送通道)存入一个全局共享的、线程安全的 State (使用 DashMap) 中。
  4. 服务器通过 broadcast 方法将新用户加入和离开的通知以及聊天消息广播给所有其他连接的客户端。
  5. State 结构中的 add 方法巧妙地利用 mpsc channel(多生产者,单消费者通道)和 stream.split(),将读写操作分离:一个任务负责从客户端接收消息,另一个任务负责将广播消息发送给该客户端。当客户端断开连接时,服务器会清理其状态并通知其他用户。

安装 Telnet

brew install telnet

运行与客户端调用测试

rust-ecosystem-learning on  main [!?] is 📦 0.1.0 via 🦀 1.88.0 took 2m 42.4s 
➜ cargo run --example chat
   Compiling rust-ecosystem-learning v0.1.0 (/Users/qiaopengjun/Code/Rust/rust-ecosystem-learning)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.21s
     Running `target/debug/examples/chat`
2025-07-13T04:33:58.059142Z  INFO chat: Listening on 0.0.0.0:8080
2025-07-13T04:39:47.784622Z  INFO chat: Accepted connection from 127.0.0.1:58259
2025-07-13T04:40:19.174428Z  INFO chat: Accepted connection from 127.0.0.1:58394
2025-07-13T04:42:32.433305Z  INFO chat: Accepted connection from 127.0.0.1:58959

# client qiao
rust-ecosystem-learning on  main [!?] is 📦 0.1.0 via 🦀 1.88.0 
➜ telnet 127.0.0.1 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to the chat! Please enter your username:
qiao
Server: li has joined the chat
hello world
li: hi qiao
Server: Alice has joined the chat

## li
rust-ecosystem-learning on  main [!?] is 📦 0.1.0 via 🦀 1.88.0 
➜ telnet 127.0.0.1 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to the chat! Please enter your username:
li
qiao: hello world
hi qiao
Server: Alice has joined the chat

# Alice
rust-ecosystem-learning on  main [!?] is 📦 0.1.0 via 🦀 1.88.0 
➜ telnet 127.0.0.1 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to the chat! Please enter your username:
Alice

这段运行结果表明,你成功启动了 Rust 聊天服务器,并且它能够正确处理多个客户端的并发连接和消息交互。

测试中,三个客户端(用户名为 qiaoliAlice)通过 telnet 命令连接到了在 8080 端口上监听的服务器。交互日志显示,服务器的核心功能运行正常:

  1. 消息广播:一个用户(如 qiao)发送的消息能被其他所有在线用户(如 li)接收到。
  2. 状态通知:当有新用户(如 li 或 Alice)加入聊天室时,服务器会向所有已在线的用户广播一条系统通知。

这证明了该聊天程序成功实现了基本的多人实时通信功能。

💅 体验升级:让日志和界面更出色

use std::{fmt, net::SocketAddr, sync::Arc};

use anyhow::Result;
use dashmap::DashMap;
use futures::{SinkExt, StreamExt, stream::SplitStream};
use tokio::{
    net::{TcpListener, TcpStream},
    sync::mpsc,
};
use tokio_util::codec::{Framed, LinesCodec};
use tracing::{info, level_filters::LevelFilter, warn};
use tracing_subscriber::{Layer as _, fmt::Layer, layer::SubscriberExt, util::SubscriberInitExt};

const MAX_MESSAGES: usize = 128;

#[derive(Debug, Default)]
struct State {
    peers: DashMap<SocketAddr, mpsc::Sender<Arc<Message>>>,
}

#[derive(Debug)]
struct Peer {
    username: String,
    stream: SplitStream<Framed<TcpStream, LinesCodec>>,
}

#[derive(Debug)]
enum Message {
    UserJoined(String),
    UserLeft(String),
    Chat { sender: String, content: String },
}

#[tokio::main]
async fn main() -> Result<()> {
    let layer = Layer::new().with_filter(LevelFilter::INFO);
    tracing_subscriber::registry().with(layer).init();

    let addr = "0.0.0.0:8080";
    let listener = TcpListener::bind(addr).await?;
    info!("Listening on {}", addr);

    let state = Arc::new(State::default());

    loop {
        let (stream, addr) = listener.accept().await?;
        info!("Accepted connection from {}", addr);
        let state_cloned = state.clone();
        tokio::spawn(async move {
            if let Err(e) = handle_client(state_cloned, addr, stream).await {
                warn!("Failed to handle client {}: {}", addr, e);
            }
        });
    }
}

async fn handle_client(state: Arc<State>, addr: SocketAddr, stream: TcpStream) -> Result<()> {
    let mut stream = Framed::new(stream, LinesCodec::new());
    // 按帧发送的, LinesCodec 会在每行末尾加上 \n
    stream
        .send("Welcome to the chat! Please enter your username:")
        .await?;

    let username = match stream.next().await {
        Some(Ok(username)) => username,
        Some(Err(e)) => {
            warn!("Failed to receive username from {}: {}", addr, e);
            return Err(e.into());
        }
        None => {
            warn!("Client {} disconnected before sending username", addr);
            return Ok(());
        }
    };

    let mut peer = state.add(addr, username, stream).await;

    // notify others that a new peer has joined
    let message = Arc::new(Message::user_joined(&peer.username));
    info!("\x1b[32m🟢 用户加入: {:?}\x1b[0m", message);
    state.broadcast(addr, message).await;

    while let Some(line) = peer.stream.next().await {
        let line = match line {
            Ok(line) => line,
            Err(err) => {
                warn!("Failed to receive message from {}: {}", addr, err);
                break;
            }
        };

        let message = Arc::new(Message::chat(&peer.username, line));
        info!("\x1b[34m💬 聊天消息: {:?}\x1b[0m", message);
        state.broadcast(addr, message).await;
    }

    // when while loop exit, peer has left the chat or line reading failed
    // remove peer from state
    state.peers.remove(&addr);

    // notify others that peer has left the chat
    let message = Arc::new(Message::user_left(&peer.username));
    info!("\x1b[31m🔴 用户离开: {:?}\x1b[0m", message);
    state.broadcast(addr, message).await;

    Ok(())
}

impl State {
    async fn broadcast(&self, addr: SocketAddr, message: Arc<Message>) {
        for peer in self.peers.iter() {
            if peer.key() == &addr {
                continue;
            }
            if let Err(e) = peer.value().send(message.clone()).await {
                warn!("Failed to send message to {}: {}", peer.key(), e);
                // Remove the peer from the state if it's no longer reachable
                self.peers.remove(peer.key());
            }
        }
    }

    async fn add(
        &self,
        addr: SocketAddr,
        username: String,
        stream: Framed<TcpStream, LinesCodec>,
    ) -> Peer {
        let (tx, mut rx) = mpsc::channel(MAX_MESSAGES);
        self.peers.insert(addr, tx);

        // split the stream into a sender and a receiver
        let (mut stream_sender, stream_receiver) = stream.split();

        // receive messages from others, and send them to the client
        tokio::spawn(async move {
            while let Some(message) = rx.recv().await {
                if let Err(e) = stream_sender.send(message.to_string()).await {
                    warn!("Failed to send message to {}: {}", addr, e);
                    break;
                }
            }
        });

        // return peer
        Peer {
            username,
            stream: stream_receiver,
        }
    }
}

impl Message {
    fn user_joined(username: &str) -> Self {
        let content = format!("{} has joined the chat", username);
        Self::UserJoined(content)
    }

    fn user_left(username: &str) -> Self {
        let content = format!("{} has left the chat", username);
        Self::UserLeft(content)
    }

    fn chat(sender: impl Into<String>, content: impl Into<String>) -> Self {
        Self::Chat {
            sender: sender.into(),
            content: content.into(),
        }
    }
}

impl fmt::Display for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Message::UserJoined(content) => write!(f, "\x1b[32m🟢 [系统] {}\x1b[0m", content),
            Message::UserLeft(content) => write!(f, "\x1b[31m🔴 [系统] {}\x1b[0m", content),
            Message::Chat { sender, content } => {
                write!(f, "\x1b[34m[{}]\x1b[0m {}", sender, content)
            }
        }
    }
}

这项优化主要集中在提升程序的可观察性(Observability)和终端用户体验(UX),而非性能。

它通过两方面的修改实现:

  1. 服务器端日志增强:在 handle_client 函数中,针对用户加入、离开和发送消息等关键事件,增加了带有 ANSI 颜色代码和表情符号info! 日志。这使得在监控服务器后台时,不同类型的事件一目了然,极大地提升了调试和监控的效率。
  2. 客户端显示美化:修改了 Message 类型的 Display trait 实现,将颜色和格式化信息(如 [系统] 标签)直接编码到发送给客户端的字符串中。这样,用户在自己的终端(如 telnet)里看到的聊天内容不再是单调的文本,而是色彩分明、重点突出的富文本信息,显著改善了可读性和交互体验。

运行结果


rust-ecosystem-learning on  main [!?] is 📦 0.1.0 via 🦀 1.88.0 took 9m 11.7s 
➜ cargo run --example chat
   Compiling rust-ecosystem-learning v0.1.0 (/Users/qiaopengjun/Code/Rust/rust-e...

剩余50%的内容订阅专栏后可查看

点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论