使用Yellowstone gRPC Geyser插件(Go)监控Solana流动性池

  • QuickNode
  • 发布于 2025-04-25 11:39
  • 阅读 51

本文介绍了如何使用QuickNode的Yellowstone gRPC接口,通过Go语言实时监控Solana链上DEX的流动性池交易。文章详细说明了如何设置Go环境、创建gRPC客户端,以及如何监控Raydium流动性池的交易活动和分析交易吞吐量,为DeFi应用和交易系统提供实时数据访问。

概述

黄石龙之口 (Yellowstone) 是一个强大的 gRPC 接口,构建于 Solana 的 Geyser 插件系统之上,可以实时传输区块链数据。通过连接到这项服务,你可以同时跟踪多个 DEX 池的活动,分析交易模式,并构建需要精确到毫秒级数据的响应式应用程序。

在本指南中,你将:

  1. 为 Yellowstone gRPC 设置 Go 环境
  2. 创建一个客户端来订阅 SOL/USDC 池的交易
  3. 监控活跃 Raydium 流动性池中的交易
  4. 使用基本统计信息分析交易吞吐量

前提条件

了解 Yellowstone 和 Geyser

什么是 Geyser?

Geyser 是 Solana 的插件系统,它使验证者能够将实时的区块链数据流传输到外部系统,而不会造成沉重的 RPC 负载。Geyser 不是通过 RPC 调用重复轮询区块链,而是在数据可用时推送数据,从而显著降低了延迟和资源使用。

什么是 Yellowstone?

黄石龙之口是一个构建于 Solana 的 Geyser 插件之上的开源 gRPC 接口。它提供高性能、类型安全的流传输:

  • 账户更新
  • 交易
  • 条目
  • 区块通知
  • 插槽通知

对于 DeFi 应用程序和交易系统,这种实时数据访问可以提供关键的竞争优势。

设置你的环境

首先,让我们创建一个新的 Go 项目并安装必要的依赖项。

  1. 创建一个项目目录:
mkdir solana-dex-monitor && cd solana-dex-monitor
  1. 初始化 Go 模块:
go mod init solana-dex-monitor
  1. 安装所需的依赖项:
go get google.golang.org/grpc
go get github.com/joho/godotenv
go get github.com/mr-tron/base58
go get github.com/rpcpool/yellowstone-grpc/examples/golang/proto
  1. 创建一个 .env 文件来存储你的 QuickNode 凭据:
qn_grpc_url=your-quicknode-yellowstone-endpoint.grpc.solana.quiknode.pro:443
qn_grpc_token=your-quicknode-token

你可以在我们的文档中找到有关配置端点的信息,这里

构建流动性池监控器

现在,让我们创建我们的主应用程序来监控流动性池的交易。对于此示例,我们将利用 Raydium 上一个非常活跃的 SOL/USDC 池,3ucNos4NbumPLZNWztqGHNFFgkHeRMBQAVemeeomsUxv,但是你可以轻松添加其他池或将其适应其他池或 DEX。

创建一个名为 main.go 的文件,并添加以下代码:

package main

import (
    "context"
    "crypto/tls"
    "fmt"
    "log"
    "os"
    "strings"
    "sync"
    "time"

    pb "github.com/rpcpool/yellowstone-grpc/examples/golang/proto"

    "github.com/joho/godotenv"
    "github.com/mr-tron/base58"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/encoding/gzip"
    "google.golang.org/grpc/keepalive"
)

var (
    endpoint string
    token    string
)

var SolUsdcPoolAddresses = []string{
    "3ucNos4NbumPLZNWztqGHNFFgkHeRMBQAVemeeomsUxv", // Raydium SOL/USDC 池的例子
    // 根据需要添加更多池地址
}

// 加载环境变量
func init() {
    err := godotenv.Load()
    if err != nil {
        log.Fatalf("Error loading .env file: %v", err)
    }

    endpoint = getEnv("qn_grpc_url", "example.com:10000") // 默认值作为回退
    token = getEnv("qn_grpc_token", "token")
}

// 辅助函数,用于获取具有默认值的环境变量
func getEnv(key, defaultValue string) string {
    value := os.Getenv(key)
    if value == "" {
        return defaultValue
    }
    return value
}

// PoolTxStats 跟踪交易的统计信息
type PoolTxStats struct {
    txCount     int
    firstTxTime time.Time
    lastTxTime  time.Time
    mutex       sync.Mutex
}

// 用于跟踪统计信息的全局变量
var (
    poolStats  = make(map[string]*PoolTxStats) // 键是字符串形式的插槽
    statsMutex sync.RWMutex
)

type tokenAuth struct {
    token string
}

func (t tokenAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
    return map[string]string{
        "authorization": t.token,
    }, nil
}

func (t tokenAuth) RequireTransportSecurity() bool {
    return true
}

// 用于从交易中安全提取签名的函数
func extractSignature(tx *pb.SubscribeUpdateTransaction) string {
    if tx == nil {
        return "No transaction"
    }

    if sig := tx.GetTransaction().GetSignature(); len(sig) > 0 {
        // 将二进制签名转换为 base58
        return base58.Encode(sig)
    }

    return "No signature found"
}

func boolPtr(b bool) *bool {
    return &b
}

func main() {
    // 设置连接参数
    kacp := keepalive.ClientParameters{
        Time:                10 * time.Second,
        Timeout:             5 * time.Second,
        PermitWithoutStream: true,
    }

    opts := []grpc.DialOption{
        grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})),
        grpc.WithKeepaliveParams(kacp),
        grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024), grpc.UseCompressor(gzip.Name)),
        grpc.WithPerRPCCredentials(tokenAuth{token: token}),
    }

    // 建立连接
    conn, err := grpc.Dial(endpoint, opts...)
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()

    client := pb.NewGeyserClient(conn)

    // 为 SOL/USDC 流动性池交易创建订阅请求
    transactions := make(map[string]*pb.SubscribeRequestFilterTransactions)
    transactions["sol_usdc_pool_txs"] = &pb.SubscribeRequestFilterTransactions{
        Vote:            boolPtr(false),
        Failed:          boolPtr(false),
        AccountInclude:  SolUsdcPoolAddresses,
        AccountExclude:  []string{},
        AccountRequired: []string{},
    }

    commitment := pb.CommitmentLevel_CONFIRMED
    subReq := &pb.SubscribeRequest{
        Transactions: transactions,
        Commitment:   &commitment,
    }

    fmt.Println("Connecting to Yellowstone gRPC...")
    stream, err := client.Subscribe(context.Background())
    if err != nil {
        fmt.Printf("Failed to subscribe to yellowstone: %v\n", err)
        return
    }

    fmt.Println("Sending subscription request for SOL/USDC pool transactions...")
    if err = stream.Send(subReq); err != nil {
        fmt.Printf("Failed to send subscription request: %v\n", err)
        return
    }

    // 打印池统计信息的标题
    fmt.Printf("\n%-12s %-12s %-15s %-20s\n",
        "Slot", "TX Count", "TX/sec", "Total Time (ms)")
    fmt.Println(strings.Repeat("-", 65))

    // 每 5 秒打印一次统计信息
    lastPrintTime := time.Now()
    printInterval := time.Second * 5

    fmt.Println("Monitoring SOL/USDC liquidity pool transactions...")
    for {
        m, err := stream.Recv()
        if err != nil {
            fmt.Printf("Failed to receive yellowstone message: %v\n", err)
            return
        }

        if tx := m.GetTransaction(); tx != nil {
            // 提取交易签名
            signature := extractSignature(tx)

            // 使用插槽处理交易以进行跟踪
            slot := tx.GetSlot()
            now := time.Now()

            // 维护基于插槽的简化跟踪方法
            statsMutex.Lock()
            slotStr := fmt.Sprintf("%d", slot)
            if _, exists := poolStats[slotStr]; !exists {
                poolStats[slotStr] = &PoolTxStats{
                    firstTxTime: now,
                    lastTxTime:  now,
                }
            }

            stats := poolStats[slotStr]
            stats.mutex.Lock()
            stats.txCount++
            stats.lastTxTime = now
            stats.mutex.Unlock()
            statsMutex.Unlock()

            // 打印带有签名的交易信息
            fmt.Printf("Pool transaction detected at Slot=%d\nSignature=%s\n", slot, signature)

            // 定期打印统计信息
            if now.Sub(lastPrintTime) >= printInterval {
                printPoolStats()
                lastPrintTime = now

                // 清理旧的统计信息
                cleanupOldStats()
            }
        }
    }
}

func printPoolStats() {
    statsMutex.RLock()
    defer statsMutex.RUnlock()

    fmt.Println("\nTransaction Statistics:")
    fmt.Printf("\n%-12s %-12s %-15s %-20s\n",
        "Slot", "TX Count", "TX/sec", "Total Time (ms)")
    fmt.Println(strings.Repeat("-", 65))

    for slotStr, stats := range poolStats {
        stats.mutex.Lock()
        duration := stats.lastTxTime.Sub(stats.firstTxTime).Milliseconds()
        var txPerSec float64
        if duration > 0 {
            txPerSec = float64(stats.txCount) / (float64(duration) / 1000.0)
        }

        fmt.Printf("%-12s %-12d %-15.2f %-20d\n",
            slotStr,
            stats.txCount,
            txPerSec,
            duration,
        )
        stats.mutex.Unlock()
    }
    fmt.Println()
}

func cleanupOldStats() {
    statsMutex.Lock()
    defer statsMutex.Unlock()

    now := time.Now()
    for slotStr, stats := range poolStats {
        stats.mutex.Lock()
        // 删除超过 1 分钟的统计信息
        if now.Sub(stats.lastTxTime) > time.Minute {
            delete(poolStats, slotStr)
        }
        stats.mutex.Unlock()
    }
}

了解代码

让我们分解应用程序的关键组件:

配置和设置

  1. 池地址:我们定义了一个示例 SOL/USDC 池地址列表——在本例中,我们使用一个活跃的 Raydium 池。你可以通过将地址附加到 SolUsdcPoolAddresses 切片来添加更多池。

  2. 身份验证tokenAuth 结构处理使用你的 QuickNode Token进行授权。

  3. 连接设置:我们使用 TLS、压缩和 keepalive 参数配置 gRPC 连接设置。

交易订阅

我们监控系统的核心是订阅请求:

transactions["sol_usdc_pool_txs"] = &pb.SubscribeRequestFilterTransactions{
    Vote:            boolPtr(false),
    Failed:          boolPtr(false),
    AccountInclude:  SolUsdcPoolAddresses,
    AccountExclude:  []string{},
    AccountRequired: []string{},
}

此过滤器:

  • 排除投票交易
  • 仅包括成功的交易
  • 监控涉及 DEX 程序 ID 或特定池帐户的活动

交易处理

对于每个传入的交易:

  1. 我们使用 extractSignature 函数提取签名
  2. 按插槽跟踪交易计数和时间
  3. 定期计算和显示统计信息(每秒交易数、总持续时间)
  4. 清理旧的统计信息以管理内存使用

运行监控器

要运行监控器:

go run main.go

你应该看到类似于以下的输出:

Connecting to Yellowstone gRPC...
Sending subscription request for SOL/USDC pool transactions...

Slot        TX Count     TX/sec         Total Time (ms)
-----------------------------------------------------------------
Monitoring SOL/USDC liquidity pool transactions...
Pool transaction detected at Slot=212439883
Signature=4ZV7JsQTwQfLWtN9YMu2EJkTKjyAC9Yjjd1TGY8X5qvqYhfpfTKk3PUK5NZ2P9HFfxXUE2mRJsW2LcUfTF1oTBcP

Transaction Statistics:

Slot        TX Count     TX/sec         Total Time (ms)
-----------------------------------------------------------------
212439883   1            0.20           5000

增强监控器

想要继续构建吗?以下是一些增强你的监控器的想法:

1. 添加更多池

你可以通过在 SolUsdcPoolAddresses 切片中包含它们的地址来轻松添加更多要监控的池:

var SolUsdcPoolAddresses = []string{
    "3ucNos4NbumPLZNWztqGHNFFgkHeRMBQAVemeeomsUxv", // Raydium SOL/USDC 池的例子
    // 根据需要添加更多池地址
}

2. 解析交易数据

你可以解码交易数据以提取交换金额、价格影响和其他详细信息,而不仅仅是计数交易:

// 添加此函数以解码交易数据
func decodeTransaction(tx *pb.SubscribeUpdateTransaction) {
    // 基于程序 ID 解析交易数据
    // 不同的 DEX(Orca、Raydium、Jupiter)具有不同的交易结构
}

3. 存储数据以供分析

将监控器连接到数据库以存储交易数据以供以后分析:

// 添加数据库集成
func storeTransactionData(slot uint64, signature string, details map[string]interface{}) {
    // 插入数据库(PostgreSQL、InfluxDB 等)
}

4. 实施价格警报

解析帐户数据后,可以添加逻辑来检测显着的价格波动或异常活动:

// 添加价格监控
func detectPriceAnomaly(currentPrice, previousPrice float64) bool {
    // 实施异常检测逻辑
    percentChange := (currentPrice - previousPrice) / previousPrice * 100
    return math.Abs(percentChange) > 1.0 // 警报超过 1% 的价格变化
}

总结

将 Yellowstone gRPC 与 Go 结合使用提供了一种强大的方式来以极低的延迟监控 Solana 流动性池。这种方法可以实时跟踪多个程序中的 DEX 活动,为你提供交易、分析或监控应用程序所需的数据。

对于毫秒至关重要的交易机器人或套利系统,Yellowstone 的流式传输方法比传统的 RPC 方法具有显着优势。通过直接进入 Solana 区块链数据源,你可以随时了解市场动向。

我们 ❤️ 反馈!

如果你对新主题有任何反馈或要求,请告诉我们。我们很乐意收到你的来信。

附加资源

如果你有任何问题或者需要帮助去实现你的 Solana dApp,加入我们的 Discord 社区 或者联系我们的支持团队!

  • 原文链接: quicknode.com/guides/sol...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
QuickNode
QuickNode
江湖只有他的大名,没有他的介绍。