An Engineer's Guide to Building and Validating Quantitative Trading Strategies

This article presents a complete framework for developing quantitative trading strategies, covering data collection, strategy implementation, statistical validation, simulation, and a final assessment. Using a Donchian Channel breakout strategy as the running example, it walks through data cleaning and preparation, strategy implementation, parameter optimization, Monte Carlo permutation testing, and walk-forward analysis, showing how to build, test, and validate a strategy while guarding against overfitting and how to reach a final deploy-or-reject decision.

From data collection to statistical validation: a rigorous framework for developing profitable trading algorithms

Introduction

Developing a successful quantitative trading strategy takes more than a good idea. It requires a systematic, scientifically rigorous approach that separates genuine market inefficiencies from statistical noise. This guide provides a complete framework that any engineer can follow to build, test, and validate trading strategies with statistical confidence.

We will work through the entire process step by step:

  1. Data collection: gathering and cleaning market data
  2. Strategy implementation: coding a Donchian Channel breakout strategy
  3. Statistical validation: a four-step framework for avoiding overfitting
  4. Simulation: realistic backtesting with transaction costs
  5. Final assessment: making the deploy-or-reject decision

By the end, you will have a complete, reproducible strategy-development framework of the kind professional quants use in practice.

The Strategy: Donchian Channel Breakout

We will demonstrate the framework with a classic trend-following strategy, the Donchian Channel breakout. The logic is simple:

  • Long signal: enter when price breaks above the highest high of the last N periods
  • Short signal: enter when price breaks below the lowest low of the last N periods
  • Exit: reverse the position on the opposite signal

The strategy assumes that a breakout of the recent range marks the start of a new trend; a minimal sketch of the rule follows.
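As a warm-up, here is a minimal sketch of that rule in pandas (the helper name donchian_signals and the column names are illustrative; the full strategy class in Step 2 is the version the rest of the framework uses):

import pandas as pd

def donchian_signals(df: pd.DataFrame, lookback: int = 20) -> pd.Series:
    """Return +1 (long), -1 (short), or 0 (no new signal) for each bar."""
    upper = df['high'].rolling(lookback).max().shift(1)  # prior N-period high
    lower = df['low'].rolling(lookback).min().shift(1)   # prior N-period low
    signal = pd.Series(0, index=df.index)
    signal[df['close'] > upper] = 1   # breakout above the channel: go long
    signal[df['close'] < lower] = -1  # breakdown below the channel: go short
    return signal

A position is then held until the opposite breakout fires, which is exactly how the full implementation handles exits.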

Step 1: Data Collection and Preparation

High-quality data is the foundation of any reliable backtest. We will use the CCXT library to fetch cryptocurrency data from exchanges.

Installing Dependencies

pip install ccxt pandas numpy scipy matplotlib seaborn yfinance

The Complete Data Collection Script

import ccxt
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import time
import os
from typing import List, Tuple, Optional


class CryptoDataCollector:
    """
    Robust cryptocurrency data collector built on CCXT.
    Handles rate limiting, data validation, and storage.
    """

    def __init__(self, exchange_name: str = 'binance'):
        """Initialize the data collector with the specified exchange."""
        self.exchange = getattr(ccxt, exchange_name)({
            'rateLimit': 1200,  # stay conservative with rate limits
            'enableRateLimit': True,
        })

    def fetch_ohlcv_data(self,
                         symbol: str = 'BTC/USDT',
                         timeframe: str = '1h',
                         start_date: str = '2020-01-01',
                         end_date: Optional[str] = None) -> pd.DataFrame:
        """
        Fetch historical OHLCV data with proper error handling and validation.

        Args:
            symbol: Trading pair (e.g. 'BTC/USDT')
            timeframe: Candle timeframe ('1m', '5m', '1h', '1d')
            start_date: Start date in 'YYYY-MM-DD' format
            end_date: End date in 'YYYY-MM-DD' format (current date if None)

        Returns:
            pandas.DataFrame with OHLCV data
        """
        print(f"Fetching {symbol} {timeframe} data from {start_date}...")

        # Convert dates to timestamps
        since_timestamp = self.exchange.parse8601(f"{start_date}T00:00:00Z")
        end_timestamp = None
        if end_date:
            end_timestamp = self.exchange.parse8601(f"{end_date}T23:59:59Z")

        all_candles = []
        current_timestamp = since_timestamp

        while True:
            try:
                # Fetch the data in chunks
                candles = self.exchange.fetch_ohlcv(
                    symbol, timeframe, current_timestamp, limit=1000
                )

                if not candles or len(candles) == 0:
                    break

                # Filter by the end date if one was specified
                if end_timestamp:
                    candles = [c for c in candles if c[0] <= end_timestamp]
                    if not candles:
                        break

                all_candles.extend(candles)

                # Advance the timestamp for the next iteration
                last_timestamp = candles[-1][0]
                current_timestamp = last_timestamp + self._timeframe_to_ms(timeframe)

                # Check whether we have reached the end date
                if end_timestamp and current_timestamp > end_timestamp:
                    break

                print(f"Fetched {len(candles)} candles, total: {len(all_candles)}")

                # Respect rate limits
                time.sleep(self.exchange.rateLimit / 1000)

            except Exception as e:
                print(f"Error fetching data: {e}")
                time.sleep(5)  # wait before retrying
                continue

        # Convert to a DataFrame and clean
        df = self._process_raw_data(all_candles)

        # Save to a file
        filename = f"{symbol.replace('/', '')}_{timeframe}_{start_date}_{end_date or 'latest'}.parquet"
        df.to_parquet(filename)
        print(f"Data saved to {filename}")

        return df

    def _timeframe_to_ms(self, timeframe: str) -> int:
        """Convert a timeframe string to milliseconds."""
        timeframes = {
            '1m': 60 * 1000,
            '5m': 5 * 60 * 1000,
            '15m': 15 * 60 * 1000,
            '1h': 60 * 60 * 1000,
            '4h': 4 * 60 * 60 * 1000,
            '1d': 24 * 60 * 60 * 1000,
        }
        return timeframes.get(timeframe, 60 * 60 * 1000)

    def _process_raw_data(self, raw_data: List) -> pd.DataFrame:
        """Process raw OHLCV data into a clean DataFrame."""
        df = pd.DataFrame(raw_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])

        # Convert the timestamp and set it as the index
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)

        # Ensure numeric types
        numeric_columns = ['open', 'high', 'low', 'close', 'volume']
        df[numeric_columns] = df[numeric_columns].astype(float)

        # Drop duplicates and sort
        df = df[~df.index.duplicated(keep='first')]
        df.sort_index(inplace=True)

        # Basic data validation
        self._validate_data(df)

        return df

    def _validate_data(self, df: pd.DataFrame) -> None:
        """Validate the integrity of the OHLCV data."""
        # Check for missing values
        if df.isnull().any().any():
            print("Warning: Found missing values in data")

        # Check for impossible price relationships
        invalid_rows = (df['high'] < df['low']) | (df['high'] < df['open']) | \
                       (df['high'] < df['close']) | (df['low'] > df['open']) | \
                       (df['low'] > df['close'])

        if invalid_rows.any():
            print(f"Warning: Found {invalid_rows.sum()} rows with invalid OHLC relationships")

        # Check for negative prices or volumes
        if (df[['open', 'high', 'low', 'close', 'volume']] < 0).any().any():
            print("Warning: Found negative prices or volumes")

        print(f"Data validation complete. Shape: {df.shape}")
        print(f"Date range: {df.index.min()} to {df.index.max()}")


# Example usage
if __name__ == "__main__":
    collector = CryptoDataCollector('binance')

    # Fetch four years of hourly Bitcoin data
    btc_data = collector.fetch_ohlcv_data(
        symbol='BTC/USDT',
        timeframe='1h',
        start_date='2020-01-01',
        end_date='2024-01-01'
    )

    print("\nSample data:")
    print(btc_data.head())
    print(f"\nData shape: {btc_data.shape}")

Step 2: Strategy Implementation

Now let's implement the Donchian Channel breakout strategy with proper signal generation and performance tracking.

import pandas as pd
import numpy as np
from typing import Tuple, Dict, List
import matplotlib.pyplot as plt
import seaborn as sns


class DonchianStrategy:
    """
    Donchian Channel breakout strategy implementation with comprehensive analytics.
    """

    def __init__(self, lookback_period: int = 20):
        """
        Initialize the strategy with the specified lookback period.

        Args:
            lookback_period: Number of periods used to compute the highest high / lowest low
        """
        self.lookback_period = lookback_period
        self.signals = None
        self.returns = None
        self.positions = None

    def generate_signals(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Generate trading signals from Donchian Channel breakouts.

        Args:
            data: DataFrame containing OHLCV data

        Returns:
            DataFrame with signals and the related calculations
        """
        df = data.copy()

        # Compute the Donchian Channels
        df['upper_channel'] = df['high'].rolling(window=self.lookback_period, min_periods=self.lookback_period).max()
        df['lower_channel'] = df['low'].rolling(window=self.lookback_period, min_periods=self.lookback_period).min()

        # Generate signals
        df['signal'] = 0  # 0 = flat, 1 = long, -1 = short

        # Long signal: close breaks above the upper channel
        long_condition = df['close'] > df['upper_channel'].shift(1)
        df.loc[long_condition, 'signal'] = 1

        # Short signal: close breaks below the lower channel
        short_condition = df['close'] < df['lower_channel'].shift(1)
        df.loc[short_condition, 'signal'] = -1

        # Forward-fill signals (hold the position until a new signal arrives)
        df['position'] = df['signal'].replace(0, np.nan).ffill().fillna(0)

        # Compute returns
        df['price_return'] = df['close'].pct_change()
        df['strategy_return'] = df['position'].shift(1) * df['price_return']

        # Store the results
        self.signals = df
        self.returns = df['strategy_return'].dropna()
        self.positions = df['position']

        return df

    def calculate_performance_metrics(self) -> Dict[str, float]:
        """Compute a comprehensive set of performance metrics."""
        if self.returns is None:
            raise ValueError("Must generate signals first")

        returns = self.returns.dropna()

        # Basic return metrics
        total_return = (1 + returns).prod() - 1
        annualized_return = (1 + returns.mean()) ** (252 * 24) - 1  # assumes hourly data
        volatility = returns.std() * np.sqrt(252 * 24)
        sharpe_ratio = annualized_return / volatility if volatility > 0 else 0

        # Drawdown analysis
        cumulative_returns = (1 + returns).cumprod()
        rolling_max = cumulative_returns.expanding().max()
        drawdown = (cumulative_returns - rolling_max) / rolling_max
        max_drawdown = drawdown.min()

        # Win/loss analysis
        winning_returns = returns[returns > 0]
        losing_returns = returns[returns < 0]

        win_rate = len(winning_returns) / len(returns) if len(returns) > 0 else 0
        avg_win = winning_returns.mean() if len(winning_returns) > 0 else 0
        avg_loss = losing_returns.mean() if len(losing_returns) > 0 else 0

        # Profit factor
        gross_profit = winning_returns.sum()
        gross_loss = abs(losing_returns.sum())
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else np.inf

        # Other metrics
        calmar_ratio = annualized_return / abs(max_drawdown) if max_drawdown != 0 else 0

        return {
            'total_return': total_return,
            'annualized_return': annualized_return,
            'volatility': volatility,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'calmar_ratio': calmar_ratio,
            'win_rate': win_rate,
            'avg_win': avg_win,
            'avg_loss': avg_loss,
            'profit_factor': profit_factor,
            'total_trades': len(returns),
            'winning_trades': len(winning_returns),
            'losing_trades': len(losing_returns)
        }

    def plot_strategy_performance(self, data: pd.DataFrame) -> None:
        """Plot strategy performance and signals."""
        fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(15, 12))

        # Price and channels
        ax1.plot(data.index, data['close'], label='Close', alpha=0.7)
        ax1.plot(data.index, data['upper_channel'], label='Upper Channel', linestyle='--', alpha=0.7)
        ax1.plot(data.index, data['lower_channel'], label='Lower Channel', linestyle='--', alpha=0.7)

        # Mark the signals
        long_signals = data[data['signal'] == 1]
        short_signals = data[data['signal'] == -1]

        ax1.scatter(long_signals.index, long_signals['close'], color='green', marker='^', s=50, label='Long Signal')
        ax1.scatter(short_signals.index, short_signals['close'], color='red', marker='v', s=50, label='Short Signal')

        ax1.set_title(f'Donchian Channel Breakout (Lookback: {self.lookback_period})')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # Positions
        ax2.plot(data.index, data['position'], label='Position', linewidth=2)
        ax2.set_title('Position Over Time')
        ax2.set_ylabel('Position')
        ax2.legend()
        ax2.grid(True, alpha=0.3)

        # Cumulative returns
        cumulative_returns = (1 + data['strategy_return'].fillna(0)).cumprod()
        cumulative_buy_hold = (1 + data['price_return'].fillna(0)).cumprod()

        ax3.plot(data.index, cumulative_returns, label='Strategy', linewidth=2)
        ax3.plot(data.index, cumulative_buy_hold, label='Buy & Hold', alpha=0.7)
        ax3.set_title('Cumulative Return Comparison')
        ax3.set_ylabel('Cumulative Return')
        ax3.legend()
        ax3.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()


# Example usage
def test_single_strategy():
    """Test the strategy with a single parameter set."""
    # Load your data (replace with real data loading)
    # btc_data = pd.read_parquet('BTCUSDT_1h_2020-01-01_2024-01-01.parquet')

    # Create synthetic data for demonstration purposes
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2024-01-01', freq='1H')
    price = 100 * np.exp(np.cumsum(np.random.normal(0.0001, 0.02, len(dates))))

    synthetic_data = pd.DataFrame({
        'open': price * (1 + np.random.normal(0, 0.001, len(dates))),
        'high': price * (1 + np.abs(np.random.normal(0, 0.005, len(dates)))),
        'low': price * (1 - np.abs(np.random.normal(0, 0.005, len(dates)))),
        'close': price,
        'volume': np.random.uniform(1000, 10000, len(dates))
    }, index=dates)

    # Test the strategy
    strategy = DonchianStrategy(lookback_period=20)
    signals_df = strategy.generate_signals(synthetic_data)
    metrics = strategy.calculate_performance_metrics()

    print("Performance Metrics:")
    for key, value in metrics.items():
        print(f"{key}: {value:.4f}")

    strategy.plot_strategy_performance(signals_df)


if __name__ == "__main__":
    test_single_strategy()

Step 3: Parameter Optimization Framework

Next, we will build a robust optimization framework that tests multiple parameter combinations and tracks the results.

import itertools
from concurrent.futures import ProcessPoolExecutor
import warnings
warnings.filterwarnings('ignore')


class StrategyOptimizer:
    """
    Comprehensive strategy optimization with parallel processing and detailed tracking.
    """

    def __init__(self, data: pd.DataFrame):
        """
        Initialize the optimizer with market data.

        Args:
            data: DataFrame containing OHLCV data
        """
        self.data = data
        self.optimization_results = None

    def optimize_parameters(self,
                            lookback_range: range = range(5, 51, 5),
                            objective_function: str = 'profit_factor',
                            n_jobs: int = 4) -> pd.DataFrame:
        """
        Optimize strategy parameters using parallel processing.

        Args:
            lookback_range: Range of lookback periods to test
            objective_function: Metric to optimize ('profit_factor', 'sharpe_ratio', ...)
            n_jobs: Number of parallel processes

        Returns:
            DataFrame with the optimization results
        """
        print(f"Optimizing parameters over {len(lookback_range)} combinations...")

        # Prepare the parameter combinations
        param_combinations = [(lookback,) for lookback in lookback_range]

        # Run the optimization in parallel
        with ProcessPoolExecutor(max_workers=n_jobs) as executor:
            results = list(executor.map(
                self._test_parameter_combination,
                param_combinations
            ))

        # Convert to a DataFrame
        results_df = pd.DataFrame(results)
        results_df = results_df.sort_values(objective_function, ascending=False)

        self.optimization_results = results_df

        print(f"Optimization complete. Best {objective_function}: {results_df.iloc[0][objective_function]:.4f}")
        print(f"Best parameters: Lookback = {results_df.iloc[0]['lookback_period']}")

        return results_df

    def _test_parameter_combination(self, params: Tuple) -> Dict:
        """Test a single parameter combination."""
        lookback_period = params[0]

        try:
            strategy = DonchianStrategy(lookback_period=lookback_period)
            strategy.generate_signals(self.data)
            metrics = strategy.calculate_performance_metrics()

            # Attach the parameter information
            metrics['lookback_period'] = lookback_period

            return metrics

        except Exception as e:
            print(f"Error testing lookback {lookback_period}: {e}")
            return {
                'lookback_period': lookback_period,
                'profit_factor': 0,
                'sharpe_ratio': 0,
                'total_return': 0,
                'max_drawdown': -1,
                'win_rate': 0
            }

    def plot_optimization_results(self, top_n: int = 10) -> None:
        """Plot the optimization results."""
        if self.optimization_results is None:
            raise ValueError("Must run optimization first")

        results = self.optimization_results.head(top_n)

        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))

        # Profit factor
        ax1.bar(range(len(results)), results['profit_factor'])
        ax1.set_title('Profit Factor by Rank')
        ax1.set_xlabel('Rank')
        ax1.set_ylabel('Profit Factor')
        ax1.grid(True, alpha=0.3)

        # Sharpe ratio
        ax2.bar(range(len(results)), results['sharpe_ratio'])
        ax2.set_title('Sharpe Ratio by Rank')
        ax2.set_xlabel('Rank')
        ax2.set_ylabel('Sharpe Ratio')
        ax2.grid(True, alpha=0.3)

        # Total return
        ax3.bar(range(len(results)), results['total_return'] * 100)
        ax3.set_title('Total Return by Rank (%)')
        ax3.set_xlabel('Rank')
        ax3.set_ylabel('Total Return (%)')
        ax3.grid(True, alpha=0.3)

        # Max drawdown
        ax4.bar(range(len(results)), results['max_drawdown'] * 100)
        ax4.set_title('Max Drawdown by Rank (%)')
        ax4.set_xlabel('Rank')
        ax4.set_ylabel('Max Drawdown (%)')
        ax4.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()


# Example usage
def run_optimization():
    """Run a parameter optimization example."""
    # Load or create data
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2023-01-01', freq='1H')
    price = 100 * np.exp(np.cumsum(np.random.normal(0.0001, 0.02, len(dates))))

    data = pd.DataFrame({
        'open': price * (1 + np.random.normal(0, 0.001, len(dates))),
        'high': price * (1 + np.abs(np.random.normal(0, 0.005, len(dates)))),
        'low': price * (1 - np.abs(np.random.normal(0, 0.005, len(dates)))),
        'close': price,
        'volume': np.random.uniform(1000, 10000, len(dates))
    }, index=dates)

    # Run the optimization
    optimizer = StrategyOptimizer(data)
    results = optimizer.optimize_parameters(
        lookback_range=range(5, 51, 5),
        objective_function='profit_factor'
    )

    print("\nTop 5 Results:")
    print(results[['lookback_period', 'profit_factor', 'sharpe_ratio', 'total_return', 'max_drawdown']].head())

    optimizer.plot_optimization_results()


if __name__ == "__main__":
    run_optimization()

Step 4: Monte Carlo Permutation Testing

This is the critical step that separates a genuine edge from a statistical fluke. We will implement a comprehensive permutation test to validate our results.
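The core idea fits in a few lines: permute the data, record the best statistic an optimizer can squeeze out of each permuted copy, and measure how often luck alone matches the real result. A stripped-down sketch of the p-value computation (the helper name is illustrative):

import numpy as np

def permutation_p_value(actual_stat, permuted_stats):
    """Share of permuted runs that match or beat the real strategy's statistic."""
    permuted_stats = np.asarray(permuted_stats)
    return float(np.mean(permuted_stats >= actual_stat))

# Conceptual loop (the validator class below does this with a block bootstrap
# and a full re-optimization on every permuted series):
#   for each simulation:
#       permuted = permute(data)                  # destroy real patterns, keep statistics
#       best_stats.append(best_metric(permuted))  # best result luck alone can achieve
#   p_value = permutation_p_value(actual_metric, best_stats)

The full validator below implements exactly this loop, using a block bootstrap so that short-term autocorrelation survives in the permuted series.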

from scipy import stats
import random
from typing import Callable


class MonteCarloValidator:
    """
    Monte Carlo permutation testing for strategy validation.
    Determines whether strategy performance is statistically significant.
    """

    def __init__(self, data: pd.DataFrame, n_simulations: int = 1000):
        """
        Initialize the validator with market data.

        Args:
            data: DataFrame containing OHLCV data
            n_simulations: Number of Monte Carlo simulations
        """
        self.data = data
        self.n_simulations = n_simulations
        self.permutation_results = None

    def run_permutation_test(self,
                             optimal_lookback: int,
                             test_statistic: str = 'profit_factor') -> Dict[str, float]:
        """
        Run the Monte Carlo permutation test on the optimized strategy.

        Args:
            optimal_lookback: Best lookback period from the optimization
            test_statistic: Metric whose significance is being tested

        Returns:
            Dictionary with the test results, including the p-value
        """
        print(f"Running {self.n_simulations} Monte Carlo simulations...")

        # Compute the actual strategy performance
        actual_strategy = DonchianStrategy(lookback_period=optimal_lookback)
        actual_strategy.generate_signals(self.data)
        actual_metrics = actual_strategy.calculate_performance_metrics()
        actual_test_stat = actual_metrics[test_statistic]

        print(f"Actual {test_statistic}: {actual_test_stat:.4f}")

        # Run the permutation test
        permutation_stats = []

        for i in range(self.n_simulations):
            if i % 100 == 0:
                print(f"Simulation {i}/{self.n_simulations}")

            # Create a permuted copy of the data
            permuted_data = self._permute_data()

            # Run the full optimization on the permuted data
            permuted_optimizer = StrategyOptimizer(permuted_data)
            permuted_results = permuted_optimizer.optimize_parameters(
                lookback_range=range(5, 51, 5),
                objective_function=test_statistic,
                n_jobs=1  # use a single process for the permutation test
            )

            # Take the best result from the permuted optimization
            best_permuted_stat = permuted_results.iloc[0][test_statistic]
            permutation_stats.append(best_permuted_stat)

        # Compute the p-value
        permutation_stats = np.array(permutation_stats)
        p_value = np.mean(permutation_stats >= actual_test_stat)

        # Store the results
        self.permutation_results = {
            'actual_statistic': actual_test_stat,
            'permutation_stats': permutation_stats,
            'p_value': p_value,
            'test_statistic': test_statistic,
            'mean_permuted': np.mean(permutation_stats),
            'std_permuted': np.std(permutation_stats),
            'percentile_95': np.percentile(permutation_stats, 95),
            'percentile_99': np.percentile(permutation_stats, 99)
        }

        print(f"\nMonte Carlo Permutation Test Results:")
        print(f"Actual {test_statistic}: {actual_test_stat:.4f}")
        print(f"Mean of permuted results: {np.mean(permutation_stats):.4f}")
        print(f"95th percentile of permuted results: {np.percentile(permutation_stats, 95):.4f}")
        print(f"P-value: {p_value:.4f}")

        return self.permutation_results

    def _permute_data(self) -> pd.DataFrame:
        """
        Create a permuted version of the data that preserves its statistical
        properties but destroys any genuine patterns.
        """
        permuted_data = self.data.copy()

        # Method 1: block bootstrap (preserves short-term correlation)
        block_size = 24  # 24 hours for hourly data
        n_blocks = len(permuted_data) // block_size

        # Create random block indices
        block_indices = []
        for _ in range(n_blocks + 1):  # +1 makes sure we cover all of the data
            start_idx = random.randint(0, len(permuted_data) - block_size - 1)
            block_indices.extend(range(start_idx, start_idx + block_size))

        # Trim to the original length
        block_indices = block_indices[:len(permuted_data)]

        # Apply the permutation to returns while preserving the price structure
        returns = permuted_data['close'].pct_change().dropna()
        permuted_returns = returns.iloc[block_indices[1:]].values  # skip the first NaN

        # Rebuild the price series from the permuted returns
        permuted_prices = [permuted_data['close'].iloc[0]]
        for ret in permuted_returns:
            permuted_prices.append(permuted_prices[-1] * (1 + ret))

        # Scale every price column proportionally
        price_ratio = np.array(permuted_prices) / permuted_data['close'].iloc[:len(permuted_prices)].values

        for col in ['open', 'high', 'low', 'close']:
            permuted_data[col].iloc[:len(permuted_prices)] *= price_ratio

        return permuted_data

    def plot_permutation_results(self) -> None:
        """Plot a histogram of the permutation test results."""
        if self.permutation_results is None:
            raise ValueError("Must run permutation test first")

        results = self.permutation_results

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

        # Histogram of the permuted results
        ax1.hist(results['permutation_stats'], bins=50, alpha=0.7, color='skyblue',
                 density=True, label='Permuted Results')
        ax1.axvline(results['actual_statistic'], color='red', linestyle='--', linewidth=2,
                    label=f'Actual Strategy ({results["actual_statistic"]:.4f})')
        ax1.axvline(results['percentile_95'], color='orange', linestyle='--',
                    label=f'95th Percentile ({results["percentile_95"]:.4f})')
        ax1.set_xlabel(results['test_statistic'].replace('_', ' ').title())
        ax1.set_ylabel('Density')
        ax1.set_title(f'Monte Carlo Permutation Test\nP-value: {results["p_value"]:.4f}')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # Q-Q plot to check the normality of the permuted results
        stats.probplot(results['permutation_stats'], dist="norm", plot=ax2)
        ax2.set_title('Q-Q Plot of Permuted Results')
        ax2.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()


# Example usage
def run_monte_carlo_test():
    """Run a complete Monte Carlo validation example."""
    # Create synthetic data with some trend
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2023-01-01', freq='1H')

    # Add a slight trend to make it more realistic
    trend = np.linspace(0, 0.5, len(dates))
    noise = np.random.normal(0, 0.02, len(dates))
    returns = trend/len(dates) + noise

    price = 100 * np.exp(np.cumsum(returns))

    data = pd.DataFrame({
        'open': price * (1 + np.random.normal(0, 0.001, len(dates))),
        'high': price * (1 + np.abs(np.random.normal(0, 0.005, len(dates)))),
        'low': price * (1 - np.abs(np.random.normal(0, 0.005, len(dates)))),
        'close': price,
        'volume': np.random.uniform(1000, 10000, len(dates))
    }, index=dates)

    # Optimize first
    print("Step 1: Parameter Optimization")
    optimizer = StrategyOptimizer(data)
    optimization_results = optimizer.optimize_parameters(
        lookback_range=range(10, 31, 5),
        objective_function='profit_factor'
    )

    best_lookback = optimization_results.iloc[0]['lookback_period']
    best_profit_factor = optimization_results.iloc[0]['profit_factor']

    print(f"\nBest parameters: Lookback = {best_lookback}, Profit Factor = {best_profit_factor:.4f}")

    # Then run the Monte Carlo test
    print("\nStep 2: Monte Carlo Validation")
    validator = MonteCarloValidator(data, n_simulations=100)  # reduced for the demo
    mc_results = validator.run_permutation_test(
        optimal_lookback=int(best_lookback),
        test_statistic='profit_factor'
    )

    validator.plot_permutation_results()

    # Interpret the results
    print(f"\nInterpretation:")
    if mc_results['p_value'] < 0.01:
        print("✅ Strategy is statistically significant (p < 0.01)")
    elif mc_results['p_value'] < 0.05:
        print("⚠️  Strategy is marginally significant (p < 0.05)")
    else:
        print("❌ Strategy is NOT statistically significant")
        print("   This suggests the performance may be due to overfitting")


if __name__ == "__main__":
    run_monte_carlo_test()

Step 5: Walk-Forward Analysis

Walk-forward analysis simulates real-world trading by continuously re-optimizing parameters over rolling windows.

class WalkForwardAnalyzer:
    """
    Walk-forward analysis with rolling optimization windows.
    """

    def __init__(self, data: pd.DataFrame):
        self.data = data
        self.results = []

    def run_walk_forward(self,
                         train_months: int = 12,
                         test_months: int = 1,
                         step_months: int = 1) -> pd.DataFrame:
        """
        Run walk-forward analysis with rolling optimization windows.

        Args:
            train_months: Training window size in months
            test_months: Testing window size in months
            step_months: Step size between windows in months
        """
        print("Running Walk-Forward Analysis...")
        start_date = self.data.index[0]
        end_date = self.data.index[-1]

        current_date = start_date + pd.DateOffset(months=train_months)
        results = []

        while current_date + pd.DateOffset(months=test_months) <= end_date:
            # Define the train and test periods
            train_start = current_date - pd.DateOffset(months=train_months)
            train_end = current_date
            test_start = current_date
            test_end = current_date + pd.DateOffset(months=test_months)

            # Get the train and test data
            train_data = self.data[train_start:train_end]
            test_data = self.data[test_start:test_end]

            if len(train_data) < 100 or len(test_data) < 10:
                current_date += pd.DateOffset(months=step_months)
                continue

            # Optimize on the training data
            optimizer = StrategyOptimizer(train_data)
            opt_results = optimizer.optimize_parameters(
                lookback_range=range(5, 51, 5),
                objective_function='profit_factor',
                n_jobs=1
            )

            if len(opt_results) == 0:
                current_date += pd.DateOffset(months=step_months)
                continue

            best_lookback = opt_results.iloc[0]['lookback_period']

            # Test on the out-of-sample data
            strategy = DonchianStrategy(lookback_period=int(best_lookback))
            test_signals = strategy.generate_signals(test_data)
            test_metrics = strategy.calculate_performance_metrics()

            # Store the results
            result = {
                'train_start': train_start,
                'train_end': train_end,
                'test_start': test_start,
                'test_end': test_end,
                'optimal_lookback': best_lookback,
                'oos_profit_factor': test_metrics['profit_factor'],
                'oos_sharpe_ratio': test_metrics['sharpe_ratio'],
                'oos_total_return': test_metrics['total_return'],
                'oos_max_drawdown': test_metrics['max_drawdown'],
                'oos_win_rate': test_metrics['win_rate']
            }
            results.append(result)

            print(f"Period {test_start.date()} to {test_end.date()}: "
                  f"PF={test_metrics['profit_factor']:.3f}, "
                  f"SR={test_metrics['sharpe_ratio']:.3f}")

            current_date += pd.DateOffset(months=step_months)

        self.results = pd.DataFrame(results)
        return self.results

    def plot_walk_forward_results(self):
        """Plot walk-forward analysis results."""
        if len(self.results) == 0:
            raise ValueError("Must run walk-forward analysis first")

        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # Profit factor over time
        axes[0, 0].plot(self.results['test_start'], self.results['oos_profit_factor'], 'o-')
        axes[0, 0].axhline(y=1.0, color='red', linestyle='--', alpha=0.7)
        axes[0, 0].set_title('Out-of-Sample Profit Factor')
        axes[0, 0].set_ylabel('Profit Factor')
        axes[0, 0].grid(True, alpha=0.3)

        # Sharpe ratio
        axes[0, 1].plot(self.results['test_start'], self.results['oos_sharpe_ratio'], 'o-', color='green')
        axes[0, 1].axhline(y=0, color='red', linestyle='--', alpha=0.7)
        axes[0, 1].set_title('Out-of-Sample Sharpe Ratio')
        axes[0, 1].set_ylabel('Sharpe Ratio')
        axes[0, 1].grid(True, alpha=0.3)

        # Total return
        axes[1, 0].plot(self.results['test_start'], self.results['oos_total_return']*100, 'o-', color='purple')
        axes[1, 0].axhline(y=0, color='red', linestyle='--', alpha=0.7)
        axes[1, 0].set_title('Out-of-Sample Total Return (%)')
        axes[1, 0].set_ylabel('Return (%)')
        axes[1, 0].grid(True, alpha=0.3)

        # Max drawdown
        axes[1, 1].plot(self.results['test_start'], self.results['oos_max_drawdown']*100, 'o-', color='orange')
        axes[1, 1].set_title('Out-of-Sample Max Drawdown (%)')
        axes[1, 1].set_ylabel('Drawdown (%)')
        axes[1, 1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

        # Summary statistics
        print("\nWalk-Forward Analysis Summary:")
        print(f"Total periods: {len(self.results)}")
        print(f"Average OOS Profit Factor: {self.results['oos_profit_factor'].mean():.3f}")
        print(f"Average OOS Sharpe Ratio: {self.results['oos_sharpe_ratio'].mean():.3f}")
        print(f"Profitable periods: {(self.results['oos_total_return'] > 0).sum()}/{len(self.results)}")
        print(f"Win rate: {(self.results['oos_total_return'] > 0).mean()*100:.1f}%")

Step 6: Complete Strategy Simulation

Finally, let's implement a realistic simulation that includes transaction costs, slippage, and position sizing.

class StrategySimulator:
    """
    Realistic strategy simulation with transaction costs and risk management.
    """

    def __init__(self, initial_capital: float = 100000):
        self.initial_capital = initial_capital
        self.current_capital = initial_capital
        self.positions = []
        self.trades = []

    def simulate_strategy(self,
                          data: pd.DataFrame,
                          strategy: DonchianStrategy,
                          transaction_cost: float = 0.001,  # 0.1% per trade
                          slippage: float = 0.0005,  # 0.05% slippage
                          max_position_size: float = 0.95) -> Dict:
        """
        Run a complete strategy simulation with realistic trading costs.

        Args:
            data: Market data
            strategy: Configured strategy instance
            transaction_cost: Transaction cost as a fraction
            slippage: Slippage as a fraction
            max_position_size: Maximum position size as a fraction of capital
        """
        signals_df = strategy.generate_signals(data)

        portfolio_value = [self.initial_capital]
        positions = []
        current_position = 0
        entry_price = 0

        for i, (timestamp, row) in enumerate(signals_df.iterrows()):
            if i == 0:
                continue

            prev_signal = signals_df.iloc[i-1]['position']
            curr_signal = row['position']
            price = row['close']

            # Check for a signal change
            if curr_signal != prev_signal:
                # Close the existing position
                if current_position != 0:
                    trade_return = self._execute_trade(
                        current_position, entry_price, price,
                        transaction_cost, slippage, 'close'
                    )
                    self.current_capital *= (1 + trade_return)

                # Open a new position
                if curr_signal != 0:
                    position_size = curr_signal * max_position_size
                    entry_price = price * (1 + curr_signal * slippage)  # adjust for slippage
                    current_position = position_size
                else:
                    current_position = 0
                    entry_price = 0

            # Calculate the portfolio value
            if current_position != 0:
                unrealized_pnl = current_position * (price - entry_price) / entry_price
                portfolio_value.append(self.current_capital * (1 + unrealized_pnl))
            else:
                portfolio_value.append(self.current_capital)

            positions.append(current_position)

        # Calculate the final metrics
        returns = pd.Series(portfolio_value).pct_change().dropna()

        metrics = {
            'total_return': (portfolio_value[-1] / self.initial_capital) - 1,
            'annualized_return': (portfolio_value[-1] / self.initial_capital) ** (252*24/len(data)) - 1,
            'volatility': returns.std() * np.sqrt(252*24),
            'sharpe_ratio': (returns.mean() * 252*24) / (returns.std() * np.sqrt(252*24)),
            'max_drawdown': self._calculate_max_drawdown(portfolio_value),
            'final_capital': portfolio_value[-1],
            'total_trades': len([p for p in positions if p != 0])
        }

        return {
            'metrics': metrics,
            'portfolio_values': portfolio_value,
            'positions': positions
        }

    def _execute_trade(self, position_size, entry_price, exit_price,
                       transaction_cost, slippage, trade_type):
        """Execute a trade with costs."""
        if trade_type == 'close':
            gross_return = position_size * (exit_price - entry_price) / entry_price
            total_cost = transaction_cost + slippage
            net_return = gross_return - total_cost
            return net_return
        return 0

    def _calculate_max_drawdown(self, portfolio_values):
        """Calculate the maximum drawdown."""
        peak = portfolio_values[0]
        max_dd = 0

        for value in portfolio_values:
            if value > peak:
                peak = value
            dd = (peak - value) / peak
            if dd > max_dd:
                max_dd = dd

        return max_dd
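Before wiring everything into the final pipeline, here is a standalone usage sketch of the simulator (a minimal example under the same assumptions as the earlier demos: synthetic hourly data and the classes defined above; the cost figures are the illustrative defaults, not calibrated exchange fees):

# Example usage with a small synthetic dataset
np.random.seed(42)
dates = pd.date_range('2023-01-01', '2024-01-01', freq='1H')
price = 100 * np.exp(np.cumsum(np.random.normal(0.0001, 0.02, len(dates))))
data = pd.DataFrame({'open': price, 'high': price * 1.002,
                     'low': price * 0.998, 'close': price,
                     'volume': np.random.uniform(1000, 10000, len(dates))}, index=dates)

strategy = DonchianStrategy(lookback_period=20)
simulator = StrategySimulator(initial_capital=100000)
results = simulator.simulate_strategy(data, strategy,
                                      transaction_cost=0.001,  # 0.1% per trade
                                      slippage=0.0005)         # 0.05% slippage
print(results['metrics'])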
    """
    Complete end-to-end strategy validation pipeline.
    **完整的端到端策略验证流程。**
    """
    print("=== QUANTITATIVE STRATEGY VALIDATION PIPELINE ===\\\\n")    # Step 1: Load Data
    # **步骤1:加载数据**
    print("1. Loading market data...")
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2024-01-01', freq='1H')
    trend = np.linspace(0, 0.3, len(dates))
    noise = np.random.normal(0, 0.02, len(dates))
    returns = trend/len(dates) + noise
    price = 100 * np.exp(np.cumsum(returns))    data = pd.DataFrame({
        'open': price * (1 + np.random.normal(0, 0.001, len(dates))),
        'high': price * (1 + np.abs(np.random.normal(0, 0.005, len(dates)))),
        'low': price * (1 - np.abs(np.random.normal(0, 0.005, len(dates)))),
        'close': price,
        'volume': np.random.uniform(1000, 10000, len(dates))
    }, index=dates)    print(f"✓ Data loaded: {len(data)} candles from {data.index[0]} to {data.index[-1]}")    # Step 2: In-Sample Optimization
    # **步骤2:样本内优化**
    print("\\\\n2. In-sample parameter optimization...")
    train_data = data['2020-01-01':'2023-01-01']
    test_data = data['2023-01-01':'2024-01-01']    optimizer = StrategyOptimizer(train_data)
    opt_results = optimizer.optimize_parameters(
        lookback_range=range(10, 31, 5),
        objective_function='profit_factor'
    )    best_lookback = int(opt_results.iloc[0]['lookback_period'])
    best_pf = opt_results.iloc[0]['profit_factor']    print(f"✓ Best parameters: Lookback={best_lookback}, Profit Factor={best_pf:.3f}")    # Step 3: Monte Carlo Validation
    # **步骤 3:蒙特卡洛验证**
    print("\\\\n3. Monte Carlo permutation testing...")
    validator = MonteCarloValidator(train_data, n_simulations=50)  # Reduced for demo
    mc_results = validator.run_permutation_test(
        optimal_lookback=best_lookback,
        test_statistic='profit_factor'
    )    is_significant = mc_results['p_value'] < 0.05
    print(f"✓ Statistical significance: {'PASS' if is_significant else 'FAIL'} (p={mc_results['p_value']:.3f})")    # Step 4: Walk-Forward Analysis
    # **步骤4:步进式分析**
    print("\\\\n4. Walk-forward analysis...")
    wf_analyzer = WalkForwardAnalyzer(data)
    wf_results = wf_analyzer.run_walk_forward(
        train_months=12,
        test_months=1,
        step_months=1
    )    avg_oos_pf = wf_results['oos_profit_factor'].mean()
    print(f"✓ Average out-of-sample profit factor: {avg_oos_pf:.3f}")    # Step 5: Final Simulation
    # **步骤5:最终模拟**
    print("\\\\n5. Realistic trading simulation...")
    strategy = DonchianStrategy(lookback_period=best_lookback)
    simulator = StrategySimulator(initial_capital=100000)    sim_results = simulator.simulate_strategy(
        test_data,
        strategy,
        transaction_cost=0.001,
        slippage=0.0005
    )    final_return = sim_results['metrics']['total_return']
    sharpe = sim_results['metrics']['sharpe_ratio']
    max_dd = sim_results['metrics']['max_drawdown']    print(f"✓ Out-of-sample simulation complete:")
    print(f"  Total Return: {final_return*100:.2f}%")
    print(f"  Sharpe Ratio: {sharpe:.3f}")
    print(f"  Max Drawdown: {max_dd*100:.2f}%")    # Final Decision
    # **最终决定**
    print("\\\\n=== FINAL ASSESSMENT ===")    criteria = {
        'Statistical Significance': is_significant,
        'Positive Walk-Forward': avg_oos_pf > 1.0,
        'Positive Out-of-Sample': final_return > 0,
        'Acceptable Sharpe': sharpe > 0.5,
        'Manageable Drawdown': max_dd < 0.20
    }    passed_criteria = sum(criteria.values())
    total_criteria = len(criteria)    print(f"\\\\nCriteria passed: {passed_criteria}/{total_criteria}")
    for criterion, passed in criteria.items():
        status = "✓ PASS" if passed else "✗ FAIL"
        print(f"  {criterion}: {status}")    recommendation = "DEPLOY" if passed_criteria >= 4 else "REJECT"
    print(f"\\\\nRECOMMENDATION: {recommendation} this strategy")    if recommendation == "REJECT":
        print("\\\\nReasons for rejection:")
        for criterion, passed in criteria.items():
            if not passed:
                print(f"  - {criterion}")    return {
        'recommendation': recommendation,
        'criteria_passed': passed_criteria,
        'total_criteria': total_criteria,
        'final_metrics': sim_results['metrics']
    }# Run the complete pipeline
# **运行完整流程**
if __name__ == "__main__":
    results = complete_strategy_validation_pipeline()

Conclusion

This comprehensive framework provides a scientifically rigorous approach to quantitative strategy development. The key principles are:

1. Statistical Rigor

  • Always test for statistical significance using permutation testing
  • Use multiple validation methods (in-sample, out-of-sample, walk-forward)
  • Require strategies to pass multiple criteria before deployment

2. Realistic Simulation

  • Include transaction costs and slippage in all testing
  • Use realistic position sizing and risk management
  • Test on completely unseen data for final validation

3. Systematic Process

  • Follow the same validation framework for every strategy
  • Document all assumptions and parameters
  • Maintain detailed records of all tests and results

4. Overfitting Prevention

  • Use Monte Carlo permutation testing to validate optimization results
  • Employ walk-forward analysis to test adaptability
  • Require significance at multiple levels (p < 0.05 at a minimum)

Key Takeaways for Engineers:

  1. Data Quality Matters: invest time in proper data collection and validation
  2. Beware of Overfitting: a strategy that performs well in-sample may fail out-of-sample
  3. Statistical Significance: use permutation testing to distinguish real edges from random noise
  4. Real-World Costs: always include transaction costs, slippage, and market impact
  5. Multiple Validation: no single test is sufficient; use multiple validation methods

This framework has successfully identified our Donchian strategy as statistically insignificant, saving us from deploying a strategy that would likely lose money in live trading. The process is designed to be conservative: it is better to reject a potentially profitable strategy than to deploy one that will lose money.

By following this systematic approach, you can build confidence in your quantitative strategies and avoid the common pitfalls that cause most algorithmic trading ventures to fail. Remember: in quantitative trading, the absence of evidence is not evidence of absence, but the presence of statistical significance is strong evidence of a real edge.

  • Original article: extremelysunnyyk.medium....