【使用go开发区块链】之获取链上数据(04)

上一篇文章,我们完成了go连接区块链的操作,本章我们将要完成获取链上数据,并持久化到数据库的功能开发

本系列文章 1、【使用go开发区块链】之获取链上数据(01) 2、【使用go开发区块链】之获取链上数据(02) 3、【使用go开发区块链】之获取链上数据(03) 4、【使用go开发区块链】之获取链上数据(04)

1、获取区块链数据

1.1、通过区块高度获取对应区块信息

在上一章里,我们最后通过下面代码获取到了区块链的最新高度

blockNumber, err := global.EthRpcClient.BlockNumber(context.Background())

接下来我们我们需要将得到的区块高度blockNumber当做入参传入,获取到区块高度对应的区块信息:

lastBlock, err := global.EthRpcClient.BlockByNumber(context.Background(), big.NewInt(int64(blockNumber)))

我们来看一下它的定义

func (b *Block) Transactions() Transactions { return b.transactions }

func (b *Block) Transaction(hash common.Hash) *Transaction {
    for _, transaction := range b.transactions {
        if transaction.Hash() == hash {
            return transaction
        }
    }
    return nil
}
func (b *Block) NumberU64() uint64        { return b.header.Number.Uint64() }
func (b *Block) MixDigest() common.Hash   { return b.header.MixDigest }
func (b *Block) Nonce() uint64            { return binary.BigEndian.Uint64(b.header.Nonce[:]) }
func (b *Block) Bloom() Bloom             { return b.header.Bloom }
func (b *Block) Coinbase() common.Address { return b.header.Coinbase }
func (b *Block) Root() common.Hash        { return b.header.Root }
func (b *Block) ParentHash() common.Hash  { return b.header.ParentHash }
func (b *Block) TxHash() common.Hash      { return b.header.TxHash }
func (b *Block) ReceiptHash() common.Hash { return b.header.ReceiptHash }
func (b *Block) UncleHash() common.Hash   { return b.header.UncleHash }
func (b *Block) Hash() common.Hash {
    if hash := b.hash.Load(); hash != nil {
        return hash.(common.Hash)
    }
    v := b.header.Hash()
    b.hash.Store(v)
    return v
}

可以看到,它提供了很多方法,比如Hash()获取区块Hash、Transactions()获取区块包含的交易等,我们可以根据自己的业务选择存储需要的数据

1.2、解析区块包含的交易数据

1.2.1、解析交易数据

我们通过区块可以获取到区块里包含的交易lastBlock.Transactions(),它实际上是一个Transactions数组,通过range我们可以将它进行遍历:

for _, tx := range block.Transactions()

1.2.1.1、获取交易回执

tx就是该区块包含的每一个交易对象,我们需要通过交易对象的Hash拿到交易回执信息:

receipt, err := global.EthRpcClient.TransactionReceipt(context.Background(), tx.Hash())

receipt结构如下:

type Receipt struct {
    // Consensus fields: These fields are defined by the Yellow Paper
    Type              uint8  `json:"type,omitempty"`
    PostState         []byte `json:"root"`
    Status            uint64 `json:"status"`
    CumulativeGasUsed uint64 `json:"cumulativeGasUsed" gencodec:"required"`
    Bloom             Bloom  `json:"logsBloom"         gencodec:"required"`
    Logs              []*Log `json:"logs"              gencodec:"required"`

    // Implementation fields: These fields are added by geth when processing a transaction.
    TxHash            common.Hash    `json:"transactionHash" gencodec:"required"`
    ContractAddress   common.Address `json:"contractAddress"`
    GasUsed           uint64         `json:"gasUsed" gencodec:"required"`
    EffectiveGasPrice *big.Int       `json:"effectiveGasPrice"`

    // Inclusion information: These fields provide information about the inclusion of the
    // transaction corresponding to this receipt.
    BlockHash        common.Hash `json:"blockHash,omitempty"`
    BlockNumber      *big.Int    `json:"blockNumber,omitempty"`
    TransactionIndex uint        `json:"transactionIndex"`
}

其中有一个定义Logs,它就是我们每个交易里面包含的log事件,同样也是一个数组类型,我们需要把它给解析出来 这里,有些同学可能有疑问,这个log到底是什么东西?我发一张图来说明: 在这里插入图片描述 该图截取自:区块链浏览器数据 的Logs标签

从图中,我们可以看到一个交易中会存在多个log,其实这个log,才是我们最需要抓取的数据,拿监听某个NFT合约Mint事件举例,我们其实就是需要抓取到该NFT合约的Log事件,然后逐个分析是否是Mint事件

1.2.1.1.1、解析Log事件

Log结构体定义:

type Log struct {
    // Consensus fields:
    // address of the contract that generated the event
    Address common.Address `json:"address" gencodec:"required"`
    // list of topics provided by the contract.
    Topics []common.Hash `json:"topics" gencodec:"required"`
    // supplied by the contract, usually ABI-encoded
    Data []byte `json:"data" gencodec:"required"`

    // Derived fields. These fields are filled in by the node
    // but not secured by consensus.
    // block in which the transaction was included
    BlockNumber uint64 `json:"blockNumber"`
    // hash of the transaction
    TxHash common.Hash `json:"transactionHash" gencodec:"required"`
    // index of the transaction in the block
    TxIndex uint `json:"transactionIndex"`
    // hash of the block in which the transaction was included
    BlockHash common.Hash `json:"blockHash"`
    // index of the log in the block
    Index uint `json:"logIndex"`

    // The Removed field is true if this log was reverted due to a chain reorganisation.
    // You must pay attention to this field if you receive logs through a filter query.
    Removed bool `json:"removed"`
}

我们需要的就是Topics[]和Data[],其中Topics[0] 为该方法的keccak256加密后的前4个字节(即为函数选择器)Topics最多包含4个数据,即为声明为 indexed的字段(这块涉及到solidity知识,大家有个印象即可,我会在solidity教程里详细讲解,大家感兴趣可以查看我发布的solidity课程),Data里包含的是剩余的数据

1.2.1.2、处理交易数据

1.2.1.2.1、验证是否是创建合约

如果我们想知道一笔交易是否是创建合约该怎么办呢?其实很简单,在每个交易里有都有一个to字段,如果to字段为空,则代表该交易为创建合约操作。

1.2.1.2.2、验证地址是否是合约地址

我们可以通过下面方法来验证一个地址是否是合约地址:

// 判断一个地址是否是合约地址
func isContractAddress(address string) (bool, error) {
    addr := common.HexToAddress(address)
    code, err := global.EthRpcClient.CodeAt(context.Background(), addr, nil)
    if err != nil {
        return false, err
    }
    return len(code) > 0, nil
}

通过获取指定地址的code来判断,若code长度不为0,则该地址为合约地址

1.3、持久化链上数据

在上面的章节,我们已经讲解了如何获取链上的区块数据,以及如何进行解析,下面我们将要把链上数据持久化到我们的数据库中

1.3.1、创建实体类

1.3.1.1、创建transaction.go

在internal/model目录下创建transaction.go文件,用来存储交易数据:

type Transaction struct {
    Id          uint64 `json:"id" gorm:"primary_key;AUTO_INCREMENT"`
    BlockNumber uint64 `json:"block_number"`
    TxHash      string `json:"tx_hash" gorm:"type:char(66)" `
    From        string `json:"from" gorm:"type:char(42)" `
    To          string `json:"to" gorm:"type:char(42)" `
    Value       string `json:"value" gorm:"type:varchar(256)" `
    Contract    string `json:"contract" gorm:"type:char(42)" `
    Status      uint64 `json:"status"`
    InputData   string `json:"input_data" gorm:"type:varchar(4096)"`
    *gorm.Model
}

func (tx *Transaction) TableName() string {
    return "transactions"
}
func (tx *Transaction) Insert() error {
    if err := global.DBEngine.Create(&tx).Error; err != nil {
        return err
    }
    return nil
}

1.3.1.2、创建event.go

在internal/model目录下创建event.go文件,用来存储事件数据:

type Events struct {
    Id          uint64 `json:"id" gorm:"primary_key;AUTO_INCREMENT" `
    Address     string `json:"address" gorm:"type:char(42)" `
    Data        string `json:"data" gorm:"type:longtext" `
    BlockNumber uint64 `json:"block_number"`
    TxHash      string `json:"tx_hash" gorm:"type:char(66)" `
    TxIndex     uint   `json:"tx_index" `
    BlockHash   string `json:"block_hash" gorm:"type:varchar(256)" `
    LogIndex    uint   `json:"log_index"`
    Removed     bool   `json:"removed"`
    *gorm.Model
}

func (e *Events) TableName() string {
    return "events"
}

func (e *Events) Insert() error {
    if err := global.DBEngine.Create(&e).Error; err != nil {
        return err
    }
    return nil
}

func (e *Events) GetEventByTxHash() (*Events, error) {
    var event Events
    if err := global.DBEngine.Where("tx_hash = ?", e.TxHash).First(&event).Error; err != nil {
        return nil, err
    }
    return &event, nil
}

1.3.1.3、创建topic.go

在internal/model目录下创建topic.go文件,用来存储事件的主题数据:

type Topic struct {
    Id      uint64 `json:"id" gorm:"primary_key;AUTO_INCREMENT" json:"id"`
    EventId uint64 `json:"event_id"`
    Topic   string `json:"topic" gorm:"type:longtext" `
    *gorm.Model
}

func (tc *Topic) TableName() string {
    return "topics"
}

func (tc *Topic) Insert() error {
    if err := global.DBEngine.Create(&tc).Error; err != nil {
        return err
    }
    return nil
}

1.3.1.4、修改MigrateDb方法

将db.go里面的MigrateDb()方法进行修改,如下:

// MigrateDb 初始化数据库表
func MigrateDb() error {
    if err := global.DBEngine.AutoMigrate(&models.Blocks{}, &models.Transaction{}, &models.Events{}, &models.Topic{}); err != nil {
        return err
    }
    return nil
}

1.3.2、准备工作

1.3.2.1、创建block.go

在pkg目录下,新建blockchain目录,然后在blockchain目录下新建block.go文件

1.3.2.2、初始化区块信息

我们查询区块信息的时候,需要一个区块高度参数,在我们项目刚创建的时候,数据库block表是空的,所以我们需要先进行第一个区块信息的初始化工作,在pkg/blockchain/block.go文件新建InitBlock()方法:

// InitBlock 初始化第一个区块数据
func InitBlock() {
    block := &models.Blocks{}
    count := block.Counts()
    if count == 0 {
        lastBlockNumber, err := global.EthRpcClient.BlockNumber(context.Background())
        if err != nil {
            log.Panic("InitBlock - BlockNumber err : ", err)
        }
        lastBlock, err := global.EthRpcClient.BlockByNumber(context.Background(), big.NewInt(int64(lastBlockNumber)))

        if err != nil {
            log.Panic("InitBlock - BlockByNumber err : ", err)
        }
        block.BlockHash = lastBlock.Hash().Hex()
        block.BlockHeight = lastBlock.NumberU64()
        block.LatestBlockHeight = lastBlock.NumberU64()
        block.ParentHash = lastBlock.ParentHash().Hex()
        err = block.Insert()
        if err != nil {
            log.Panic("InitBlock - Insert block err : ", err)
        }
    }
}

以上代码主要做了几个工作:

  1. 先查询数据库中是否已经存在block记录
  2. 若不存在,查询最新区块高度
  3. 通过区块高度查询最新区块信息
  4. 组装数据,存储到数据库

1.3.3、持久化数据

1.3.3.1、新建执行任务方法

pkg/blockchain/block.go文件新建SyncTask()方法,我们希望程序可以间隔一段时间从链上拉取数据,在项目中,我们可以使用 ticker来实现,声明一个 ticker对象,示例中是一秒时间间隔,然后通过chan(通道)取值,进行定时操作:

func SyncTask() {
    ticker := time.NewTicker(time.Second * 1)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            latestBlockNumber, err := global.EthRpcClient.BlockNumber(context.Background())
            if err != nil {
                log.Panic("EthRpcClient.BlockNumber error : ", err)
            }
            var blocks models.Blocks
            latestBlock, err := blocks.GetLatest()
            if err != nil {
                log.Panic("blocks.GetLatest error : ", err)
            }
            if latestBlock.LatestBlockHeight > latestBlockNumber {
                log.Printf("latestBlock.LatestBlockHeight : %v greater than latestBlockNumber : %v \n", latestBlock.LatestBlockHeight, latestBlockNumber)
                continue
            }
            currentBlock, err := global.EthRpcClient.BlockByNumber(context.Background(), big.NewInt(int64(latestBlock.LatestBlockHeight)))
            if err != nil {
                log.Panic("EthRpcClient.BlockByNumber error : ", err)
            }
            log.Printf("get currentBlock blockNumber : %v , blockHash : %v \n", currentBlock.Number(), currentBlock.Hash().Hex())
            err = HandleBlock(currentBlock)
            if err != nil {
                log.Panic("HandleBlock error : ", err)
            }
        }
    }
}

上面代码主要完成操作:

  1. 获取最新区块高度
  2. 从数据库查询最新存储的区块数据
  3. 判断数据库存储的最新区块链高度是否大于查询的最新区块高度
  4. 如果大于则跳出循环不执行后面操作,反之通过数据库存储的最新区块链高度查询区块信息
  5. 通过HandleBlock()方法处理最新区块信息(存储到数据库)

1.3.3.2、处理区块数据

HandleBlock()方法如下:

// HandleBlock 处理区块信息
func HandleBlock(currentBlock *types.Block) error {
    block := &models.Blocks{
        BlockHeight:       currentBlock.NumberU64(),
        BlockHash:         currentBlock.Hash().Hex(),
        ParentHash:        currentBlock.ParentHash().Hex(),
        LatestBlockHeight: currentBlock.NumberU64() + 1,
    }
    err := block.Insert()
    if err != nil {
        return err
    }
    err = HandleTransaction(currentBlock)
    if err != nil {
        return err
    }
    return nil
}

上面代码主要完成工作:

  1. 处理区块数据,存储到数据库
  2. 调用HandleTransaction()方法处理区块里包含的交易数据

1.3.3.3、处理交易数据

pkg/blockchain目录下新建transaction.go文件:

// HandleTransaction 处理交易数据
func HandleTransaction(block *types.Block) error {
    for _, tx := range block.Transactions() {
        receipt, err := global.EthRpcClient.TransactionReceipt(context.Background(), tx.Hash())
        if err != nil {
            log.Error("get transaction fail", "err", err)
        }
        for _, rLog := range receipt.Logs {
            err = HandleTransactionEvent(rLog, receipt.Status)
            if err != nil {
                log.Error("process transaction event fail", "err", err)
            }
        }
        err = ProcessTransaction(tx, block.Number(), receipt.Status)
        if err != nil {
            log.Error("process transaction fail", "err", err)
        }
    }
    return nil
}

func ProcessTransaction(tx *types.Transaction, blockNumber *big.Int, status uint64) error {
    from, err := types.Sender(types.LatestSignerForChainID(tx.ChainId()), tx)
    if err != nil {
        log.Error("Failed to read the sender address", "TxHash", tx.Hash(), "err", err)
        return err
    }
    log.Info("hand transaction", "txHash", tx.Hash().String())
    transaction := &models.Transaction{
        BlockNumber: blockNumber.Uint64(),
        TxHash:      tx.Hash().Hex(),
        From:        from.Hex(),
        Value:       tx.Value().String(),
        Status:      status,
        InputData:   hex.EncodeToString(tx.Data()),
    }
    if tx.To() == nil {
        log.Info("Contract creation found", "Sender", transaction.From, "TxHash", transaction.TxHash)
        toAddress := crypto.CreateAddress(from, tx.Nonce()).Hex()
        transaction.Contract = toAddress
    } else {
        isContract, err := isContractAddress(tx.To().Hex())
        if err != nil {
            return err
        }
        if isContract {
            transaction.Contract = tx.To().Hex()
        } else {
            transaction.To = tx.To().Hex()
        }
    }
    err = transaction.Insert()
    if err != nil {
        log.Error("insert transaction fail", "err", err)
        return err
    }
    return nil
}

1.3.3.4、处理事件数据

pkg/blockchain目录下新建event.go文件:

func HandleTransactionEvent(rLog *types.Log, status uint64) error {
    log.Info("ProcessTransactionEvent", "address", rLog.Address, "data", rLog.Data)
    event := &models.Events{
        Address:     rLog.Address.String(),
        Data:        "",
        BlockNumber: rLog.BlockNumber,
        TxHash:      rLog.TxHash.String(),
        TxIndex:     rLog.TxIndex,
        BlockHash:   rLog.BlockHash.String(),
        LogIndex:    rLog.Index,
        Removed:     rLog.Removed,
    }
    err := event.Insert()
    if err != nil {
        log.Error("event.Insert() fail", "err", err)
        return err
    }
    evt, err := event.GetEventByTxHash()
    if err != nil {
        log.Error("event.GetEventByTxHash() fail", "err", err)
        return err
    }
    log.Info("Topics", "topic", rLog.Topics)
    for _, tp := range rLog.Topics {
        topic := &models.Topic{
            EventId: evt.Id,
            Topic:   tp.String(),
        }
        err := topic.Insert()
        if err != nil {
            log.Error("topic.Insert() fail", "err", err)
            return err
        }
    }
    return nil
}

1.4、验证

1.4.1、修改main.go

修改main()方法:

func main() {
    blockchain.InitBlock()
    blockchain.SyncTask()
}

1.4.2、执行

执行main()方法,正常打印结果如下:

image.png

1.4.3、查看数据库信息

1.4.3.1、block表

image.png

1.4.3.2、transaction表

image.png

1.4.3.3、event表

image.png

1.4.3.4、topic表

image.png

数据已经正确的插入到数据库中,说明我们的程序是正常运作的

通过本章的学习,我们完成了1)链上数据拉取,2)链上数据解析,3)链上数据持久化,其实对于区块的数据解析还可以更深入,比如判断是否是ERC721/ERC20合约创建,然后根据实际的业务进行不同的处理,本章就不详细讲解了,如果有想了解的同学,可以私信我,到此,【使用go获取链上数据】系列文章就全部完结了,有任何问题欢迎给我留言

请关注公众号:外柏叁布道者(web3_preacher),回复 “go获取链上数据” 领取完整代码

点赞 3
收藏 1
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
外柏叁布道者
外柏叁布道者
0x6ea1...9dbe
全网同名 资深区块链专家 更多web3、区块链技术与前沿信息 请关注公众号:外柏叁布道者(web3_preacher) Web3工具网站现已上线: https://utools.me 不定时更新各种实用工具,敬请关注 接各种Dapp、合约、web3相关工具开发