以太坊数据采集与热门合约统计:SpringBoot+Web3J端到端实战

·

关键词:以太坊区块数据、SpringBoot、Web3J、Kafka、实时热门合约、断点续传方案、区块链ETL


项目全景:你为什么需要一条“实时链上数据源”

每天上千笔智能合约部署、数百万次交易悄然发生;谁能最快识别出高热度合约,谁就能抓住掏钱、报价、套利的第一波红利。本文用SpringBoot + Web3J + Kafka帮你搭一条“实时以太坊区块数据管道”,将链上数据直接沉淀到消息队列,后续可做:

读完即可在本地拉通完整链路,生产部署时再按文末的“断点续传”建议做高可用扩展。


技术栈与核心关键词


实战目录结构

源码已做脱敏处理,可直接复制到你的 IDEA 中运行。
ethereum-data-pipeline/
 ├─ src/main/java/
 │  ├─ entity/
 │  │  └─ EthTransactions.java
 │  ├─ enums/
 │  │  └─ TransactionEnum.java
 │  ├─ pipeline/
 │  │  └─ EtherDataPipeline.java
 │  └─ runner/
 │     └─ InitApplicationDataSyncRunner.java
 └─ pom.xml & application.yml

Step-1 连接以太坊网络:30 秒获取高速 RPC

  1. 打开 Alchemy 注册免费账号
  2. 创建 Ethereum Mainnet App → 复制 HTTPS URL 形如
    https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY

👉 只需要 30 秒,就能立刻申请直连主网的高可用节点


Step-2 引入依赖:SpringBoot+Web3J 一条龙

pom.xml 追加(已对齐 2025 LTS 版本):

<dependencies>
    <!-- Web3J 全家桶 -->
    <dependency>
        <groupId>org.web3j</groupId>
        <artifactId>web3j-spring-boot-starter</artifactId>
        <version>4.0.3</version>
    </dependency>
    <dependency>
        <groupId>org.web3j</groupId>
        <artifactId>core</artifactId>
        <version>4.9.7</version>
    </dependency>
    <dependency>
        <groupId>org.web3j</groupId>
        <artifactId>crypto</artifactId>
        <version>4.9.7</version>
    </dependency>

    <!-- Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Gson 用于 POJO 转 JSON -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
    </dependency>
</dependencies>

Step-3 application.yml 双通道配置

web3j:
  client-address: https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY

server:
  port: 8900

spring:
  application:
    name: ethereum-data-pipeline
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: ods_transaction
      enable-auto-commit: true
      auto-offset-reset: earliest
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432

Step-4 设计 EthTransactions 实体

按照以太坊 RPC 返回字段 + 业务聚合字段创建 POJO,可以直接塞进 Kafka。

public class EthTransactions {
    private String hash;
    private String blockHash;
    private BigInteger blockNumber;
    private String from;
    private String to;
    private BigInteger value;
    private BigInteger timestamp;
    private Integer transactionsType; // 0=普通转账, 1=合约调用
    // getter/setter 省略
}

Step-5 核心管道逻辑:EtherDataPipeline

核心关键词:区块数据、Kafka、实时同步、合约识别
public class EtherDataPipeline {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final Web3j web3j;
    private BigInteger blockNumber;
    private final Gson gson = new Gson();

    public EtherDataPipeline(KafkaTemplate<String, String> kt, Web3j w3) throws Exception {
        this.kafkaTemplate = kt;
        this.web3j = w3;
        this.blockNumber = getLastBlockNumber();
    }

    private BigInteger getLastBlockNumber() throws Exception {
        return web3j.ethBlockNumber().sendAsync().get().getBlockNumber();
    }

    public void startSyncTransactionsData() throws Exception {
        BigInteger latest = getLastBlockNumber();
        if (!blockNumber.equals(latest)) {
            blockNumber = latest;
            syncTransactionsDataToKafka();
        }
    }

    private void syncTransactionsDataToKafka() throws Exception {
        EthBlock block = web3j.ethGetBlockByNumber(
                new DefaultBlockParameterNumber(blockNumber), true).sendAsync().get();

        List<Object> txList = block.getBlock().getTransactions();
        BigInteger ts = block.getBlock().getTimestamp();

        txList.parallelStream().forEach(txObj -> {
            Transaction tx = (Transaction) txObj;

            Integer type = isContractTransaction(tx.getTo());

            EthTransactions record = new EthTransactions(
                    tx.getHash(), tx.getBlockHash(), tx.getBlockNumber(),
                    tx.getFrom(), tx.getTo(), tx.getValue(), ts, type);

            kafkaTemplate.send("ods_transactions", gson.toJson(record));
        });
    }

    private Integer isContractTransaction(String address) throws Exception {
        String code = web3j.ethGetCode(address,
                new DefaultBlockParameterNumber(blockNumber))
                .sendAsync().get().getResult();
        return "0x".equals(code) ? 0 : 1;
    }
}

代码已做异常简化,生产环境请加 Retry、Circuit Breaker 与指标埋点。


Step-6 自动启动:InitApplicationDataSyncRunner

@Slf4j
@Component
public class InitApplicationDataSyncRunner implements ApplicationRunner {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private Web3j web3j;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        EtherDataPipeline pipeline = new EtherDataPipeline(kafkaTemplate, web3j);
        while (true) {
            pipeline.startSyncTransactionsData();
            Thread.sleep(3000); // 每 3s 拉一次新区块
        }
    }
}

Step-7 验证:订阅 Kafka → 实时看到热门合约

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic ods_transactions --from-beginning | jq .

输出示例:

{
  "hash": "0x5c8a...",
  "blockNumber": 20664789,
  "to": "0xC364...",
  "transactionsType": 1
}

👉 手把手教你 10 分钟把手通过这种日志实时计算“NFT 抢购热度”并落地页 API


生产强化:断点续传 & 幂等投递

  1. 断点续传

    • 本地持久化最新已同步区块号(Redis / RocksDB);服务重启后从此高度继续。
  2. 幂等写入

    • Kafka message key 使用 hash+blockNumber,保证重试时不重复落库。
  3. 并行刷盘

    • syncTransactionsDataToKafka() 换成 批处理 并控制并发,防止 back-pressure。

场景拓展


FAQ:读者最常见疑问速解

Q1:每分钟上千条交易,仅用一条线程会不会卡?
A:当前演示是单机单线程;生产可把循环改为线程池,或使用 Web3J Reactive API 以 Replay 模式订阅 newHeads 通知,再把耗时计算放到异步任务。

Q2:如何摆脱 Alchemy 的流量高峰空间?
A:自建 Erigon / Geth 全节点兼用;然后通过 Nginx 负载均衡多 RPC;如出现高并发,可接入专用节点池(关键词:私有节点、WebSocket 长连接)。

Q3:Kafka topic 创建策略?
A:手动提前建 ods_transactions,5 分区、副本因子 2;这样即使单 Broker 故障,仍能保障连续性。

Q4:为什么我的 isContractTransaction 每次返回 0?
A:请检查 ethGetCode块高 与实际高度是否一致;初期链回追时可能出现 Reorg,导致短暂结果为空地址。

Q5:断点续传的存储方案谁最好用?
A:推荐 阿里云 TablestoreDynamoDB On-Demand,支持 Atomic update 且成本极低,一次写 < 1 ms。

Q6:如何识别“新部署合约”而不是已部署合约?
A:在块内监听 to == null 并且 input.length > 2 的交易 —— 这类属于 合约创建交易,再做二次校验。


结语

通过以上 7 步,你已经得到:

立刻启动项目,把链上数据变成实时洞察吧!