本文介绍了如何使用QuickNode Functions将区块链数据流式传输到本地Kafka集群,包括Kafka集群的搭建、ngrok的配置、Function的创建及数据流的监控。
实时区块链数据处理通常需要强大的消息队列来处理高吞吐量并确保数据的可靠传递。 Apache Kafka 是这一用例中最流行的解决方案之一。
在本指南中,我们将向你展示如何设置本地 Kafka 集群,并使用 QuickNode Functions 将区块链数据流式传输到其中。
你可以在 QuickNode 的 awesome-functions GitHub 仓库 中找到本仓库中的代码。
观看这个使用 QuickNode Functions 设置和使用 Kafka 的简短演示:
通过 QuickNode Functions 将区块链数据流式传输到 Kafka 集群

通过 QuickNode Functions 将区块链数据流式传输到 Kafka 集群
7 分钟
20 次观看
0
1.2×
7 分钟⚡️8 分钟 36 秒6 分钟 53 秒5 分钟 44 秒4 分钟 35 秒4 分钟 3 秒3 分钟 26 秒2 分钟 45 秒
介绍
你的用户代理不支持 HTML5 视频元素。

通过 QuickNode Functions 将区块链数据流式传输到 Kafka 集群
7 分钟
20 次观看
0
1.2×
7 分钟⚡️8 分钟 36 秒6 分钟 53 秒5 分钟 44 秒4 分钟 35 秒4 分钟 3 秒3 分钟 26 秒2 分钟 45 秒
介绍
在这个演示中,你将看到:
现在,让我们逐步进行设置。
首先,让我们设置本地 Kafka 环境。为你的项目创建一个新目录,并添加以下 docker-compose.yml 文件:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://${NGROK_URL:-localhost:9092}
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: PLAINTEXT
      DYNAMIC_CONFIG_ENABLED: 'true'
此配置设置了:
为了使我们的本地 Kafka 能够被 QuickNode Functions 访问,我们将使用 ngrok。打开终端并运行:
ngrok tcp 9092
你将收到一个转发 URL,例如 tcp://2.tcp.ngrok.io:18139。复制此 URL - 我们将在 Docker 和我们的 Function 中用到它。
现在,使用 ngrok URL 启动 Kafka。不要忘记将转发 URL 替换为你自己的 URL,并去掉 tcp:// 前缀:
NGROK_URL=2.tcp.ngrok.io:18139 docker-compose up -d
在 QuickNode 仪表板 中,创建一个新的 Function 并添加 kafkajs 作为依赖项。以下是我们的 Function 代码:
const { Kafka } = require('kafkajs');
// 使用你的 ngrok URL 配置 Kafka 代理,去掉 `tcp://` 前缀
const KAFKA_BROKER = process.env.KAFKA_BROKER || '2.tcp.ngrok.io:18139';  // 👈 替换为你自己的 ngrok URL
async function initializeKafka() {
    try {
        const kafka = new Kafka({
            clientId: 'quicknode-stream-producer',
            brokers: [KAFKA_BROKER],
            retry: {
                initialRetryTime: 100,
                retries: 5
            }
        });
        const producer = kafka.producer();
        await producer.connect();
        console.log('成功连接到 Kafka 代理');
        return producer;
    } catch (error) {
        console.error('初始化 Kafka 失败:', error);
        throw error;
    }
}
async function main(params) {
    let producer = null;
    try {
        producer = await initializeKafka();
        const {
            metadata: { dataset, network },
            data,
            user_data
        } = params;
        // 创建主题名称
        const sanitizedDataset = dataset.toLowerCase().replace(/[^a-z0-9-]/g, '-');
        const topic = `${network.toLowerCase()}-${sanitizedDataset}`;
        // 准备消息负载
        const messagePayload = {
            dataset,
            network,
            timestamp: new Date().toISOString(),
            data,
            user_data
        };
        // 发送到 Kafka
        const result = await producer.send({
            topic,
            messages: [\
                {\
                    key: `${network}-${dataset}-${Date.now()}`,\
                    value: JSON.stringify(messagePayload),\
                    headers: {\
                        network,\
                        dataset,\
                        timestamp: new Date().toISOString()\
                    }\
                }\
            ]
        });
        console.log(`成功将数据发送到 Kafka 主题 ${topic}`);
        // 返回前始终断开连接
        await producer.disconnect();
        console.log('Kafka 生产者已断开连接');
        return {
            status: 'success',
            message: `数据已发送到 Kafka 主题 ${topic}`,
            metadata: {
                dataset,
                network,
                kafka_result: result
            }
        };
    } catch (error) {
        if (producer) {
            await producer.disconnect();
            console.log('发生错误后 Kafka 生产者已断开连接');
        }
        return {
            status: 'error',
            message: error.message,
            metadata: {
                dataset: params.metadata?.dataset,
                network: params.metadata?.network
            }
        };
    }
}
module.exports = { main };
将测试参数保留为默认值,然后点击“保存并关闭”。
此 Function 的关键特性:
访问 http://localhost:8080 的 Kafka UI 以监控:

你还可以在 QuickNode 仪表板中检查你的 Function 日志,以获取详细的执行信息。
如果你看到连接错误:
如果主题没有出现:
如果消息没有出现:
虽然此设置非常适合开发,但在生产环境中,你应该考虑:
订阅我们的 新闻通讯 以获取更多关于区块链开发的文章和指南。无论你在构建什么,我们都希望听到你的声音。在 Discord 或 Twitter 上给我们留言,告诉我们你在做什么!
让我们知道 如果你有任何反馈或新主题的请求。我们非常乐意听取你的意见。
- 原文链接: quicknode.com/guides/qui...
- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
 
                如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!