3.金融数据流水线构建
金融数据是量化交易的基石,也是强化学习智能体理解市场的唯一窗口。在实际项目中,花费在数据获取、清洗和特征工程上的时间,往往占到整个开发周期的 60% 以上。FinRL 将这套繁琐的流程封装成标准化的流水线,让我们能把精力集中在策略设计上。这一章,我们深入数据层的实现细节,看看如何构建一个健壮、高效的金融数据流水线。
多源金融数据接入
FinRL 的数据层设计遵循统一接口原则。无论是美股、A 股还是加密货币,无论是日线数据还是分钟线数据,都通过标准化的 DataProcessor 接入。这种设计让我们切换数据源时,几乎不需要修改上层代码。
支持的数据源全景
FinRL 目前集成了 14 个主流数据平台,覆盖全球主要市场。每个数据源都有自己的特点和限制,选择时需要权衡数据质量、更新频率和调用成本。
| 数据源 | 市场范围 | 时间跨度 | 最高频率 | 调用限制 |
|---|---|---|---|---|
| YahooFinance | 美股 | 频率特定 | 1 分钟 | 每小时 2000 次 |
| Alpaca | 美股、ETF | 2015 年至今 | 1 分钟 | 账户特定 |
| Akshare | A 股 | 2015 年至今 | 日线 | 账户特定 |
| Baostock | A 股 | 1990 年至今 | 5 分钟 | 账户特定 |
| Tushare | A 股 | 至今 | 1 分钟 | 账户特定 |
| Binance | 加密货币 | API 特定 | 1 秒 | API 特定 |
| CCXT | 加密货币 | API 特定 | 1 分钟 | API 特定 |
| IEXCloud | 美股 | 1970 年至今 | 日线 | 每秒 100 次 |
| JoinQuant | A 股 | 2005 年至今 | 1 分钟 | 每次 3 请求 |
| RiceQuant | A 股 | 2005 年至今 | 1 毫秒 | 账户特定 |
| WRDS | 美股 | 2003 年至今 | 1 毫秒 | 每次 5 请求 |
| EODhistoricaldata | 美股 | 频率特定 | 1 分钟 | API 特定 |
| QuantConnect | 美股 | 1998 年至今 | 1 秒 | 无明确限制 |
| Sinopac | 台股 | 2023 年至今 | 1 分钟 | 账户特定 |
对于初学者和研究用途,YahooFinance 是最友好的选择。数据免费、无需注册、覆盖范围广,虽然频率和精度不如商业数据,但足以验证策略逻辑。对于 A 股研究,Akshare 和 Baostock 提供了完整的本土数据支持。加密货币交易则可以考虑 Binance 或 CCXT,它们能直接对接交易所实时数据。
YahooFinance 接入实战
FinRL 通过 YahooDownloader 类封装了 yfinance 库,将原始数据转换成统一的 DataFrame 格式。这个类隐藏了日期处理、股票代码验证、异常重试等细节。
from finrl.meta.preprocessor.yahoodownloader import YahooDownloader
# 定义时间范围和股票列表
TRAIN_START_DATE = '2009-01-01'
TRAIN_END_DATE = '2020-07-01'
TICKER_LIST = ['AAPL', 'MSFT', 'GOOGL']
# 创建下载器实例并获取数据
df = YahooDownloader(
start_date=TRAIN_START_DATE,
end_date=TRAIN_END_DATE,
ticker_list=TICKER_LIST
).fetch_data()
print(df.head())
这段代码首先导入 YahooDownloader 类,然后指定训练数据的起止日期和股票代码列表。fetch_data 方法会并行下载所有股票的数据,返回一个标准的 DataFrame。输出结果包含以下列:
date:交易日期tic:股票代码open:开盘价high:最高价low:最低价close:收盘价volume:成交量day:星期几(0 表示周一,4 表示周五)
注意这里使用的是调整后收盘价(adjusted close),已经考虑了分红、拆股等公司行为的影响。这是回测中必须使用的数据,否则会在除权除息日看到虚假的价格跳空。
下载过程会自动处理一些常见问题。比如某只股票在某个时间段内停牌,或者新上市股票数据不足,YahooDownloader 会填充 NaN 值并记录警告信息。对于大规模数据获取,建议分批次下载,避免触发每小时 2000 次的 IP 限制。
其他数据源接入模式
其他数据源的接入方式遵循相似的模式,只是参数略有不同。以 Alpaca 为例,它需要 API 密钥认证:
from finrl.meta.preprocessor.alpaca import AlpacaProcessor
# 配置 API 密钥
API_KEY = 'your_api_key'
API_SECRET = 'your_api_secret'
APCA_API_BASE_URL = 'https://paper-api.alpaca.markets'
# 创建处理器
processor = AlpacaProcessor(
api_key=API_KEY,
api_secret=API_SECRET,
api_base_url=APCA_API_BASE_URL
)
# 获取数据
df = processor.download_data(
ticker_list=TICKER_LIST,
start_date=TRAIN_START_DATE,
end_date=TRAIN_END_DATE,
time_interval='1Min'
)
对于 A 股数据,Akshare 的接入方式类似,但需要注意交易日历的差异:
from finrl.meta.preprocessor.akshare import AkshareProcessor
processor = AkshareProcessor()
df = processor.download_data(
ticker_list=['600519', '000858'], # 贵州茅台和五粮液
start_date='2015-01-01',
end_date='2023-12-31',
time_interval='1d'
)
每个处理器都实现了相同的接口规范,返回统一格式的 DataFrame。这种设计让我们可以在不同市场间无缝切换策略,只需要更换数据处理器即可。
数据清洗与质量检查
原始金融数据往往充满噪声和缺失值。不同数据源的格式差异、交易所的临时停牌、极端行情下的数据延迟,都会导致数据质量问题。FinRL 的清洗流程针对这些问题提供了自动化解决方案。
缺失数据的成因与影响
在低频数据(日线)中,缺失值通常意味着股票停牌。这种情况下直接删除缺失行是合理的,因为停牌期间确实无法交易。但在高频数据(分钟线)中,缺失值往往只是表示该分钟内没有成交,而非停牌。如果直接删除,会损失大量有效信息。
FinRL 团队做过一个测试:下载 2021 年上半年道琼斯 30 成分股的 1 分钟数据,原始数据有 39736 行。直接删除含 NaN 的行后,只剩下 3361 行,数据利用率不足 10%。这对于训练强化学习智能体来说是灾难性的,因为智能体需要足够的市场微观结构数据来学习。
低频数据清洗策略
对于日线数据,FinRL 采用直接删除法。数据处理器会检查每行数据是否完整,只要 open、high、low、close、volume 中有任意一个为 NaN,就整行删除。这种方法简单有效,因为停牌信息本身就可以作为一种市场状态,通过日期索引的间断来体现。
# 检查缺失数据
has_missing = df.isnull().values.any()
print(f"数据是否包含缺失值: {has_missing}")
# 查看每列的缺失情况
missing_stats = df.isnull().sum()
print(missing_stats)
# 删除含缺失值的行
df_cleaned = df.dropna()
print(f"清洗前: {len(df)} 行,清洗后: {len(df_cleaned)} 行")
这段代码首先检查整个 DataFrame 是否存在缺失值,然后统计每列的缺失数量。dropna 方法会删除所有包含 NaN 的行。对于多股票数据,建议按股票代码分组处理,避免某只股票的停牌影响其他股票的数据完整性。
高频数据清洗策略
对于分钟线或更高频率的数据,FinRL 采用前向填充(forward fill)策略。具体规则是:用上一个有效收盘价填充 open、high、low、close 列,将 volume 填充为 0。这样既保留了时间序列的连续性,又合理反映了无成交时段的市场状态。
def clean_high_frequency_data(df):
"""
清洗高频数据,处理无成交时段的缺失值
参数:
df: 原始高频数据 DataFrame
返回:
清洗后的 DataFrame
"""
# 按股票代码分组处理
cleaned_stocks = []
for tic in df['tic'].unique():
tic_data = df[df['tic'] == tic].copy()
tic_data = tic_data.sort_values('date')
# 前向填充价格列
price_columns = ['open', 'high', 'low', 'close']
tic_data[price_columns] = tic_data[price_columns].fillna(method='ffill')
# 成交量填充为 0
tic_data['volume'] = tic_data['volume'].fillna(0)
# 如果开盘前就有缺失,用第一个有效值向后填充
tic_data[price_columns] = tic_data[price_columns].fillna(method='bfill')
cleaned_stocks.append(tic_data)
return pd.concat(cleaned_stocks, ignore_index=True)
这个函数按股票代码分组,确保填充不会跨股票污染数据。对于价格列,先用前向填充,再用后向填充处理开盘前的缺失值。成交量在无成交时段设为 0,符合实际情况。
异常值检测与处理
除了缺失值,金融数据还可能包含异常值。比如错误的最高价(比高低 10 倍以上)、负成交量等。FinRL 提供了基本的异常值检测:
def detect_anomalies(df):
"""
检测数据中的异常值
返回:
异常值统计信息
"""
anomalies = {}
# 检查高价是否低于低价
price_anomaly = df[df['high'] < df['low']]
anomalies['high_low_swap'] = len(price_anomaly)
# 检查价格是否非正
for col in ['open', 'high', 'low', 'close']:
non_positive = df[df[col] <= 0]
anomalies[f'{col}_non_positive'] = len(non_positive)
# 检查成交量是否为负
negative_volume = df[df['volume'] < 0]
anomalies['negative_volume'] = len(negative_volume)
# 检查价格跳空是否超过 20%
df_sorted = df.sort_values(['tic', 'date'])
df_sorted['prev_close'] = df_sorted.groupby('tic')['close'].shift(1)
df_sorted['price_gap'] = abs(df_sorted['open'] - df_sorted['prev_close']) / df_sorted['prev_close']
large_gaps = df_sorted[df_sorted['price_gap'] > 0.2]
anomalies['large_price_gaps'] = len(large_gaps)
return anomalies
# 使用示例
anomaly_report = detect_anomalies(df_cleaned)
print("异常值检测报告:", anomaly_report)
这个检测函数会返回各类异常的数量。对于检测到的异常,通常的处理方式是删除对应行,或者将异常值设为 NaN 后按缺失值处理。在回测中,保守的做法是删除异常数据,避免模型学到错误的市场规律。
技术指标特征工程实践
原始的价格和成交量数据对智能体来说信息密度太低。技术指标通过数学变换,将价格序列中的趋势、波动、动量等抽象特征量化,帮助智能体更好地理解市场状态。FinRL 通过集成 stockstats 库,实现了 50 多种常用技术指标的自动计算。
核心技术指标解析
FinRL 默认配置包含 7 个基础指标,这些指标覆盖了趋势跟踪、震荡判断和波动率测量三个维度:
macd:移动平均收敛散度,反映价格趋势的强度和方向boll_ub和boll_lb:布林带上下轨,衡量价格波动范围rsi_30:30 日相对强弱指标,判断超买超卖状态dx_30:30 日动向指标,衡量趋势强度close_30_sma和close_60_sma:30 日和 60 日简单移动平均,识别中期趋势
这些指标的选择经过大量实验验证,能在信息量和计算成本间取得平衡。当然,用户可以根据策略需求添加更多指标。
使用 FeatureEngineer 添加指标
FinRL 的 FeatureEngineer 类封装了整个特征工程流程。只需要几行代码,就能完成技术指标计算、数据标准化和特征选择。
from finrl.meta.preprocessor.preprocessors import FeatureEngineer
from finrl.config import INDICATORS
# 创建特征工程器
fe = FeatureEngineer(
df.copy(),
use_technical_indicator=True,
tech_indicator_list=INDICATORS,
use_turbulence=True,
user_defined_feature=False
)
# 执行预处理
df_processed = fe.preprocess_data()
print(f"原始特征数: {len(df.columns)}")
print(f"处理后特征数: {len(df_processed.columns)}")
print("新增特征:", [col for col in df_processed.columns if col not in df.columns])
FeatureEngineer 的构造函数接受多个参数:
use_technical_indicator:是否计算技术指标tech_indicator_list:需要计算的指标列表use_turbulence:是否添加市场波动率指数user_defined_feature:是否使用用户自定义特征
preprocess_data 方法内部会完成所有计算,并确保特征值在合理范围内。对于存在无限大或 NaN 值的特征,会自动用中位数填充。
手动计算技术指标
有时我们需要自定义指标计算逻辑,或者调试某个指标的具体行为。这时可以直接使用 stockstats 库:
from stockstats import StockDataFrame as Sdf
def calculate_custom_indicators(df):
"""
手动计算技术指标,展示底层实现
参数:
df: 原始数据 DataFrame
返回:
包含新增指标的 DataFrame
"""
# 转换为 StockDataFrame
stock = Sdf.retype(df.copy())
# 使用调整后的收盘价作为基准
stock['close'] = stock['adjcp'] if 'adjcp' in stock else stock['close']
# 获取所有股票代码
unique_ticker = stock.tic.unique()
# 为每个指标创建临时 DataFrame
indicators = {}
for indicator in INDICATORS:
indicators[indicator] = pd.DataFrame()
# 按股票分别计算指标
for i in range(len(unique_ticker)):
temp_stock = stock[stock.tic == unique_ticker[i]]
for indicator in INDICATORS:
if indicator in temp_stock.columns:
temp_indicator = temp_stock[indicator]
temp_indicator = pd.DataFrame(temp_indicator)
indicators[indicator] = indicators[indicator].append(
temp_indicator, ignore_index=True
)
# 合并所有指标到原始数据
for indicator, data in indicators.items():
df[indicator] = data.values
return df
# 使用示例
df_with_indicators = calculate_custom_indicators(df_raw)
这个函数展示了 FeatureEngineer 的内部逻辑。它按股票代码分组,分别计算每个指标,最后合并结果。理解这个过程很重要,因为有些指标的计算依赖于正确的分组,跨股票计算会得到无意义的结果。
市场波动率指数(Turbulence Index)
波动率指数是 FinRL 的特色功能,用于衡量市场的极端波动状态。当市场进入危机模式时,传统的技术指标可能失效,此时需要降低仓位或暂停交易。波动率指数通过计算资产收益率的统计异常程度来实现这一目的。
def calculate_turbulence_index(df, lookback=252):
"""
计算市场波动率指数
参数:
df: 多股票数据 DataFrame
lookback: 回溯期,默认 252 个交易日(一年)
返回:
包含波动率指数的 DataFrame
"""
# 按日期和股票代码排序
df = df.sort_values(['date', 'tic']).reset_index(drop=True)
# 创建透视表,每列是一只股票,每行是一个交易日的收盘价
price_pivot = df.pivot_table(
index='date',
columns='tic',
values='close'
)
# 计算收益率
returns = price_pivot.pct_change().dropna()
# 计算波动率指数
turbulence_index = []
for i in range(lookback, len(returns)):
# 获取回溯期内的收益率数据
lookback_returns = returns.iloc[i-lookback:i]
# 计算均值和协方差矩阵
mean_returns = lookback_returns.mean()
cov_matrix = lookback_returns.cov()
# 计算马氏距离(Mahalanobis distance)
current_return = returns.iloc[i]
diff = current_return - mean_returns
turbulence = diff @ np.linalg.inv(cov_matrix) @ diff.T
turbulence_index.append([returns.index[i], turbulence])
# 转换为 DataFrame
turbulence_df = pd.DataFrame(
turbulence_index,
columns=['date', 'turbulence']
)
return turbulence_df
# 使用示例
turbulence_df = calculate_turbulence_index(df_processed)
df_final = df_processed.merge(turbulence_df, on='date')
波动率指数的本质是马氏距离,衡量当前市场状态与近期正常状态的偏离程度。当指数超过预设阈值(如 100)时,可以认为市场进入极端状态。在环境配置中,可以设置 turbulence_threshold 参数,让智能体在极端市场中采取更保守的策略。
协方差矩阵特征
对于投资组合优化任务,还需要计算资产间的协方差矩阵。这反映了不同股票收益率的相关性,是组合风险计算的基础。
def add_covariance_matrix(df, lookback=252):
"""
添加协方差矩阵作为状态特征
参数:
df: 多股票数据 DataFrame
lookback: 回溯期
返回:
包含协方差矩阵的 DataFrame
"""
# 按日期排序并建立索引
df = df.sort_values(['date', 'tic']).reset_index(drop=True)
df.index = df.date.factorize()[0]
cov_list = []
# 对每个时间点计算协方差矩阵
for i in range(lookback, len(df.index.unique())):
# 获取回溯期数据
data_lookback = df.loc[i-lookback:i, :]
# 创建价格透视表
price_lookback = data_lookback.pivot_table(
index='date',
columns='tic',
values='close'
)
# 计算收益率和协方差
return_lookback = price_lookback.pct_change().dropna()
covs = return_lookback.cov().values
cov_list.append(covs)
# 创建协方差 DataFrame
df_cov = pd.DataFrame({
'date': df.date.unique()[lookback:],
'cov_list': cov_list
})
# 合并到主数据
df = df.merge(df_cov, on='date')
df = df.sort_values(['date', 'tic']).reset_index(drop=True)
return df
# 使用示例
df_with_cov = add_covariance_matrix(df_final)
协方差矩阵被展平成一个列表存储在 cov_list 列中。在环境中,这个矩阵会被重新 reshape,用于计算组合风险。对于 30 只股票的组合,协方差矩阵有 900 个元素,显著增加了状态空间的维度,但也提供了更丰富的风险信息。
数据存储与高效加载
经过清洗和特征工程的数据,通常需要保存到磁盘,避免每次训练都重复计算。对于大规模数据,存储格式和加载方式直接影响训练效率。
存储格式选择
FinRL 推荐使用 Parquet 格式存储处理后的数据。相比 CSV,Parquet 有以下优势:
- 列式存储:只读取需要的列,节省 IO 带宽
- 压缩率高:通常比 CSV 小 70% 以上
- 类型安全:保留数据类型信息,避免加载时类型推断
- 支持分块:可以并行读写,适合大数据集
import pandas as pd
def save_processed_data(df, file_path):
"""
保存处理后的数据到 Parquet 格式
参数:
df: 处理后的 DataFrame
file_path: 保存路径,建议以 .parquet 结尾
"""
# 确保日期列是 datetime 类型
if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'])
# 重置索引,避免多层索引导致保存失败
df_to_save = df.reset_index(drop=True)
# 保存为 Parquet 格式
df_to_save.to_parquet(
file_path,
engine='pyarrow',
compression='snappy',
index=False
)
print(f"数据已保存至 {file_path}")
print(f"文件大小: {pd.io.formats.format.get_size(df_to_save)}")
print(f"数据形状: {df_to_save.shape}")
# 使用示例
save_processed_data(df_with_cov, 'data/dow30_processed.parquet')
保存时,我们显式指定使用 pyarrow 引擎和 snappy 压缩算法。snappy 在压缩率和速度间取得了良好平衡。对于协方差矩阵这类复杂对象,需要先转换为字符串或二进制格式,因为 Parquet 不直接支持嵌套数组。
高效加载策略
加载数据时,可以利用 Parquet 的列式特性,只读取训练需要的特征列。对于超大数据集,还可以使用分块读取。
def load_training_data(file_path, required_columns=None):
"""
高效加载训练数据
参数:
file_path: Parquet 文件路径
required_columns: 需要的列列表,None 表示加载所有列
返回:
DataFrame
"""
if required_columns is None:
# 默认加载所有列
df = pd.read_parquet(file_path, engine='pyarrow')
else:
# 只加载需要的列,节省内存和时间
df = pd.read_parquet(
file_path,
engine='pyarrow',
columns=required_columns
)
# 恢复日期索引
if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'])
df = df.sort_values(['date', 'tic']).reset_index(drop=True)
print(f"数据加载完成,形状: {df.shape}")
print(f"内存占用: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
return df
# 使用示例:只加载必要特征
required_cols = ['date', 'tic', 'close', 'volume', 'macd', 'rsi_30', 'turbulence']
df_train = load_training_data('data/dow30_processed.parquet', required_cols)
对于多股票数据,建议按日期排序并建立索引。这样在环境 step 时,可以通过日期快速定位到当前时间步的数据,避免每次遍历整个 DataFrame。
内存优化技巧
当处理数千只股票、数年分钟线数据时,内存占用可能成为瓶颈。以下是一些优化技巧:
def optimize_memory_usage(df):
"""
优化 DataFrame 内存占用
参数:
df: 原始 DataFrame
返回:
优化后的 DataFrame
"""
df_optimized = df.copy()
# 遍历每列,选择最优数据类型
for col in df_optimized.columns:
col_type = df_optimized[col].dtype
if col_type != object:
c_min = df_optimized[col].min()
c_max = df_optimized[col].max()
if str(col_type)[:3] == 'int':
# 优化整数类型
if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
df_optimized[col] = df_optimized[col].astype(np.int8)
elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
df_optimized[col] = df_optimized[col].astype(np.int16)
elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
df_optimized[col] = df_optimized[col].astype(np.int32)
else:
# 优化浮点数类型
if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
df_optimized[col] = df_optimized[col].astype(np.float32)
# 对股票代码列使用 category 类型
if 'tic' in df_optimized.columns:
df_optimized['tic'] = df_optimized['tic'].astype('category')
# 对日期列使用 datetime 类型
if 'date' in df_optimized.columns:
df_optimized['date'] = pd.to_datetime(df_optimized['date'])
# 打印内存优化报告
original_mem = df.memory_usage(deep=True).sum() / 1024**2
optimized_mem = df_optimized.memory_usage(deep=True).sum() / 1024**2
print(f"原始内存占用: {original_mem:.2f} MB")
print(f"优化后内存占用: {optimized_mem:.2f} MB")
print(f"内存节省: {(1 - optimized_mem/original_mem)*100:.1f}%")
return df_optimized
# 使用示例
df_optimized = optimize_memory_usage(df_train)
这个函数通过向下转换数据类型显著减少内存占用。例如,将 float64 转换为 float32 可以节省 50% 的内存,而精度损失对训练影响很小。股票代码列使用 category 类型,因为它们是重复出现的字符串。对于包含数百万行的数据集,这种优化可以将内存占用从数 GB 降低到几百 MB。
总结
金融数据流水线是连接市场与智能体的桥梁。FinRL 通过统一的数据处理器、自动化的清洗流程和灵活的特征工程,将繁琐的数据准备工作转化为标准化的配置过程。掌握这些工具,意味着我们可以快速验证不同市场、不同频率、不同特征集的策略想法,将更多时间投入到算法设计和调优中。
数据质量直接决定了策略表现的上限。再精巧的算法也无法从垃圾数据中学到有效的交易模式。因此,在实际项目中,建议花足够时间检查数据完整性、验证指标计算逻辑、观察特征分布。TensorBoard 是可视化这些数据质量检查结果的绝佳工具,下一章我们将看到如何将数据流与训练过程无缝集成。
现在,我们已经准备好了干净、丰富、高效的数据,接下来就要设计交易环境,让智能体在模拟市场中学习和成长。环境设计是强化学习应用中最具创造性的部分,它决定了智能体如何观察市场、如何执行动作、如何获得反馈。让我们进入第 4 章,探索 FinRL 的环境架构与配置技巧。