使用CoinPaprika和Yellowstone Geyser监控高交易量流动性池

本文介绍了如何结合QuickNode的CoinPaprika价格与市场数据API以及Yellowstone Geyser gRPC插件,构建一个强大的Solana流动性池监控系统。该系统能够获取特定Token的最高交易量池,并订阅这些池的实时交易更新进行分析,从而帮助交易者发现套利、做市或趋势跟踪的机会。文章提供搭建该系统的Rust代码示例和详细步骤。

概述

跟踪高交易量流动性池的活动对于 Solana 上的自动化交易策略至关重要。通过识别具有显著交易量的池并实时监控其交易流,交易者可以发现套利、做市或趋势跟踪的机会。本指南演示了如何将 QuickNode 的 CoinPaprika 价格和市场数据 APIYellowstone Geyser gRPC 插件结合起来,构建一个强大的池监控系统。

你将要做什么

我们将构建一个 Rust 应用程序,它可以:

  • 获取特定 Token 的最高交易量池
  • 订阅这些池的实时交易更新
  • 处理传入的交易以进行分析

你将需要什么

依赖 版本
rustc 1.85.0
cargo 1.85.0

了解架构

在深入研究代码之前,让我们了解这些服务是如何协同工作的:

CoinPaprika API 提供全面的市场数据,包括:

  • 池交易量和流动性信息
  • Token 价格数据
  • 历史交易指标
  • DEX 特定的池数据

Yellowstone Geyser gRPC 提供:

  • Solana 区块链数据的实时流
  • 低延迟的交易通知
  • 基于账户的过滤订阅
  • 提交级别控制

通过组合这些服务,我们可以识别重要的池,并以最小的延迟监控它们的活动。

让我们开始构建吧!

项目设置

首先,创建一个新的 Rust 项目并添加所需的依赖项:

cargo new pool-monitor && cd pool-monitor

使用以下依赖项更新你的 Cargo.toml

[package]
name = "pool-monitor"
version = "0.1.0"
edition = "2021"

[dependencies]
reqwest = { version = "0.12", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["full"] }
anyhow = "1.0"
dotenv = "0.15"
bs58 = "0.5.1"
futures = "0.3"
log = "0.4"
env_logger = "0.11"
tonic = { version = "0.12", features = ["tls", "tls-roots"] }
yellowstone-grpc-client = "6.1.0"
yellowstone-grpc-proto = "6.1.0"

在你的项目根目录中创建一个 .env 文件:

## QuickNode endpoint with CoinPaprika add-on enabled
QUICKNODE_URL=https://your-endpoint-name.solana-mainnet.quiknode.pro/your-token

## Yellowstone Geyser credentials
GEYSER_ENDPOINT=https://your-endpoint-name.solana-mainnet.quiknode.pro:10000
GEYSER_AUTH_TOKEN=your-auth-token

## Configuration
TARGET_TOKEN_ADDRESS=DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263
MONITORED_POOL_COUNT=3

将占位符值替换为你实际的 QuickNode Yellowstone gRPC 节点和身份验证 Token。你可以在我们的文档中找到有关配置你的节点的信息,这里

构建池监控器

让我们将实现分解为可管理的部分。

导入依赖项

首先,我们需要在我们的 main.rs 文件中导入必要的 crate 和模块:

use {
    anyhow::{Context, Result},
    bs58,
    dotenv::dotenv,
    futures::{sink::SinkExt, stream::StreamExt},
    log::{error, info, warn},
    reqwest,
    serde::{Deserialize, Serialize},
    std::{collections::HashMap, env},
    tokio,
    tonic::{service::Interceptor, transport::ClientTlsConfig, Status},
    yellowstone_grpc_client::GeyserGrpcClient,
    yellowstone_grpc_proto::{
        geyser::SubscribeUpdate,
        prelude::{
            subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
            SubscribeRequestFilterTransactions,
        },
    },
};

定义常量

为 BONK Token 地址和默认池计数添加常量:

const RUST_LOG_LEVEL: &str = "info";
const BONK_TOKEN_ADDRESS: &str = "DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263";

设置日志

接下来,我们需要设置日志以捕获重要事件和错误:

fn setup_logging() {
    env::set_var("RUST_LOG", RUST_LOG_LEVEL);
    env_logger::init();
}

此函数使用默认日志级别初始化日志记录器,默认日志级别可以被 RUST_LOG 环境变量覆盖。

数据结构

接下来,我们将定义数据结构来表示来自 CoinPaprika API 的池信息:

##[derive(Debug, Deserialize)]
pub struct PoolsResponse {
    pub pools: Vec<Pool>,
}

##[derive(Debug, Deserialize, Clone)]
pub struct Pool {
    pub id: String,
    pub dex_name: String,
    pub volume_usd: f64,
    pub tokens: Vec<Token>,
}

##[derive(Debug, Deserialize, Clone)]
pub struct Token {
    pub symbol: String,
}

##[derive(Debug, Serialize)]
pub struct PoolsQuery {
    pub limit: u32,
    pub sort: String,
    pub order_by: String,
}

这些结构直接映射到 CoinPaprika API 响应格式。Pool 结构包含:

  • id: 池的链上地址
  • dex_name: 哪个 DEX 托管了这个池 (Raydium, Orca, 等等)
  • volume_usd: 24 小时交易量,以美元计
  • tokens: 此池中的 Token 对

来源DexPaprika 文档

接下来,让我们为 Pool 结构定义方法,以帮助我们格式化 Token 对并管理池元数据:

impl Pool {
    pub fn token_pair(&self) -> String {
        if self.tokens.len() >= 2 {
            format!("{}/{}", self.tokens[0].symbol, self.tokens[1].symbol)
        } else {
            "Unknown pair".to_string()
        }
    }
}

##[derive(Debug, Clone)]
pub struct PoolMetadata {
    pools: Vec<Pool>,
}

impl PoolMetadata {
    pub fn new(pools: Vec<Pool>) -> Self {
        Self { pools }
    }

    pub fn get_pool_ids(&self) -> Vec<String> {
        self.pools.iter().map(|p| p.id.clone()).collect()
    }
}

用于 CoinPaprika 的 QuickNode 客户端

接下来,我们将创建一个客户端来与 CoinPaprika API 交互:

##[derive(Debug, Clone)]
pub struct QuickNodeClient {
    client: reqwest::Client,
    base_url: String,
}

impl QuickNodeClient {
    pub fn from_env() -> Result<Self> {
        let base_url = env::var("QUICKNODE_URL")
            .context("Missing QUICKNODE_URL")?;
        Ok(Self {
            client: reqwest::Client::new(),
            base_url,
        })
    }

    pub async fn get_top_pools_by_volume(
        &self,
        token_address: &str,
        limit: u32
    ) -> Result<Vec<Pool>> {
        let query = PoolsQuery {
            limit,
            sort: "desc".to_string(),
            order_by: "volume_usd".to_string(),
        };

        let url = format!(
            "{}/addon/912/networks/solana/tokens/{}/pools",
            self.base_url,
            token_address
        );

        for attempt in 1..=3 {
            let response = self.client
                .get(&url)
                .query(&query)
                .send()
                .await;

            match response {
                Ok(resp) if resp.status().is_success() => {
                    let json_text = resp.text().await.context("Failed to read response body")?;
                    let pools_response: PoolsResponse = serde_json::from_str(&json_text).context("Failed to parse JSON response")?;
                    return Ok(pools_response.pools);
                }
                Ok(resp) if attempt < 3 => {
                    let status = resp.status().as_u16();
                    warn!("Request failed with status {}, retrying in {}s... (attempt {}/3)", status, attempt, attempt);
                    tokio::time::sleep(tokio::time::Duration::from_secs(attempt)).await;
                    continue;
                }
                Ok(resp) => {
                    anyhow::bail!("API error: {}", resp.status());
                }
                Err(e) if attempt < 3 => {
                    warn!("Network error, retrying: {}", e);
                    tokio::time::sleep(
                        tokio::time::Duration::from_secs(attempt)
                    ).await;
                    continue;
                }
                Err(e) => return Err(e).context("Request failed"),
            }
        }

        unreachable!()
    }
}

此客户端:

  • 构建正确的 API 节点 URL
  • 实现具有指数退避的重试逻辑
  • 查询按交易量降序排序的池以获取最高交易量池
  • 返回已解析的池数据,如果请求失败,则引发错误

获取高交易量池

现在让我们实现获取和显示热门池的逻辑:

async fn fetch_pools() -> Result<PoolMetadata> {
    let target_address = env::var("TARGET_TOKEN_ADDRESS").unwrap_or_else(|_| BONK_TOKEN_ADDRESS.to_string());
    let pool_count: u32 = env::var("MONITORED_POOL_COUNT")
        .unwrap_or_else(|_| "3".to_string())
        .parse()
        .unwrap_or(3);

    info!("Fetching top {} target pools by volume...", pool_count);

    let client = QuickNodeClient::from_env().context("Failed to create QuickNode client")?;
    let pools = client.get_top_pools_by_volume(&target_address, pool_count)
        .await
        .context("Failed to fetch BONK pools")?;

    if pools.is_empty() {
        anyhow::bail!("No BONK pools found");
    }

    for (i, pool) in pools.iter().enumerate() {
        info!("{}. {} - {} (${:.0})", i + 1, pool.dex_name, pool.token_pair(), pool.volume_usd);
    }

    Ok(PoolMetadata::new(pools))
}

此函数:

  • 从环境变量读取配置
  • 获取交易量排名前 3 的池
  • 显示池信息以进行验证
  • 返回池元数据以供进一步处理

设置 Yellowstone Geyser

在确定了我们的池之后,我们需要设置 Geyser 客户端进行实时监控:

async fn create_geyser_client() -> Result<GeyserGrpcClient<impl Interceptor>> {
    let endpoint = env::var("GEYSER_ENDPOINT").context("Missing GEYSER_ENDPOINT")?;
    let auth_token = env::var("GEYSER_AUTH_TOKEN").context("Missing GEYSER_AUTH_TOKEN")?;

    info!("Connecting to gRPC endpoint...");

    let client = GeyserGrpcClient::build_from_shared(endpoint)?
        .x_token(Some(auth_token))?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .connect()
        .await?;

    Ok(client)
}

Geyser 客户端设置:

  • 使用 TLS 进行安全连接
  • 使用你的 QuickNode Token 进行身份验证
  • 返回已连接的客户端,可以进行订阅

如果你不熟悉 Yellowstone,请查看我们的 Yellowstone Geyser 文档Yellowstone gRPC (Rust) 指南 了解更多关于如何配置和使用它的细节。

订阅池交易

现在我们将订阅涉及我们的目标池的交易:

async fn subscribe_to_pools(
    client: &mut GeyserGrpcClient<impl Interceptor>,
    pool_ids: Vec<String>,
) -> Result<impl StreamExt<Item = Result<SubscribeUpdate, Status>>> {
    let (mut tx, rx) = client.subscribe().await?;

    info!("Setting up filters for {} pools", pool_ids.len());

    let mut accounts_filter = HashMap::new();
    accounts_filter.insert(
        "bonk_monitor".to_string(),
        SubscribeRequestFilterTransactions {
            account_include: pool_ids,
            account_exclude: vec![],
            account_required: vec![],
            vote: Some(false),
            failed: Some(false),
            signature: None,
        },
    );

    tx.send(SubscribeRequest {
        transactions: accounts_filter,
        commitment: Some(CommitmentLevel::Processed as i32),
        ..Default::default()
    }).await?;

    Ok(rx)
}

此订阅:

  • 筛选包含我们的池地址的交易
  • 排除投票和失败的交易以减少噪音
  • 使用 "Processed" 承诺以获得更快的更新
  • 返回交易更新流

处理交易流

最后,让我们处理传入的交易流:

async fn process_transaction_stream(
    mut stream: impl StreamExt<Item = Result<SubscribeUpdate, Status>> + Unpin,
) -> Result<()> {
    while let Some(message) = stream.next().await {
        match message {
            Ok(msg) => handle_update(msg),
            Err(e) => {
                error!("Stream error: {:?}", e);
                break;
            }
        }
    }
    Ok(())
}

fn handle_update(update: SubscribeUpdate) {
    if let Some(UpdateOneof::Transaction(transaction_update)) = update.update_oneof {
        if let Some(tx_info) = &transaction_update.transaction {
            let tx_id = bs58::encode(&tx_info.signature).into_string();
            info!("   Pool transaction: {}", tx_id);

            // 在这里你将实现你的交易逻辑:
            // - 解析指令数据
            // - 计算交易金额
            // - 检查套利机会
            // - 执行反向交易
        }
    }
}

流处理器:

  • 持续接收交易更新
  • 提取交易签名
  • 提供一个用于实现交易逻辑的Hook

整合在一起

这是编排一切的 main 函数:

##[tokio::main]
async fn main() -> Result<()> {
    dotenv().ok();
    setup_logging();

    info!("🚀 Starting Pool Monitor");

    // 步骤 1:获取高交易量池
    let pool_metadata = fetch_pools().await?;

    // 步骤 2:连接到 Geyser
    let mut client = create_geyser_client().await?;

    // 步骤 3:订阅池交易
    let pool_ids = pool_metadata.get_pool_ids();
    let stream = subscribe_to_pools(&mut client, pool_ids).await?;

    info!("👂 Listening for transactions...");

    // 步骤 4:处理交易
    process_transaction_stream(stream).await?;

    Ok(())
}

运行监控器

要运行你的池监控器,请在你的终端中执行以下命令:

cargo run

确保你的 .env 文件已使用你的 QuickNode 节点和 Yellowstone Geyser 凭据正确配置。监控器将获取热门的 BONK 池,订阅它们的交易流,并在发生更新时记录更新。如果一切设置正确,你应该看到类似于以下的输出:

[2025-05-29T17:32:11Z INFO  pool-monitor] 🚀 Starting BONK Pool Monitor
[2025-05-29T17:32:11Z INFO  pool-monitor] Fetching top 3 BONK pools by volume...
[2025-05-29T17:32:13Z INFO  pool-monitor] 1. Orca - SOL/Bonk ($1675994)
[2025-05-29T17:32:13Z INFO  pool-monitor] 2. Meteora - Bonk/SOL ($1316065)
[2025-05-29T17:32:13Z INFO  pool-monitor] 3. Raydium CLMM - SOL/Bonk ($1373222)
[2025-05-29T17:32:13Z INFO  pool-monitor] Connecting to gRPC endpoint...
[2025-05-29T17:32:13Z INFO  pool-monitor] Setting up filters for 3 pools
[2025-05-29T17:32:13Z INFO  pool-monitor] 👂 Listening for transactions...
[2025-05-29T17:32:22Z INFO  pool-monitor]    BONK Pool transaction: 4ZMQyEM...
[2025-05-29T17:32:23Z INFO  pool-monitor]    BONK Pool transaction: 3ztQfCE...
[2025-05-29T17:32:23Z INFO  pool-monitor]    BONK Pool transaction: rKkM21m...

干得漂亮!

扩展监控器

这个基本的监控器为更复杂的交易策略提供了基础。以下是你可能实施的一些增强功能的想法:

1. 交易分析

解析指令数据以确定:

  • 交易方向(买入/卖出)
  • 交易量和价格影响
  • 滑点容忍度
  • 费用结构

查看我们的指南,让解析指令更容易:

2. 套利检测

比较不同池的价格以识别:

  • 跨 DEX 套利机会
  • 三角套利路径
  • 闪电贷机会

3. 做市

实施以下策略:

  • 在最佳价格范围内提供流动性
  • 基于流量重新平衡头寸
  • 动态费用调整

4. 数据持久性

存储交易数据以进行:

  • 历史分析
  • 策略回测
  • 性能跟踪

最佳实践

在构建生产交易系统时:

  1. 错误处理:实施全面的错误恢复
  2. 速率限制:了解你的 速率限制实施你自己的限制 以避免达到它们
  3. 监控:跟踪系统健康和性能指标
  4. 安全:永远不要在代码或日志中暴露 API 密钥
  5. 测试:在主网部署之前,在开发网上进行彻底测试。在部署真实的交易策略时,首先从小额资金开始,并密切监控意外行为/边缘情况。

结论

通过结合 CoinPaprika 的 Solana Token 价格和流动性池 API 以及 QuickNode 的 Yellowstone Geyser 插件,你可以构建强大的池监控系统,而无需额外的基础设施开销。这种方法提供:

  • 实时了解高交易量池
  • 低延迟的交易通知
  • 用于交易决策的可靠数据源

无论你是在构建套利机器人、做市商还是分析工具,这个基础都为你提供了在 Solana 上进行复杂交易策略所需的数据管道。

其他资源

我们 ❤️ 反馈!

如果你有任何反馈或对新主题的请求,请 告诉我们。我们很乐意听到你的声音。

  • 原文链接: quicknode.com/guides/sol...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
QuickNode
QuickNode
江湖只有他的大名,没有他的介绍。