3. 金融数据流水线构建

3.金融数据流水线构建

金融数据是量化交易的基石,也是强化学习智能体理解市场的唯一窗口。在实际项目中,花费在数据获取、清洗和特征工程上的时间,往往占到整个开发周期的 60% 以上。FinRL 将这套繁琐的流程封装成标准化的流水线,让我们能把精力集中在策略设计上。这一章,我们深入数据层的实现细节,看看如何构建一个健壮、高效的金融数据流水线。

多源金融数据接入

FinRL 的数据层设计遵循统一接口原则。无论是美股、A 股还是加密货币,无论是日线数据还是分钟线数据,都通过标准化的 DataProcessor 接入。这种设计让我们切换数据源时,几乎不需要修改上层代码。

支持的数据源全景

FinRL 目前集成了 14 个主流数据平台,覆盖全球主要市场。每个数据源都有自己的特点和限制,选择时需要权衡数据质量、更新频率和调用成本。

数据源 市场范围 时间跨度 最高频率 调用限制
YahooFinance 美股 频率特定 1 分钟 每小时 2000 次
Alpaca 美股、ETF 2015 年至今 1 分钟 账户特定
Akshare A 股 2015 年至今 日线 账户特定
Baostock A 股 1990 年至今 5 分钟 账户特定
Tushare A 股 至今 1 分钟 账户特定
Binance 加密货币 API 特定 1 秒 API 特定
CCXT 加密货币 API 特定 1 分钟 API 特定
IEXCloud 美股 1970 年至今 日线 每秒 100 次
JoinQuant A 股 2005 年至今 1 分钟 每次 3 请求
RiceQuant A 股 2005 年至今 1 毫秒 账户特定
WRDS 美股 2003 年至今 1 毫秒 每次 5 请求
EODhistoricaldata 美股 频率特定 1 分钟 API 特定
QuantConnect 美股 1998 年至今 1 秒 无明确限制
Sinopac 台股 2023 年至今 1 分钟 账户特定

对于初学者和研究用途,YahooFinance 是最友好的选择。数据免费、无需注册、覆盖范围广,虽然频率和精度不如商业数据,但足以验证策略逻辑。对于 A 股研究,Akshare 和 Baostock 提供了完整的本土数据支持。加密货币交易则可以考虑 Binance 或 CCXT,它们能直接对接交易所实时数据。

YahooFinance 接入实战

FinRL 通过 YahooDownloader 类封装了 yfinance 库,将原始数据转换成统一的 DataFrame 格式。这个类隐藏了日期处理、股票代码验证、异常重试等细节。

from finrl.meta.preprocessor.yahoodownloader import YahooDownloader

# 定义时间范围和股票列表
TRAIN_START_DATE = '2009-01-01'
TRAIN_END_DATE = '2020-07-01'
TICKER_LIST = ['AAPL', 'MSFT', 'GOOGL']

# 创建下载器实例并获取数据
df = YahooDownloader(
    start_date=TRAIN_START_DATE,
    end_date=TRAIN_END_DATE,
    ticker_list=TICKER_LIST
).fetch_data()

print(df.head())

这段代码首先导入 YahooDownloader 类,然后指定训练数据的起止日期和股票代码列表。fetch_data 方法会并行下载所有股票的数据,返回一个标准的 DataFrame。输出结果包含以下列:

  • date:交易日期
  • tic:股票代码
  • open:开盘价
  • high:最高价
  • low:最低价
  • close:收盘价
  • volume:成交量
  • day:星期几(0 表示周一,4 表示周五)

注意这里使用的是调整后收盘价(adjusted close),已经考虑了分红、拆股等公司行为的影响。这是回测中必须使用的数据,否则会在除权除息日看到虚假的价格跳空。

下载过程会自动处理一些常见问题。比如某只股票在某个时间段内停牌,或者新上市股票数据不足,YahooDownloader 会填充 NaN 值并记录警告信息。对于大规模数据获取,建议分批次下载,避免触发每小时 2000 次的 IP 限制。

其他数据源接入模式

其他数据源的接入方式遵循相似的模式,只是参数略有不同。以 Alpaca 为例,它需要 API 密钥认证:

from finrl.meta.preprocessor.alpaca import AlpacaProcessor

# 配置 API 密钥
API_KEY = 'your_api_key'
API_SECRET = 'your_api_secret'
APCA_API_BASE_URL = 'https://paper-api.alpaca.markets'

# 创建处理器
processor = AlpacaProcessor(
    api_key=API_KEY,
    api_secret=API_SECRET,
    api_base_url=APCA_API_BASE_URL
)

# 获取数据
df = processor.download_data(
    ticker_list=TICKER_LIST,
    start_date=TRAIN_START_DATE,
    end_date=TRAIN_END_DATE,
    time_interval='1Min'
)

对于 A 股数据,Akshare 的接入方式类似,但需要注意交易日历的差异:

from finrl.meta.preprocessor.akshare import AkshareProcessor

processor = AkshareProcessor()
df = processor.download_data(
    ticker_list=['600519', '000858'],  # 贵州茅台和五粮液
    start_date='2015-01-01',
    end_date='2023-12-31',
    time_interval='1d'
)

每个处理器都实现了相同的接口规范,返回统一格式的 DataFrame。这种设计让我们可以在不同市场间无缝切换策略,只需要更换数据处理器即可。

数据清洗与质量检查

原始金融数据往往充满噪声和缺失值。不同数据源的格式差异、交易所的临时停牌、极端行情下的数据延迟,都会导致数据质量问题。FinRL 的清洗流程针对这些问题提供了自动化解决方案。

缺失数据的成因与影响

在低频数据(日线)中,缺失值通常意味着股票停牌。这种情况下直接删除缺失行是合理的,因为停牌期间确实无法交易。但在高频数据(分钟线)中,缺失值往往只是表示该分钟内没有成交,而非停牌。如果直接删除,会损失大量有效信息。

FinRL 团队做过一个测试:下载 2021 年上半年道琼斯 30 成分股的 1 分钟数据,原始数据有 39736 行。直接删除含 NaN 的行后,只剩下 3361 行,数据利用率不足 10%。这对于训练强化学习智能体来说是灾难性的,因为智能体需要足够的市场微观结构数据来学习。

低频数据清洗策略

对于日线数据,FinRL 采用直接删除法。数据处理器会检查每行数据是否完整,只要 open、high、low、close、volume 中有任意一个为 NaN,就整行删除。这种方法简单有效,因为停牌信息本身就可以作为一种市场状态,通过日期索引的间断来体现。

# 检查缺失数据
has_missing = df.isnull().values.any()
print(f"数据是否包含缺失值: {has_missing}")

# 查看每列的缺失情况
missing_stats = df.isnull().sum()
print(missing_stats)

# 删除含缺失值的行
df_cleaned = df.dropna()
print(f"清洗前: {len(df)} 行,清洗后: {len(df_cleaned)} 行")

这段代码首先检查整个 DataFrame 是否存在缺失值,然后统计每列的缺失数量。dropna 方法会删除所有包含 NaN 的行。对于多股票数据,建议按股票代码分组处理,避免某只股票的停牌影响其他股票的数据完整性。

高频数据清洗策略

对于分钟线或更高频率的数据,FinRL 采用前向填充(forward fill)策略。具体规则是:用上一个有效收盘价填充 open、high、low、close 列,将 volume 填充为 0。这样既保留了时间序列的连续性,又合理反映了无成交时段的市场状态。

def clean_high_frequency_data(df):
    """
    清洗高频数据,处理无成交时段的缺失值
    
    参数:
        df: 原始高频数据 DataFrame
        
    返回:
        清洗后的 DataFrame
    """
    # 按股票代码分组处理
    cleaned_stocks = []
    
    for tic in df['tic'].unique():
        tic_data = df[df['tic'] == tic].copy()
        tic_data = tic_data.sort_values('date')
        
        # 前向填充价格列
        price_columns = ['open', 'high', 'low', 'close']
        tic_data[price_columns] = tic_data[price_columns].fillna(method='ffill')
        
        # 成交量填充为 0
        tic_data['volume'] = tic_data['volume'].fillna(0)
        
        # 如果开盘前就有缺失,用第一个有效值向后填充
        tic_data[price_columns] = tic_data[price_columns].fillna(method='bfill')
        
        cleaned_stocks.append(tic_data)
    
    return pd.concat(cleaned_stocks, ignore_index=True)

这个函数按股票代码分组,确保填充不会跨股票污染数据。对于价格列,先用前向填充,再用后向填充处理开盘前的缺失值。成交量在无成交时段设为 0,符合实际情况。

异常值检测与处理

除了缺失值,金融数据还可能包含异常值。比如错误的最高价(比高低 10 倍以上)、负成交量等。FinRL 提供了基本的异常值检测:

def detect_anomalies(df):
    """
    检测数据中的异常值
    
    返回:
        异常值统计信息
    """
    anomalies = {}
    
    # 检查高价是否低于低价
    price_anomaly = df[df['high'] < df['low']]
    anomalies['high_low_swap'] = len(price_anomaly)
    
    # 检查价格是否非正
    for col in ['open', 'high', 'low', 'close']:
        non_positive = df[df[col] <= 0]
        anomalies[f'{col}_non_positive'] = len(non_positive)
    
    # 检查成交量是否为负
    negative_volume = df[df['volume'] < 0]
    anomalies['negative_volume'] = len(negative_volume)
    
    # 检查价格跳空是否超过 20%
    df_sorted = df.sort_values(['tic', 'date'])
    df_sorted['prev_close'] = df_sorted.groupby('tic')['close'].shift(1)
    df_sorted['price_gap'] = abs(df_sorted['open'] - df_sorted['prev_close']) / df_sorted['prev_close']
    large_gaps = df_sorted[df_sorted['price_gap'] > 0.2]
    anomalies['large_price_gaps'] = len(large_gaps)
    
    return anomalies

# 使用示例
anomaly_report = detect_anomalies(df_cleaned)
print("异常值检测报告:", anomaly_report)

这个检测函数会返回各类异常的数量。对于检测到的异常,通常的处理方式是删除对应行,或者将异常值设为 NaN 后按缺失值处理。在回测中,保守的做法是删除异常数据,避免模型学到错误的市场规律。

技术指标特征工程实践

原始的价格和成交量数据对智能体来说信息密度太低。技术指标通过数学变换,将价格序列中的趋势、波动、动量等抽象特征量化,帮助智能体更好地理解市场状态。FinRL 通过集成 stockstats 库,实现了 50 多种常用技术指标的自动计算。

核心技术指标解析

FinRL 默认配置包含 7 个基础指标,这些指标覆盖了趋势跟踪、震荡判断和波动率测量三个维度:

  • macd:移动平均收敛散度,反映价格趋势的强度和方向
  • boll_ubboll_lb:布林带上下轨,衡量价格波动范围
  • rsi_30:30 日相对强弱指标,判断超买超卖状态
  • dx_30:30 日动向指标,衡量趋势强度
  • close_30_smaclose_60_sma:30 日和 60 日简单移动平均,识别中期趋势

这些指标的选择经过大量实验验证,能在信息量和计算成本间取得平衡。当然,用户可以根据策略需求添加更多指标。

使用 FeatureEngineer 添加指标

FinRL 的 FeatureEngineer 类封装了整个特征工程流程。只需要几行代码,就能完成技术指标计算、数据标准化和特征选择。

from finrl.meta.preprocessor.preprocessors import FeatureEngineer
from finrl.config import INDICATORS

# 创建特征工程器
fe = FeatureEngineer(
    df.copy(),
    use_technical_indicator=True,
    tech_indicator_list=INDICATORS,
    use_turbulence=True,
    user_defined_feature=False
)

# 执行预处理
df_processed = fe.preprocess_data()

print(f"原始特征数: {len(df.columns)}")
print(f"处理后特征数: {len(df_processed.columns)}")
print("新增特征:", [col for col in df_processed.columns if col not in df.columns])

FeatureEngineer 的构造函数接受多个参数:

  • use_technical_indicator:是否计算技术指标
  • tech_indicator_list:需要计算的指标列表
  • use_turbulence:是否添加市场波动率指数
  • user_defined_feature:是否使用用户自定义特征

preprocess_data 方法内部会完成所有计算,并确保特征值在合理范围内。对于存在无限大或 NaN 值的特征,会自动用中位数填充。

手动计算技术指标

有时我们需要自定义指标计算逻辑,或者调试某个指标的具体行为。这时可以直接使用 stockstats 库:

from stockstats import StockDataFrame as Sdf

def calculate_custom_indicators(df):
    """
    手动计算技术指标,展示底层实现
    
    参数:
        df: 原始数据 DataFrame
        
    返回:
        包含新增指标的 DataFrame
    """
    # 转换为 StockDataFrame
    stock = Sdf.retype(df.copy())
    
    # 使用调整后的收盘价作为基准
    stock['close'] = stock['adjcp'] if 'adjcp' in stock else stock['close']
    
    # 获取所有股票代码
    unique_ticker = stock.tic.unique()
    
    # 为每个指标创建临时 DataFrame
    indicators = {}
    for indicator in INDICATORS:
        indicators[indicator] = pd.DataFrame()
    
    # 按股票分别计算指标
    for i in range(len(unique_ticker)):
        temp_stock = stock[stock.tic == unique_ticker[i]]
        
        for indicator in INDICATORS:
            if indicator in temp_stock.columns:
                temp_indicator = temp_stock[indicator]
                temp_indicator = pd.DataFrame(temp_indicator)
                indicators[indicator] = indicators[indicator].append(
                    temp_indicator, ignore_index=True
                )
    
    # 合并所有指标到原始数据
    for indicator, data in indicators.items():
        df[indicator] = data.values
    
    return df

# 使用示例
df_with_indicators = calculate_custom_indicators(df_raw)

这个函数展示了 FeatureEngineer 的内部逻辑。它按股票代码分组,分别计算每个指标,最后合并结果。理解这个过程很重要,因为有些指标的计算依赖于正确的分组,跨股票计算会得到无意义的结果。

市场波动率指数(Turbulence Index)

波动率指数是 FinRL 的特色功能,用于衡量市场的极端波动状态。当市场进入危机模式时,传统的技术指标可能失效,此时需要降低仓位或暂停交易。波动率指数通过计算资产收益率的统计异常程度来实现这一目的。

def calculate_turbulence_index(df, lookback=252):
    """
    计算市场波动率指数
    
    参数:
        df: 多股票数据 DataFrame
        lookback: 回溯期,默认 252 个交易日(一年)
        
    返回:
        包含波动率指数的 DataFrame
    """
    # 按日期和股票代码排序
    df = df.sort_values(['date', 'tic']).reset_index(drop=True)
    
    # 创建透视表,每列是一只股票,每行是一个交易日的收盘价
    price_pivot = df.pivot_table(
        index='date',
        columns='tic',
        values='close'
    )
    
    # 计算收益率
    returns = price_pivot.pct_change().dropna()
    
    # 计算波动率指数
    turbulence_index = []
    for i in range(lookback, len(returns)):
        # 获取回溯期内的收益率数据
        lookback_returns = returns.iloc[i-lookback:i]
        
        # 计算均值和协方差矩阵
        mean_returns = lookback_returns.mean()
        cov_matrix = lookback_returns.cov()
        
        # 计算马氏距离(Mahalanobis distance)
        current_return = returns.iloc[i]
        diff = current_return - mean_returns
        turbulence = diff @ np.linalg.inv(cov_matrix) @ diff.T
        
        turbulence_index.append([returns.index[i], turbulence])
    
    # 转换为 DataFrame
    turbulence_df = pd.DataFrame(
        turbulence_index,
        columns=['date', 'turbulence']
    )
    
    return turbulence_df

# 使用示例
turbulence_df = calculate_turbulence_index(df_processed)
df_final = df_processed.merge(turbulence_df, on='date')

波动率指数的本质是马氏距离,衡量当前市场状态与近期正常状态的偏离程度。当指数超过预设阈值(如 100)时,可以认为市场进入极端状态。在环境配置中,可以设置 turbulence_threshold 参数,让智能体在极端市场中采取更保守的策略。

协方差矩阵特征

对于投资组合优化任务,还需要计算资产间的协方差矩阵。这反映了不同股票收益率的相关性,是组合风险计算的基础。

def add_covariance_matrix(df, lookback=252):
    """
    添加协方差矩阵作为状态特征
    
    参数:
        df: 多股票数据 DataFrame
        lookback: 回溯期
        
    返回:
        包含协方差矩阵的 DataFrame
    """
    # 按日期排序并建立索引
    df = df.sort_values(['date', 'tic']).reset_index(drop=True)
    df.index = df.date.factorize()[0]
    
    cov_list = []
    
    # 对每个时间点计算协方差矩阵
    for i in range(lookback, len(df.index.unique())):
        # 获取回溯期数据
        data_lookback = df.loc[i-lookback:i, :]
        
        # 创建价格透视表
        price_lookback = data_lookback.pivot_table(
            index='date',
            columns='tic',
            values='close'
        )
        
        # 计算收益率和协方差
        return_lookback = price_lookback.pct_change().dropna()
        covs = return_lookback.cov().values
        
        cov_list.append(covs)
    
    # 创建协方差 DataFrame
    df_cov = pd.DataFrame({
        'date': df.date.unique()[lookback:],
        'cov_list': cov_list
    })
    
    # 合并到主数据
    df = df.merge(df_cov, on='date')
    df = df.sort_values(['date', 'tic']).reset_index(drop=True)
    
    return df

# 使用示例
df_with_cov = add_covariance_matrix(df_final)

协方差矩阵被展平成一个列表存储在 cov_list 列中。在环境中,这个矩阵会被重新 reshape,用于计算组合风险。对于 30 只股票的组合,协方差矩阵有 900 个元素,显著增加了状态空间的维度,但也提供了更丰富的风险信息。

数据存储与高效加载

经过清洗和特征工程的数据,通常需要保存到磁盘,避免每次训练都重复计算。对于大规模数据,存储格式和加载方式直接影响训练效率。

存储格式选择

FinRL 推荐使用 Parquet 格式存储处理后的数据。相比 CSV,Parquet 有以下优势:

  1. 列式存储:只读取需要的列,节省 IO 带宽
  2. 压缩率高:通常比 CSV 小 70% 以上
  3. 类型安全:保留数据类型信息,避免加载时类型推断
  4. 支持分块:可以并行读写,适合大数据集
import pandas as pd

def save_processed_data(df, file_path):
    """
    保存处理后的数据到 Parquet 格式
    
    参数:
        df: 处理后的 DataFrame
        file_path: 保存路径,建议以 .parquet 结尾
    """
    # 确保日期列是 datetime 类型
    if 'date' in df.columns:
        df['date'] = pd.to_datetime(df['date'])
    
    # 重置索引,避免多层索引导致保存失败
    df_to_save = df.reset_index(drop=True)
    
    # 保存为 Parquet 格式
    df_to_save.to_parquet(
        file_path,
        engine='pyarrow',
        compression='snappy',
        index=False
    )
    
    print(f"数据已保存至 {file_path}")
    print(f"文件大小: {pd.io.formats.format.get_size(df_to_save)}")
    print(f"数据形状: {df_to_save.shape}")

# 使用示例
save_processed_data(df_with_cov, 'data/dow30_processed.parquet')

保存时,我们显式指定使用 pyarrow 引擎和 snappy 压缩算法。snappy 在压缩率和速度间取得了良好平衡。对于协方差矩阵这类复杂对象,需要先转换为字符串或二进制格式,因为 Parquet 不直接支持嵌套数组。

高效加载策略

加载数据时,可以利用 Parquet 的列式特性,只读取训练需要的特征列。对于超大数据集,还可以使用分块读取。

def load_training_data(file_path, required_columns=None):
    """
    高效加载训练数据
    
    参数:
        file_path: Parquet 文件路径
        required_columns: 需要的列列表,None 表示加载所有列
        
    返回:
        DataFrame
    """
    if required_columns is None:
        # 默认加载所有列
        df = pd.read_parquet(file_path, engine='pyarrow')
    else:
        # 只加载需要的列,节省内存和时间
        df = pd.read_parquet(
            file_path,
            engine='pyarrow',
            columns=required_columns
        )
    
    # 恢复日期索引
    if 'date' in df.columns:
        df['date'] = pd.to_datetime(df['date'])
        df = df.sort_values(['date', 'tic']).reset_index(drop=True)
    
    print(f"数据加载完成,形状: {df.shape}")
    print(f"内存占用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
    
    return df

# 使用示例:只加载必要特征
required_cols = ['date', 'tic', 'close', 'volume', 'macd', 'rsi_30', 'turbulence']
df_train = load_training_data('data/dow30_processed.parquet', required_cols)

对于多股票数据,建议按日期排序并建立索引。这样在环境 step 时,可以通过日期快速定位到当前时间步的数据,避免每次遍历整个 DataFrame。

内存优化技巧

当处理数千只股票、数年分钟线数据时,内存占用可能成为瓶颈。以下是一些优化技巧:

def optimize_memory_usage(df):
    """
    优化 DataFrame 内存占用
    
    参数:
        df: 原始 DataFrame
        
    返回:
        优化后的 DataFrame
    """
    df_optimized = df.copy()
    
    # 遍历每列,选择最优数据类型
    for col in df_optimized.columns:
        col_type = df_optimized[col].dtype
        
        if col_type != object:
            c_min = df_optimized[col].min()
            c_max = df_optimized[col].max()
            
            if str(col_type)[:3] == 'int':
                # 优化整数类型
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df_optimized[col] = df_optimized[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df_optimized[col] = df_optimized[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df_optimized[col] = df_optimized[col].astype(np.int32)
            else:
                # 优化浮点数类型
                if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df_optimized[col] = df_optimized[col].astype(np.float32)
    
    # 对股票代码列使用 category 类型
    if 'tic' in df_optimized.columns:
        df_optimized['tic'] = df_optimized['tic'].astype('category')
    
    # 对日期列使用 datetime 类型
    if 'date' in df_optimized.columns:
        df_optimized['date'] = pd.to_datetime(df_optimized['date'])
    
    # 打印内存优化报告
    original_mem = df.memory_usage(deep=True).sum() / 1024**2
    optimized_mem = df_optimized.memory_usage(deep=True).sum() / 1024**2
    
    print(f"原始内存占用: {original_mem:.2f} MB")
    print(f"优化后内存占用: {optimized_mem:.2f} MB")
    print(f"内存节省: {(1 - optimized_mem/original_mem)*100:.1f}%")
    
    return df_optimized

# 使用示例
df_optimized = optimize_memory_usage(df_train)

这个函数通过向下转换数据类型显著减少内存占用。例如,将 float64 转换为 float32 可以节省 50% 的内存,而精度损失对训练影响很小。股票代码列使用 category 类型,因为它们是重复出现的字符串。对于包含数百万行的数据集,这种优化可以将内存占用从数 GB 降低到几百 MB。

总结

金融数据流水线是连接市场与智能体的桥梁。FinRL 通过统一的数据处理器、自动化的清洗流程和灵活的特征工程,将繁琐的数据准备工作转化为标准化的配置过程。掌握这些工具,意味着我们可以快速验证不同市场、不同频率、不同特征集的策略想法,将更多时间投入到算法设计和调优中。

数据质量直接决定了策略表现的上限。再精巧的算法也无法从垃圾数据中学到有效的交易模式。因此,在实际项目中,建议花足够时间检查数据完整性、验证指标计算逻辑、观察特征分布。TensorBoard 是可视化这些数据质量检查结果的绝佳工具,下一章我们将看到如何将数据流与训练过程无缝集成。

现在,我们已经准备好了干净、丰富、高效的数据,接下来就要设计交易环境,让智能体在模拟市场中学习和成长。环境设计是强化学习应用中最具创造性的部分,它决定了智能体如何观察市场、如何执行动作、如何获得反馈。让我们进入第 4 章,探索 FinRL 的环境架构与配置技巧。