API 参考
在这里可以找到所有 Qlib
接口。
数据
提供者
- 类 qlib.data.data.ProviderBackendMixin
该辅助类旨在基于存储后端使提供者使用更加方便。如果提供者不依赖后端存储,则无需继承此类。
- 类 qlib.data.data.CalendarProvider
日历提供者的基类
提供日历数据。
- calendar(start_time=None, end_time=None, freq='day', future=False)
获取指定市场在给定时间范围内的日历。
- 参数:
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率,可选值:year/quarter/month/week/day。
future (bool) – 是否包含未来的交易日。
- 返回:
日历列表
- 返回类型:
list
- locate_index(start_time: Timestamp | str, end_time: Timestamp | str, freq: str, future: bool = False)
在特定频率的日历中定位起始时间和结束时间的索引。
- 参数:
start_time (pd.Timestamp) – 时间范围的起始时间。
end_time (pd.Timestamp) – 时间范围的结束时间。
freq (str) – 时间频率,可选值:year/quarter/month/week/day。
future (bool) – 是否包含未来的交易日。
- 返回:
pd.Timestamp – 实际的开始时间。
pd.Timestamp – 实际的结束时间。
int – 开始时间的索引。
int – 结束时间的索引。
- load_calendar(freq, future)
从文件加载原始日历时间戳。
- 参数:
freq (str) – 读取日历文件的频率。
future (bool) – 是否包含未来日期。
- 返回:
时间戳列表
- 返回类型:
list
- 类 qlib.data.data.InstrumentProvider
标的资产数据提供者的基类
提供标的资产数据。
- 静态 标的列表(市场: 列表 | 字符串 = '全部', 过滤管道: 列表 | 无 = 无)
获取一个基础市场的通用配置字典,并添加多个动态过滤器。
- 参数:
market (Union[List, str]) –
- 字符串:
市场/行业/指数简称,例如 all/sse/szse/sse50/csi300/csi500。
- 列表:
[“ID1”, “ID2”],股票代码列表。
filter_pipe (list) – 动态过滤器列表。
- 返回:
dict (if isinstance(market, str)) – 股票池配置字典。
{market => 基础市场名称, filter_pipe => 过滤器列表}
示例:
{'market': 'csi500', 'filter_pipe': [{'filter_type': 'ExpressionDFilter', 'rule_expression': '$open<40', 'filter_start_time': None, 'filter_end_time': None, 'keep': False}, {'filter_type': 'NameDFilter', 'name_rule_re': 'SH[0-9]{4}55', 'filter_start_time': None, 'filter_end_time': None}]}
list (if isinstance(market, list)) – 直接返回原始列表。注意:这将使标的列表兼容更多场景,用户代码也会更简洁。
- 抽象 list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)
根据特定的股票池配置列出相关证券。
- 参数:
instruments (dict) – 股票池配置。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
as_list (bool) – 以列表或字典形式返回证券。
- 返回:
包含时间跨度的证券列表或字典
- 返回类型:
dict 或 list
- 类 qlib.data.data.FeatureProvider
特征提供者类
提供特征数据。
- 抽象 feature(instrument, field, start_time, end_time, freq)
获取特征数据。
- 参数:
instrument (str) – 某个特定的证券。
field (str) – 特征的某个特定字段。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率,可选值:year/quarter/month/week/day。
- 返回:
某个特征的数据
- 返回类型:
pd.Series
- 类 qlib.data.data.PITProvider
- 抽象 period_feature(instrument, field, start_index: int, end_index: int, cur_time: Timestamp, period: int | None = None) Series
获取 start_index 和 end_index 之间的历史周期数据序列
- 参数:
start_index (int) – start_index 是相对于 cur_time 最新周期的相对索引
end_index (int) – end_index 在大多数情况下也是相对于 cur_time 最新周期的索引。通常 start_index 和 end_index 为非正值。例如,start_index == -3,end_index == 0,当前周期索引为 cur_idx,则将获取 [start_index + cur_idx, end_index + cur_idx] 区间内的数据。
period (int) – 用于查询特定周期的数据。在 Qlib 中周期以整数表示(例如,202001 可能表示 2020 年第一季度)。注意:period 将覆盖 start_index 和 end_index
- 返回:
索引将是整数,用于表示数据的周期。典型示例如下:TODO
- 返回类型:
pd.Series
- Raises:
FileNotFoundError – 如果查询的数据不存在,则会抛出此异常。
- 类 qlib.data.data.ExpressionProvider
表达式提供者类
提供表达式数据。
- __init__()
- 抽象 表达式(工具, 字段, start_time=无, end_time=无, freq='day') 序列
获取表达式数据。
expression 的职责是解析 field 并加载相应的数据。在加载数据时,应处理数据的时间依赖性。get_expression_instance 通常在此方法中使用。
- 参数:
instrument (str) – 某个特定的证券。
field (str) – 特征的某个特定字段。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率,可选值:year/quarter/month/week/day。
- 返回:
某个表达式的数据
数据有两种格式
带有时间索引的表达式
带有整数索引的表达式
因为时间戳的表现不如
- 返回类型:
pd.Series
- 类 qlib.data.data.DatasetProvider
数据集提供者类
提供数据集数据。
- 抽象 数据集(instruments, fields, start_time=无, end_time=无, freq='day', inst_processors=[])
获取数据集数据。
- 参数:
instruments (列表 或 字典) – 工具的列表/字典或股票池配置的字典。
fields (列表) – 特征实例的列表。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
freq (字符串) – 时间频率。
inst_processors (Iterable[Union[dict, InstProcessor]]) – 每个合约上执行的操作
- 返回:
一个具有 <合约, 时间戳> 索引的 pandas 数据框。
- 返回类型:
pd.DataFrame
- 静态 get_instruments_d(instruments, freq)
解析不同类型输入的合约,输出 instruments_d。输入合约格式错误将引发异常。
- 静态 get_column_names(fields)
从输入字段中获取列名
- 静态 dataset_processor(instruments_d, column_names, start_time, end_time, freq, inst_processors=[])
加载并处理数据,返回数据集。- 默认使用多进程方法。
- 静态 inst_calculator(inst, start_time, end_time, freq, column_names, spans=None, g_config=None, inst_processors=[])
计算单个投资标的的表达式,返回一个数据框结果。如果该表达式之前已计算过,则从缓存中加载。
返回值:一个索引为‘datetime’且包含其他数据列的数据框。
- 类 qlib.data.data.LocalCalendarProvider(remote=False, backend={})
本地日历数据提供类
从本地数据源提供日历数据。
- __init__(remote=False, backend={})
- load_calendar(freq, future)
从文件加载原始日历时间戳。
- 参数:
freq (str) – 读取日历文件的频率。
future (bool) – 是否包含未来日期。
- 返回:
时间戳列表
- 返回类型:
list
- 类 qlib.data.data.LocalInstrumentProvider(backend={})
本地投资标的数据提供类
从本地数据源提供投资标的数据。
- __init__(backend={}) None
- list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)
根据特定的股票池配置列出相关证券。
- 参数:
instruments (dict) – 股票池配置。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
as_list (bool) – 以列表或字典形式返回证券。
- 返回:
包含时间跨度的证券列表或字典
- 返回类型:
dict 或 list
- 类 qlib.data.data.LocalFeatureProvider(remote=False, backend={})
本地特征数据提供类
从本地数据源提供特征数据。
- __init__(remote=False, backend={})
- feature(instrument, field, start_index, end_index, freq)
获取特征数据。
- 参数:
instrument (str) – 某个特定的证券。
field (str) – 特征的某个特定字段。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率,可选值:year/quarter/month/week/day。
- 返回:
某个特征的数据
- 返回类型:
pd.Series
- 类 qlib.data.data.LocalPITProvider
- period_feature(instrument, field, start_index, end_index, cur_time, period=None)
获取 start_index 和 end_index 之间的历史周期数据序列
- 参数:
start_index (int) – start_index 是相对于 cur_time 最新周期的相对索引
end_index (int) – end_index 在大多数情况下也是相对于 cur_time 最新周期的索引。通常 start_index 和 end_index 为非正值。例如,start_index == -3,end_index == 0,当前周期索引为 cur_idx,则将获取 [start_index + cur_idx, end_index + cur_idx] 区间内的数据。
period (int) – 用于查询特定周期的数据。在 Qlib 中周期以整数表示(例如,202001 可能表示 2020 年第一季度)。注意:period 将覆盖 start_index 和 end_index
- 返回:
索引将是整数,用于表示数据的周期。典型示例如下:TODO
- 返回类型:
pd.Series
- Raises:
FileNotFoundError – 如果查询的数据不存在,则会抛出此异常。
- 类 qlib.data.data.LocalExpressionProvider(time2idx=True)
本地表达式数据提供类
从本地数据源提供表达式数据。
- __init__(time2idx=True)
- 表达式(仪器, 字段, start_time=None, end_time=None, freq='day')
获取表达式数据。
expression 的职责是解析 field 并加载相应的数据。在加载数据时,应处理数据的时间依赖性。get_expression_instance 通常在此方法中使用。
- 参数:
instrument (str) – 某个特定的证券。
field (str) – 特征的某个特定字段。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率,可选值:year/quarter/month/week/day。
- 返回:
某个表达式的数据
数据有两种格式
带有时间索引的表达式
带有整数索引的表达式
因为时间戳的表现不如
- 返回类型:
pd.Series
- 类 qlib.data.data.LocalDatasetProvider(对齐时间: 布尔值 = True)
本地数据集数据提供者类
从本地数据源提供数据集数据。
- __init__(对齐时间: 布尔值 = True)
- 参数:
align_time (bool) –
我们是否将时间对齐到日历?某些数据集的频率是灵活的,无法对齐。对于具有固定频率且共享日历的数据,将数据对齐到日历将带来以下好处
将查询对齐到相同的参数,以便可以共享缓存。
- 数据集(instruments, fields, start_time=None, end_time=None, freq='day', inst_processors=[])
获取数据集数据。
- 参数:
instruments (列表 或 字典) – 工具的列表/字典或股票池配置的字典。
fields (列表) – 特征实例的列表。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
freq (字符串) – 时间频率。
inst_processors (Iterable[Union[dict, InstProcessor]]) – 每个合约上执行的操作
- 返回:
一个具有 <合约, 时间戳> 索引的 pandas 数据框。
- 返回类型:
pd.DataFrame
- 静态 multi_cache_walker(instruments, fields, start_time=None, end_time=None, freq='day')
该方法用于为客户端准备表达式缓存,之后客户端将自行从表达式缓存中加载数据。
- 静态 cache_walker(inst, start_time, end_time, freq, column_names)
如果某个合约的表达式尚未计算,则进行计算并写入表达式缓存。
- 类 qlib.data.data.ClientCalendarProvider
客户端日历数据提供类
作为客户端通过向服务器请求来提供日历数据。
- __init__()
- calendar(start_time=None, end_time=None, freq='day', future=False)
获取指定市场在给定时间范围内的日历。
- 参数:
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率,可选值:year/quarter/month/week/day。
future (bool) – 是否包含未来的交易日。
- 返回:
日历列表
- 返回类型:
list
- 类 qlib.data.data.ClientInstrumentProvider
客户端合约数据提供类
作为客户端通过向服务器请求来提供合约数据。
- __init__()
- list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)
根据特定的股票池配置列出相关证券。
- 参数:
instruments (dict) – 股票池配置。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
as_list (bool) – 以列表或字典形式返回证券。
- 返回:
包含时间跨度的证券列表或字典
- 返回类型:
dict 或 list
- 类 qlib.data.data.ClientDatasetProvider
客户端数据集数据提供类
作为客户端通过向服务器请求来提供数据集数据。
- __init__()
- 数据集(instrument, 字段, 开始时间=无, 结束时间=无, 频率='日', 磁盘缓存=0, 返回 URI=否, instrument 处理器=[])
获取数据集数据。
- 参数:
instruments (列表 或 字典) – 工具的列表/字典或股票池配置的字典。
fields (列表) – 特征实例的列表。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
freq (字符串) – 时间频率。
inst_processors (Iterable[Union[dict, InstProcessor]]) – 每个合约上执行的操作
- 返回:
一个具有 <合约, 时间戳> 索引的 pandas 数据框。
- 返回类型:
pd.DataFrame
- 类 qlib.data.data.BaseProvider
本地数据提供类,它是一组允许用户访问数据的接口。由于 PITD 不对外公开给用户,因此未包含在接口中。
为了与旧版 qlib 数据提供器保持兼容。
- 特征(instrument, 字段, 开始时间=无, 结束时间=无, 频率='日', 磁盘缓存=无, instrument 处理器=[])
- 参数:
磁盘缓存 (整数) – 是否跳过(0)/使用(1)/替换(2)磁盘缓存
该函数将尝试使用具有关键字 disk_cache 的缓存方法,如果由于 DatasetD 实例是提供者类而引发类型错误,则会改用提供者方法。
- 类 qlib.data.data.LocalProvider
- features_uri(instruments, fields, start_time, end_time, freq, disk_cache=1)
返回生成的特征/数据集缓存的 URI
- 参数:
disk_cache —
instruments —
fields —
start_time —
end_time —
freq —
- 类 qlib.data.data.ClientProvider
客户端提供者
作为客户端从服务器请求数据。可以发起以下请求:
日历:直接返回一个日历列表
标的(无过滤器):直接返回一个标的列表/字典
标的(带过滤器):返回一个标的列表/字典
特征:返回一个缓存 URI
整体工作流程如下所述:当用户使用客户端提供者发起请求时,客户端提供者将连接服务器并发送请求,随后开始等待响应。服务器会立即返回响应,指示缓存是否可用。只有当客户端收到 feature_available 为 true 的响应时,等待过程才会终止。BUG:每次请求特定数据时,都需要连接服务器、等待响应然后断开连接,无法在单次连接中连续发起多个请求。有关 python-socketIO 客户端的文档,请参考 https://python-socketio.readthedocs.io/en/latest/client.html。
- __init__()
- qlib.data.data.CalendarProviderWrapper
是
CalendarProvider
的别名
- qlib.data.data.InstrumentProviderWrapper
是
InstrumentProvider
的别名
- qlib.data.data.FeatureProviderWrapper
是
FeatureProvider
的别名
- qlib.data.data.PITProviderWrapper
是
PITProvider
的别名
- qlib.data.data.ExpressionProviderWrapper
是
ExpressionProvider
的别名
- qlib.data.data.DatasetProviderWrapper
是
DatasetProvider
的别名
- qlib.data.data.BaseProviderWrapper
是
BaseProvider
的别名
- qlib.data.data.register_all_wrappers(C)
过滤器
- 类 qlib.data.filter.BaseDFilter
动态标的过滤器抽象类
用户可重写此类以构建自己的过滤器
重写 __init__ 方法以输入过滤规则
重写 filter_main 方法,使用规则过滤标的
- __init__()
- 静态 from_config(config)
根据配置字典构造实例。
- 参数:
config (dict) – 配置参数的字典。
- 抽象 to_config()
根据配置字典构造实例。
- 返回:
返回配置参数的字典。
- 返回类型:
dict
- 类 qlib.data.filter.SeriesDFilter(fstart_time=None, fend_time=None, keep=False)
动态标的过滤器抽象类,用于过滤特定特征的时间序列数据
过滤器应提供以下参数:
过滤开始时间
过滤结束时间
过滤规则
重写 __init__ 方法以指定特定规则来过滤时间序列
重写 _getFilterSeries 方法,使用规则过滤时间序列并获取 {inst => series} 形式的字典,或重写 filter_main 方法以实现更高级的序列过滤规则
- __init__(fstart_time=None, fend_time=None, keep=False)
- 过滤器基类的初始化函数。
在 fstart_time 和 fend_time 指定的时间段内,根据特定规则过滤一组标的。
- 参数:
fstart_time (str) – 过滤规则开始作用于标的时间点。
fend_time (str) – 过滤规则停止作用于标的时间点。
keep (bool) – 当标的的特征在过滤时间段内不存在时,是否保留该标的。
- filter_main(instruments, start_time=None, end_time=None)
实现该方法以过滤标的物。
- 参数:
instruments (dict) – 要过滤的输入标的物。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
- 返回:
过滤后的标的物,结构与输入的 instruments 相同。
- 返回类型:
dict
- class qlib.data.filter.NameDFilter(name_rule_re, fstart_time=None, fend_time=None)
名称动态标的物过滤器
根据规定的名称格式过滤标的物。
需要提供一个名称规则的正则表达式。
- __init__(name_rule_re, fstart_time=None, fend_time=None)
名称过滤器类的初始化函数
- 参数:
name_rule_re (str) – 名称规则的正则表达式。
- 静态 from_config(config)
根据配置字典构造实例。
- 参数:
config (dict) – 配置参数的字典。
- to_config()
根据配置字典构造实例。
- 返回:
返回配置参数的字典。
- 返回类型:
dict
- 类 qlib.data.filter.ExpressionDFilter(rule_expression, fstart_time=None, fend_time=None, keep=False)
表达式动态标的过滤器
根据特定表达式对标的进行过滤。
一个表达式规则,用于指示需要某个特征字段。
示例
基础特征过滤 :rule_expression = ‘$close/$open>5’
横截面特征过滤 :rule_expression = ‘$rank($close)<10’
时间序列特征过滤 :rule_expression = ‘$Ref($close, 3)>100’
- __init__(rule_expression, fstart_time=None, fend_time=None, keep=False)
表达式过滤器类的初始化函数
- 参数:
fstart_time (str) – 从此时间开始过滤特征。
fend_time (str) – 过滤特征到此时间为止。
rule_expression (str) – 规则的输入表达式。
- 静态 from_config(config)
根据配置字典构造实例。
- 参数:
config (dict) – 配置参数的字典。
- to_config()
根据配置字典构造实例。
- 返回:
返回配置参数的字典。
- 返回类型:
dict
类
- 类 qlib.data.base.表达式
表达式基类
Expression 类用于处理以下格式的数据计算:每个标的具有两个维度的数据,
特征
时间:可以是观测时间或期间时间。
期间时间是为“时点数据”数据库设计的。例如,期间时间可能是 2014Q4,其数值可以在多个时间点被观测到(由于数据修正,在不同时间可能观测到不同的值)。
- load(instrument, start_index, end_index, *args)
load feature 此函数负责根据表达式引擎加载特征/表达式。
具体实现将分为两个部分:
缓存数据,处理错误。
这部分被所有表达式共享,并在 Expression 中实现。
根据具体表达式处理和计算数据。
这部分在每个表达式中各不相同,并在各个表达式中分别实现。
表达式引擎被不同的数据共享。不同的数据在 args 中会有不同的额外信息。
- 参数:
instrument (str) – 证券代码。
start_index (str) – 特征起始索引 [以日历表示]。
end_index (str) – 特征结束索引 [以日历表示]。
information (*args 可能包含以下内容) –
data (2) 若在 PIT 中使用) –
- freq: str
特征频率。
arguments (包含以下内容) –
- freq: str
特征频率。
data –
- cur_pit:
该参数专为点-in-time(PIT)数据设计。
- period: int
用于查询特定周期。在 Qlib 中周期以整数表示。(例如,202001 可能表示 2020 年第一季度)
arguments –
- cur_pit:
该参数专为点-in-time(PIT)数据设计。
- period: int
用于查询特定周期。在 Qlib 中周期以整数表示。(例如,202001 可能表示 2020 年第一季度)
- 返回:
特征序列:序列的索引为日历索引
- 返回类型:
pd.Series
- 抽象 get_longest_back_rolling()
获取特征访问历史数据的最长长度
该功能旨在首先确定在特定范围内计算特征所需的数据范围。然而,像 Ref(Ref($close, -1), 1) 这样的情况无法正确处理。
因此,此方法仅用于检测所需历史数据的长度。
- 抽象 get_extended_window_size()
get_extend_window_size
为了在范围 [start_index, end_index] 内计算此操作符,我们必须获取 叶特征 在范围 [start_index - lft_etd, end_index + rght_etd] 内的数据。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- 类 qlib.data.base.特征(名称=None)
静态表达式
这类特征将从提供者处加载数据
- __init__(名称=None)
- get_longest_back_rolling()
获取特征访问历史数据的最长长度
该功能旨在首先确定在特定范围内计算特征所需的数据范围。然而,像 Ref(Ref($close, -1), 1) 这样的情况无法正确处理。
因此,此方法仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了在范围 [start_index, end_index] 内计算此操作符,我们必须获取 叶特征 在范围 [start_index - lft_etd, end_index + rght_etd] 内的数据。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- 类 qlib.data.base.PFeature(名称=None)
- 类 qlib.data.base.ExpressionOps
操作符表达式
这类特征将在运行时使用操作符进行特征构建。
操作符
- 类 qlib.data.ops.ElemOperator(特征)
逐元素操作符
- 参数:
feature (Expression) – 特征实例
- 返回:
特征操作输出
- 返回类型:
- __init__(特征)
- get_longest_back_rolling()
获取特征访问历史数据的最长长度
该功能旨在首先确定在特定范围内计算特征所需的数据范围。然而,像 Ref(Ref($close, -1), 1) 这样的情况无法正确处理。
因此,此方法仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了在范围 [start_index, end_index] 内计算此操作符,我们必须获取 叶特征 在范围 [start_index - lft_etd, end_index + rght_etd] 内的数据。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- 类 qlib.data.ops.ChangeInstrument(instrument, feature)
更换标的物操作符。在某些情况下,用户可能希望在计算时切换到另一个标的物,例如计算某只股票相对于市场指数的贝塔值。这需要将特征的计算从股票(原始标的)更改为指数(参考标的)。:param instrument: 例如 SH000300(沪深 300 指数),或 ^GPSC(标普 500 指数)。:type instrument: 新的标的物,后续操作将在其上执行。:param feature: :type feature: 需要在新标的物上计算的特征。
- 返回:
特征操作输出
- 返回类型:
- __init__(instrument, feature)
- load(instrument, start_index, end_index, *args)
load feature 此函数负责根据表达式引擎加载特征/表达式。
具体实现将分为两个部分:
缓存数据,处理错误。
这部分被所有表达式共享,并在 Expression 中实现。
根据具体表达式处理和计算数据。
这部分在每个表达式中各不相同,并在各个表达式中分别实现。
表达式引擎被不同的数据共享。不同的数据在 args 中会有不同的额外信息。
- 参数:
instrument (str) – 证券代码。
start_index (str) – 特征起始索引 [以日历表示]。
end_index (str) – 特征结束索引 [以日历表示]。
information (*args 可能包含以下内容) –
data (2) 若在 PIT 中使用) –
- freq: str
特征频率。
arguments (包含以下内容) –
- freq: str
特征频率。
data –
- cur_pit:
该参数专为点-in-time(PIT)数据设计。
- period: int
用于查询特定周期。在 Qlib 中周期以整数表示。(例如,202001 可能表示 2020 年第一季度)
arguments –
- cur_pit:
该参数专为点-in-time(PIT)数据设计。
- period: int
用于查询特定周期。在 Qlib 中周期以整数表示。(例如,202001 可能表示 2020 年第一季度)
- 返回:
特征序列:序列的索引为日历索引
- 返回类型:
pd.Series
- 类 qlib.data.ops.NpElemOperator(feature, func)
Numpy 逐元素操作符
- 参数:
feature (Expression) – 特征实例
func (str) – numpy 特征操作方法
- 返回:
特征操作输出
- 返回类型:
- __init__(feature, func)
- 类 qlib.data.ops.Abs(特征)
特征绝对值
- 参数:
feature (Expression) – 特征实例
- 返回:
一个输出为绝对值的特征实例
- 返回类型:
- __init__(特征)
- 类 qlib.data.ops.Sign(特征)
特征符号
- 参数:
feature (Expression) – 特征实例
- 返回:
一个输出符号的特征实例
- 返回类型:
- __init__(特征)
- 类 qlib.data.ops.Log(特征)
特征对数
- 参数:
feature (Expression) – 特征实例
- 返回:
一个输出对数的特征实例
- 返回类型:
- __init__(特征)
- 类 qlib.data.ops.Mask(feature, instrument)
特征掩码
- 参数:
feature (Expression) – 特征实例
instrument (str) – 标的物掩码
- 返回:
带有掩码标的物的特征实例
- 返回类型:
- __init__(feature, instrument)
- 类 qlib.data.ops.Not(特征)
非运算符
- 参数:
feature (Expression) – 特征实例
- 返回:
特征元素级取反输出
- 返回类型:
- __init__(特征)
- 类 qlib.data.ops.PairOperator(feature_left, feature_right)
成对操作符
- 参数:
feature_left (Expression) – 特征实例或数值
feature_right (Expression) – 特征实例或数值
- 返回:
两个特征的操作结果输出
- 返回类型:
- __init__(feature_left, feature_right)
- get_longest_back_rolling()
获取特征访问历史数据的最长长度
该功能旨在首先确定在特定范围内计算特征所需的数据范围。然而,像 Ref(Ref($close, -1), 1) 这样的情况无法正确处理。
因此,此方法仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了在范围 [start_index, end_index] 内计算此操作符,我们必须获取 叶特征 在范围 [start_index - lft_etd, end_index + rght_etd] 内的数据。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- 类 qlib.data.ops.NpPairOperator(feature_left, feature_right, func)
NumPy 成对操作符
- 参数:
feature_left (Expression) – 特征实例或数值
feature_right (Expression) – 特征实例或数值
func (str) – 操作函数
- 返回:
两个特征的操作结果输出
- 返回类型:
- __init__(feature_left, feature_right, func)
- 类 qlib.data.ops.Power(feature_left, feature_right)
幂运算符
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
将 feature_left 中的底数提升为 feature_right 中的指数次幂
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Add(feature_left, feature_right)
加法运算符
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
两个特征的和
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Sub(feature_left, feature_right)
减法运算符
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
两个特征的差
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Mul(feature_left, feature_right)
乘法运算符
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
两个特征的积
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Div(feature_left, feature_right)
除法运算符
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
两个特征的商
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Greater(feature_left, feature_right)
Greater 算子
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
从输入的两个特征中选取较大的元素
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Less(feature_left, feature_right)
Less 算子
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
从输入的两个特征中选取较小的元素
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Gt(feature_left, feature_right)
大于算子
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
布尔序列,表示 左侧 > 右侧
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Ge(feature_left, feature_right)
大于等于算子
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
布尔序列,表示 左侧 >= 右侧
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Lt(feature_left, feature_right)
小于算子
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
布尔序列,表示 左侧 < 右侧
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Le(feature_left, feature_right)
小于等于运算符
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
布尔序列,表示 left <= right
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Eq(feature_left, feature_right)
等于运算符
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
布尔序列,表示 left == right
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Ne(feature_left, feature_right)
不等于运算符
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
布尔序列,表示 left != right
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.And(feature_left, feature_right)
与运算符
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
两个特征逐行进行 & 操作的输出
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.Or(feature_left, feature_right)
或运算符
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- 返回:
两个特征逐行进行 | 操作的输出
- 返回类型:
- __init__(feature_left, feature_right)
- 类 qlib.data.ops.If(condition, feature_left, feature_right)
If 操作符
- 参数:
condition (Expression) – 布尔值特征实例,用作条件判断
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- __init__(condition, feature_left, feature_right)
- get_longest_back_rolling()
获取特征访问历史数据的最长长度
该功能旨在首先确定在特定范围内计算特征所需的数据范围。然而,像 Ref(Ref($close, -1), 1) 这样的情况无法正确处理。
因此,此方法仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了在范围 [start_index, end_index] 内计算此操作符,我们必须获取 叶特征 在范围 [start_index - lft_etd, end_index + rght_etd] 内的数据。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- 类 qlib.data.ops.Rolling(feature, N, func)
滚动操作符。滚动(rolling)和扩展(expanding)的含义与 pandas 中一致。当窗口大小设为 0 时,操作符的行为应遵循 expanding;否则,遵循 rolling。
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
func (str) – 滚动计算方法
- 返回:
滚动输出
- 返回类型:
- __init__(feature, N, func)
- get_longest_back_rolling()
获取特征访问历史数据的最长长度
该功能旨在首先确定在特定范围内计算特征所需的数据范围。然而,像 Ref(Ref($close, -1), 1) 这样的情况无法正确处理。
因此,此方法仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了在范围 [start_index, end_index] 内计算此操作符,我们必须获取 叶特征 在范围 [start_index - lft_etd, end_index + rght_etd] 内的数据。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- 类 qlib.data.ops.Ref(feature, N)
特征引用
- 参数:
feature (Expression) – 特征实例
N (int) – N = 0 表示获取最早的数据;N > 0 表示获取 N 个周期前的数据;N < 0 表示获取未来数据
- 返回:
一个包含目标引用的特征实例
- 返回类型:
- __init__(feature, N)
- get_longest_back_rolling()
获取特征访问历史数据的最长长度
该功能旨在首先确定在特定范围内计算特征所需的数据范围。然而,像 Ref(Ref($close, -1), 1) 这样的情况无法正确处理。
因此,此方法仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了在范围 [start_index, end_index] 内计算此操作符,我们必须获取 叶特征 在范围 [start_index - lft_etd, end_index + rght_etd] 内的数据。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- 类 qlib.data.ops.Mean(feature, N)
滚动均值 (MA)
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个包含滚动平均值的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Sum(feature, N)
滚动求和
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个包含滚动求和的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Std(feature, N)
滚动标准差
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个包含滚动标准差的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Var(feature, N)
滚动方差
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个包含滚动方差的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Skew(feature, N)
滚动偏度
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个包含滚动偏度的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Kurt(feature, N)
滚动峰度
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
带有滚动峰度的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Max(feature, N)
滚动最大值
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
带有滚动最大值的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.IdxMax(feature, N)
滚动最大值索引
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
带有滚动最大值索引的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Min(feature, N)
滚动最小值
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
带有滚动最小值的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.IdxMin(feature, N)
滚动最小值索引
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
带有滚动最小值索引的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Quantile(feature, N, qscore)
滚动分位数
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
具有滚动分位数的特征实例
- 返回类型:
- __init__(feature, N, qscore)
- 类 qlib.data.ops.Med(feature, N)
滚动中位数
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
具有滚动中位数的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Mad(feature, N)
滚动平均绝对偏差
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
具有滚动平均绝对偏差的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Rank(feature, N)
滚动排名(百分位)
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
具有滚动排名的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Count(feature, N)
滚动计数
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个具有滚动计数非 NaN 元素数量的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Delta(feature, N)
滚动 Delta
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个在滚动窗口中以结束值减去起始值的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Slope(feature, N)
滚动斜率 该算子计算 idx 和 feature 之间的斜率。(例如:[<feature_t1>, <feature_t2>, <feature_t3>] 与 [1, 2, 3])
使用示例:- “Slope($close, %d)/$close”
# TODO: # 某些用户可能希望成对滚动,如 Slope(A, B, N)
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个具有给定窗口线性回归斜率的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Rsquare(feature, N)
滚动 R 值平方
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个具有给定窗口线性回归 r 值平方的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.Resi(feature, N)
滚动回归残差
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个具有给定窗口回归残差的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.WMA(feature, N)
滚动 WMA
- 参数:
feature (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个具有加权移动平均输出的特征实例
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.EMA(feature, N)
滚动指数均值(EMA)
- 参数:
feature (Expression) – 特征实例
N (int, float) – 滚动窗口大小
- 返回:
一个特征实例,表示给定窗口内的回归 r 值平方
- 返回类型:
- __init__(feature, N)
- 类 qlib.data.ops.PairRolling(feature_left, feature_right, N, func)
成对滚动操作符
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个特征实例,表示两个输入特征的滚动输出
- 返回类型:
- __init__(feature_left, feature_right, N, func)
- get_longest_back_rolling()
获取特征访问历史数据的最长长度
该功能旨在首先确定在特定范围内计算特征所需的数据范围。然而,像 Ref(Ref($close, -1), 1) 这样的情况无法正确处理。
因此,此方法仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了在范围 [start_index, end_index] 内计算此操作符,我们必须获取 叶特征 在范围 [start_index - lft_etd, end_index + rght_etd] 内的数据。
- 返回:
lft_etd, rght_etd
- 返回类型:
(int, int)
- 类 qlib.data.ops.Corr(feature_left, feature_right, N)
滚动相关性
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个特征实例,表示两个输入特征的滚动相关性
- 返回类型:
- __init__(feature_left, feature_right, N)
- 类 qlib.data.ops.Cov(feature_left, feature_right, N)
滚动协方差
- 参数:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
N (int) – 滚动窗口大小
- 返回:
一个特征实例,表示两个输入特征的滚动最大值
- 返回类型:
- __init__(feature_left, feature_right, N)
- 类 qlib.data.ops.TResample(feature, freq, func)
- __init__(feature, freq, func)
将数据重采样到目标频率。使用 pandas 的 resample 方法。
重采样后的时间戳将位于时间区间的起始位置。
- 参数:
feature (Expression) – 用于计算特征的表达式
freq (str) – 将传递给 resample 方法,用于基于指定频率进行重采样
func (method) – 获取重采样值的方法。某些表达式被高频使用
- 类 qlib.data.ops.操作封装器
操作符包装器
- __init__()
- 注册(ops_list: 列表[类型[ExpressionOps] | dict])
注册操作符
- 参数:
ops_list (List[Union[Type[ExpressionOps], dict]]) –
如果 ops_list 的类型是 List[Type[ExpressionOps]],则 ops_list 中的每个元素代表一个操作符类,且该类应为 ExpressionOps 的子类。
如果 ops_list 的类型是 List[dict],则 ops_list 中的每个元素代表一个操作符的配置,其格式如下:
{ "class": class_name, "module_path": path, }
注意:class 应为操作符的类名,module_path 应为 Python 模块或文件路径。
- qlib.data.ops.注册所有操作(C)
注册所有操作符
缓存
- 类 qlib.data.cache.MemCache(mem_cache_size_limit=None, limit_type='length')
内存缓存。
- __init__(mem_cache_size_limit=None, limit_type='length')
- 参数:
mem_cache_size_limit – 缓存最大大小。
limit_type – length 或 sizeof;length(调用函数:len),size(调用函数:sys.getsizeof)。
- 类 qlib.data.cache.表达式缓存(提供者)
表达式缓存机制基类。
该类用于使用自定义的表达式缓存机制包装表达式提供者。
注意
重写 _uri 和 _expression 方法以创建您自己的表达式缓存机制。
- expression(instrument, field, start_time, end_time, freq)
获取表达式数据。
注意
与表达式提供者中的 expression 方法接口相同。
- update(cache_uri: str | Path, freq: str = 'day')
将表达式缓存更新至最新的日历。
重写此方法以定义如何根据用户自定义的缓存机制更新表达式缓存。
- 参数:
cache_uri (str 或 Path) – 表达式缓存文件的完整 URI(包含目录路径)。
freq (str) –
- 返回:
0(成功更新)/ 1(无需更新)/ 2(更新失败)。
- 返回类型:
int
- 类 qlib.data.cache.数据集缓存(提供者)
数据集缓存机制基类。
该类用于包装数据集提供者,并支持用户自定义的数据集缓存机制。
注意
重写 _uri 和 _dataset 方法,以创建您自己的数据集缓存机制。
- dataset(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=1, inst_processors=[])
获取特征数据集。
注意
与数据集提供者中的 dataset 方法接口相同。
注意
服务器使用 redis_lock 来确保不会触发读写冲突,但不考虑客户端读取者的情况。
- update(cache_uri: str | Path, freq: str = 'day')
将数据集缓存更新到最新的日历。
重写此方法以定义如何根据用户自己的缓存机制更新数据集缓存。
- 参数:
cache_uri (str 或 Path) – 数据集缓存文件的完整 URI(包含目录路径)。
freq (str) –
- 返回:
0(成功更新)/ 1(无需更新)/ 2(更新失败)
- 返回类型:
int
- static cache_to_origin_data(data, fields)
将缓存数据转换为原始数据
- 参数:
data – pd.DataFrame,缓存的数据。
fields – 特征字段。
- 返回:
pd.DataFrame。
- static normalize_uri_args(instruments, fields, freq)
标准化 URI 参数
- 类 qlib.data.cache.DiskExpressionCache(provider, **kwargs)
为服务器准备的缓存机制。
- __init__(provider, **kwargs)
- gen_expression_cache(expression_data, cache_path, instrument, field, freq, last_update)
使用二进制文件保存,类似于特征数据。
- update(sid, cache_uri, freq: str = 'day')
将表达式缓存更新至最新的日历。
重写此方法以定义如何根据用户自定义的缓存机制更新表达式缓存。
- 参数:
cache_uri (str 或 Path) – 表达式缓存文件的完整 URI(包含目录路径)。
freq (str) –
- 返回:
0(成功更新)/ 1(无需更新)/ 2(更新失败)。
- 返回类型:
int
- 类 qlib.data.cache.DiskDatasetCache(provider, **kwargs)
为服务器准备的缓存机制。
- __init__(provider, **kwargs)
- 类方法 read_data_from_cache(cache_path: str | Path, start_time, end_time, fields)
从缓存读取数据
该函数可以从磁盘缓存数据集中读取数据
- 参数:
cache_path –
start_time —
end_time —
fields – 数据集缓存中的字段顺序是排序过的,因此需要重新排列列以保持一致。
- 返回:
- 类 IndexManager(cache_path: str | Path)
此类未考虑锁机制,请在外部代码中处理锁。此类是磁盘数据的代理。
- __init__(cache_path: str | Path)
- gen_dataset_cache(cache_path: str | Path, instruments, fields, freq, inst_processors=[])
注意
此函数不考虑缓存的读写锁,请在函数外部获取锁。
缓存格式包含三个部分(以下为典型文件名)。
索引:cache/d41366901e25de3ec47297f12e2ba11d.index
文件内容可能采用以下格式(pandas.Series)
start end 1999-11-10 00:00:00 0 1 1999-11-11 00:00:00 1 2 1999-11-12 00:00:00 2 3 ...
注意
起始时间闭合,结束时间开放!!!!!
每行包含两个元素 <start_index, end_index>,并以时间戳作为其索引。
它表示对应于时间戳的数据的起始索引(包含)和结束索引(不包含)
元数据:cache/d41366901e25de3ec47297f12e2ba11d.meta
数据:cache/d41366901e25de3ec47297f12e2ba11d
这是一个按时间排序的 HDF 文件
- 参数:
cache_path – 存储缓存的路径。
instruments – 要存储缓存的标的。
fields – 用于存储缓存的字段。
freq – 用于存储缓存的频率。
inst_processors – 仪器处理器。
:return type pd.DataFrame; 返回的 DataFrame 的字段与函数参数保持一致。
- update(cache_uri, freq: str = 'day')
将数据集缓存更新到最新的日历。
重写此方法以定义如何根据用户自己的缓存机制更新数据集缓存。
- 参数:
cache_uri (str 或 Path) – 数据集缓存文件的完整 URI(包含目录路径)。
freq (str) –
- 返回:
0(成功更新)/ 1(无需更新)/ 2(更新失败)
- 返回类型:
int
存储
- 类 qlib.data.storage.storage.基础存储
- class qlib.data.storage.storage.CalendarStorage(freq: str, future: bool, **kwargs)
CalendarStorage 的方法与其同名 List 方法的行为保持一致。
- __init__(freq: str, future: bool, **kwargs)
- 属性 数据: 可迭代对象[str]
获取所有数据
- Raises:
ValueError – 如果数据(存储)不存在,则抛出 ValueError
- 索引(值: str) int
- Raises:
ValueError – 如果数据(存储)不存在,则抛出 ValueError
- 类 qlib.data.storage.storage.InstrumentStorage(market: str, freq: str, **kwargs)
- __init__(market: str, freq: str, **kwargs)
- 属性 数据: 字典[str, 列表[元组[str, str]]]
获取所有数据
- Raises:
ValueError – 如果数据(存储)不存在,则抛出 ValueError
- update([E, ]**F) 无。 从映射/可迭代对象 E 和 F 更新 D。
备注
如果 E 存在且具有 .keys() 方法,则执行:for k in E: D[k] = E[k]
如果 E 存在但没有 .keys() 方法,则执行:for (k, v) in E: D[k] = v
无论哪种情况,之后都会执行:for k, v in F.items(): D[k] = v
- 类 qlib.data.storage.storage.FeatureStorage(instrument: str, field: str, freq: str, **kwargs)
- __init__(instrument: str, field: str, freq: str, **kwargs)
- 属性 数据: 序列
获取所有数据
备注
如果数据(storage)不存在,则返回空的 pd.Series:return pd.Series(dtype=np.float32)
- 属性 start_index: int | None
获取 FeatureStorage 的起始索引
备注
如果数据(storage)不存在,则返回 None
- 属性 end_index: int | None
获取 FeatureStorage 的结束索引
备注
数据范围的右端索引(两端均为闭区间)
下一个数据追加位置将是 end_index + 1
如果数据(storage)不存在,则返回 None
- write(data_array: List | ndarray | Tuple, index: int | None = None)
将 data_array 写入 FeatureStorage,从 index 开始。
备注
如果 index 为 None,则将 data_array 追加到 feature 中。
如果 len(data_array) == 0,则直接返回。
如果 (index - self.end_index) >= 1,则 self[end_index+1: index] 将被 np.nan 填充。
示例
feature: 3 4 4 5 5 6 >>> self.write([6, 7], index=6) feature: 3 4 4 5 5 6 6 6 7 7 >>> self.write([8], index=9) feature: 3 4 4 5 5 6 6 6 7 7 8 np.nan 9 8 >>> self.write([1, np.nan], index=3) feature: 3 1 4 np.nan 5 6 6 6 7 7 8 np.nan 9 8
- rebase(start_index: int | None = None, end_index: int | None = None)
重新设置 FeatureStorage 的 start_index 和 end_index。
start_index 和 end_index 是闭区间:[start_index, end_index]
示例
feature: 3 4 4 5 5 6 >>> self.rebase(start_index=4) feature: 4 5 5 6 >>> self.rebase(start_index=3) feature: 3 np.nan 4 5 5 6 >>> self.write([3], index=3) feature: 3 3 4 5 5 6 >>> self.rebase(end_index=4) feature: 3 3 4 5 >>> self.write([6, 7, 8], index=4) feature: 3 3 4 6 5 7 6 8 >>> self.rebase(start_index=4, end_index=5) feature: 4 6 5 7
- 重写(数据: 列表 | ndarray | 元组, 索引: 整数)
使用提供的数据覆盖 FeatureStorage 中的所有数据
- 参数:
数据 (Union[列表, np.ndarray, 元组]) – 数据
索引 (整数) – 数据起始索引
- 类 qlib.data.storage.file_storage.文件存储混入
FileStorageMixin,适用于 FileXXXStorage 子类,子类需具备 provider_uri、freq、storage_name、file_name 属性
- 检查()
检查 self.uri
- Raises:
ValueError –
- 类 qlib.data.storage.file_storage.FileCalendarStorage(freq: 字符串, future: 布尔值, provider_uri: 字典 | 无 = 无, **kwargs)
- __init__(freq: str, future: bool, provider_uri: dict | None = None, **kwargs)
- 属性 数据: 列表[str]
获取所有数据
- Raises:
ValueError – 如果数据(存储)不存在,则抛出 ValueError
- 索引(值: str) int
- Raises:
ValueError – 如果数据(存储)不存在,则抛出 ValueError
- 类 qlib.data.storage.file_storage.FileInstrumentStorage(market: str, freq: str, provider_uri: dict | None = None, **kwargs)
- __init__(市场: str, 频率: str, 提供者 URI: dict | None = None, **kwargs)
- 属性 数据: 字典[str, 列表[元组[str, str]]]
获取所有数据
- Raises:
ValueError – 如果数据(存储)不存在,则抛出 ValueError
- update([E, ]**F) 无。 从映射/可迭代对象 E 和 F 更新 D。
备注
如果 E 存在且具有 .keys() 方法,则执行:for k in E: D[k] = E[k]
如果 E 存在但没有 .keys() 方法,则执行:for (k, v) in E: D[k] = v
无论哪种情况,之后都会执行:for k, v in F.items(): D[k] = v
- 类 qlib.data.storage.file_storage.FileFeatureStorage(标的: str, 字段: str, 频率: str, 提供者 URI: dict | None = None, **kwargs)
- __init__(instrument: str, field: str, freq: str, provider_uri: dict | None = None, **kwargs)
- 属性 数据: 序列
获取所有数据
备注
如果数据(storage)不存在,则返回空的 pd.Series:return pd.Series(dtype=np.float32)
- write(data_array: List | ndarray, index: int | None = None) None
将 data_array 写入 FeatureStorage,从 index 开始。
备注
如果 index 为 None,则将 data_array 追加到 feature 中。
如果 len(data_array) == 0,则直接返回。
如果 (index - self.end_index) >= 1,则 self[end_index+1: index] 将被 np.nan 填充。
示例
feature: 3 4 4 5 5 6 >>> self.write([6, 7], index=6) feature: 3 4 4 5 5 6 6 6 7 7 >>> self.write([8], index=9) feature: 3 4 4 5 5 6 6 6 7 7 8 np.nan 9 8 >>> self.write([1, np.nan], index=3) feature: 3 1 4 np.nan 5 6 6 6 7 7 8 np.nan 9 8
- 属性 start_index: int | None
获取 FeatureStorage 的起始索引
备注
如果数据(storage)不存在,则返回 None
- 属性 end_index: int | None
获取 FeatureStorage 的结束索引
备注
数据范围的右端索引(两端均为闭区间)
下一个数据追加位置将是 end_index + 1
如果数据(storage)不存在,则返回 None
数据集
数据集类
- 类 qlib.data.dataset.__init__.数据集(**kwargs)
为模型训练和推理准备数据。
- __init__(**kwargs)
初始化旨在完成以下步骤:
- 初始化子实例以及数据集的状态(用于准备数据的信息)
用于准备数据的关键状态名称不应以‘_’开头,以便在序列化时能够保存到磁盘上。
- 设置数据
与数据相关的属性名应以‘_’开头,以便在序列化时不会被保存到磁盘。
数据可以指定用于计算准备阶段所需关键数据的信息。
- config(**kwargs)
config 用于配置那些无法从数据中学习得到的参数。
- setup_data(**kwargs)
设置数据。
我们将 setup_data 函数拆分以应对以下情况:
用户拥有一个已将学习状态保存在磁盘上的 Dataset 对象。
用户从磁盘加载 Dataset 对象。
用户调用 setup_data 来加载新数据。
用户基于之前的状态为模型准备数据。
- prepare(**kwargs) object
数据集的类型取决于模型(可以是 pd.DataFrame、pytorch.DataLoader 等)。参数应指定准备数据的范围。该方法应当:- 处理数据
返回处理后的数据
- 返回:
返回对象
- 返回类型:
object
- 类 qlib.data.dataset.__init__.DatasetH(handler: Dict | DataHandler, segments: Dict[str, Tuple], fetch_kwargs: Dict = {}, **kwargs)
带有数据处理(H)andler 的数据集
用户应尽量将数据预处理函数放入 handler 中。只有以下数据处理函数应保留在 Dataset 中:
处理逻辑与特定模型相关。
处理逻辑与数据划分相关。
- __init__(handler: Dict | DataHandler, segments: Dict[str, Tuple], fetch_kwargs: Dict = {}, **kwargs)
初始化底层数据。
- 参数:
handler (Union[dict, DataHandler]) –
handler 可以是:
DataHandler 的实例
DataHandler 的配置。请参考 DataHandler
segments (dict) –
描述数据分段的选项。以下是一些示例:
1) 'segments': { 'train': ("2008-01-01", "2014-12-31"), 'valid': ("2017-01-01", "2020-08-01",), 'test': ("2015-01-01", "2016-12-31",), } 2) 'segments': { 'insample': ("2008-01-01", "2014-12-31"), 'outsample': ("2017-01-01", "2020-08-01",), }
- config(handler_kwargs: dict | None = None, **kwargs)
初始化 DatasetH
- 参数:
handler_kwargs (dict) –
DataHandler 的配置,可能包含以下参数:
DataHandler.conf_data 的参数,例如 'instruments'、'start_time' 和 'end_time'。
kwargs (dict) –
DatasetH 的配置,例如
- segments字典
分段的配置,与 self.__init__ 中的 'segments' 相同
- setup_data(handler_kwargs: dict | None = None, **kwargs)
初始化数据
- 参数:
handler_kwargs (dict) –
DataHandler 的初始化参数,可能包含以下参数:
init_type : Handler 的初始化类型
enable_cache : 是否启用缓存
- prepare(segments: List[str] | Tuple[str] | str | slice | Index, col_set='__all', data_key='infer', **kwargs) List[DataFrame] | DataFrame
为学习和推理准备数据。
- 参数:
segments (Union[List[Text], Tuple[Text], Text, slice]) –
描述要准备的数据范围,以下是一些示例:
“train”
[“train”, “valid”]
col_set (str) –
该 col_set 将在获取数据时传递给 self.handler。TODO:实现自动化:
为测试数据选择 DK_I
为训练数据选择 DK_L。
data_key (str) – 要获取的数据:DK_* 默认值为 DK_I,表示获取用于推理的数据。
kwargs –
- kwargs 可能包含的参数:
- flt_colstr
仅存在于 TSDatasetH 中,可用于添加一列数据(True 或 False)以过滤数据。此参数仅在实例为 TSDatasetH 时支持。
- 返回类型:
Union[List[pd.DataFrame], pd.DataFrame]
- Raises:
NotImplementedError: –
数据加载器
- 类 qlib.data.dataset.loader.DataLoader
DataLoader 用于从原始数据源加载原始数据。
- 抽象 load(instruments, start_time=None, end_time=None) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多级索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
- 参数:
instruments (str 或 dict) – 可以是市场名称,也可以是由 InstrumentProvider 生成的标的配置文件。如果 instruments 的值为 None,则表示不进行过滤。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
- 返回:
从底层数据源加载数据
- 返回类型:
pd.DataFrame
- Raises:
KeyError: – 如果不支持 instruments 过滤,则抛出 KeyError
- 类 qlib.data.dataset.loader.DLWParser(config: list | tuple | dict)
(D)ata(L)oader (W)ith (P)arser 的缩写,用于特征和名称的解析
提取此类以便 QlibDataLoader 和其他数据加载器(例如 QdbDataLoader)可以共享字段。
- __init__(config: list | tuple | dict)
- 参数:
config (Union[list, tuple, dict]) –
该配置将用于描述字段和列名
<config> := { "group_name1": <fields_info1> "group_name2": <fields_info2> } or <config> := <fields_info> <fields_info> := ["expr", ...] | (["expr", ...], ["col_name", ...]) # NOTE: list or tuple will be treated as the things when parsing
- 抽象 load_group_df(instruments, exprs: list, names: list, start_time: str | Timestamp | None = None, end_time: str | Timestamp | None = None, gp_name: str | None = None) DataFrame
加载特定组的数据框
- 参数:
instruments – 金融工具。
exprs (list) – 描述数据内容的表达式。
names (list) – 数据的名称。
- 返回:
查询得到的数据框。
- 返回类型:
pd.DataFrame
- 加载(instruments=无, start_time=无, end_time=无) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多级索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
- 参数:
instruments (str 或 dict) – 可以是市场名称,也可以是由 InstrumentProvider 生成的标的配置文件。如果 instruments 的值为 None,则表示不进行过滤。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
- 返回:
从底层数据源加载数据
- 返回类型:
pd.DataFrame
- Raises:
KeyError: – 如果不支持 instruments 过滤,则抛出 KeyError
- 类 qlib.data.dataset.loader.QlibDataLoader(config: Tuple[list, tuple, dict], filter_pipe: List | None = None, swap_level: bool = True, freq: str | dict = 'day', inst_processors: dict | list | None = None)
与 QlibDataLoader 相同。字段可通过 config 定义
- __init__(config: Tuple[list, tuple, dict], filter_pipe: List | None = None, swap_level: bool | dict = 'day', inst_processors: dict | list | None = None)
- 参数:
config (Tuple[list, tuple, dict]) – 请参考 DLWParser 的文档
filter_pipe – 用于筛选投资标的的过滤管道
swap_level – 是否交换 MultiIndex 的层级
freq (dict 或 str) – 如果 type(config) == dict 且 type(freq) == str,则使用 freq 加载配置数据。如果 type(config) == dict 且 type(freq) == dict,则使用 freq[<组名>] 加载 config[<组名>] 的数据。
inst_processors (dict | list) – 如果 inst_processors 不为 None 且 type(config) == dict,则使用 inst_processors[<组名>] 加载 config[<组名>] 的数据。如果 inst_processors 是列表,则会将其应用于所有组。
- load_group_df(instruments, exprs: list, names: list, start_time: str | Timestamp | None = None, end_time: str | Timestamp | None = None, gp_name: str | None = None) DataFrame
加载特定组的数据框
- 参数:
instruments – 金融工具。
exprs (list) – 描述数据内容的表达式。
names (list) – 数据的名称。
- 返回:
查询得到的数据框。
- 返回类型:
pd.DataFrame
- 类 qlib.data.dataset.loader.StaticDataLoader(config: dict | str | DataFrame, join='outer')
支持从文件或直接提供的数据中加载数据的 DataLoader。
- __init__(config: dict | str | DataFrame, join='outer')
- 参数:
config (dict) – {fields_group: <路径或对象>}
join (str) – 如何对齐不同的数据框
- 加载(instruments=无, start_time=无, end_time=无) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多级索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
- 参数:
instruments (str 或 dict) – 可以是市场名称,也可以是由 InstrumentProvider 生成的标的配置文件。如果 instruments 的值为 None,则表示不进行过滤。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
- 返回:
从底层数据源加载数据
- 返回类型:
pd.DataFrame
- Raises:
KeyError: – 如果不支持 instruments 过滤,则抛出 KeyError
- 类 qlib.data.dataset.loader.NestedDataLoader(dataloader_l: List[Dict], join='left')
我们有多个 DataLoader,可以使用此类将它们组合起来。
- __init__(dataloader_l: List[Dict], join='left') None
- 参数:
dataloader_l (list[dict]) –
一个数据加载器的列表,例如
nd = NestedDataLoader( dataloader_l=[ { "class": "qlib.contrib.data.loader.Alpha158DL", }, { "class": "qlib.contrib.data.loader.Alpha360DL", "kwargs": { "config": { "label": ( ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"]) } } } ] )
join – 在合并时会传递给 pd.concat。
- 加载(instruments=无, start_time=无, end_time=无) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多级索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
- 参数:
instruments (str 或 dict) – 可以是市场名称,也可以是由 InstrumentProvider 生成的标的配置文件。如果 instruments 的值为 None,则表示不进行过滤。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
- 返回:
从底层数据源加载数据
- 返回类型:
pd.DataFrame
- Raises:
KeyError: – 如果不支持 instruments 过滤,则抛出 KeyError
- class qlib.data.dataset.loader.DataLoaderDH(handler_config: dict, fetch_kwargs: dict = {}, is_group=False)
基于 (D)ata (H)andler 的 DataLoader。它用于从数据处理器中加载多个数据——如果你只想从单个数据处理器加载数据,可以将其写在单个数据处理器中。
待办事项:是什么让这个模块使用起来不够简便。
对于在线场景
底层的数据处理器应进行配置,但数据加载器并未提供相应的接口和钩子。
- __init__(handler_config: dict, fetch_kwargs: dict = {}, is_group=False)
- 参数:
handler_config (dict) –
handler_config 将用于描述处理器(handlers)
<handler_config> := { "group_name1": <handler> "group_name2": <handler> } or <handler_config> := <handler> <handler> := DataHandler Instance | DataHandler Config
fetch_kwargs (dict) – fetch_kwargs 将用于描述 fetch 方法的不同参数,例如 col_set、squeeze、data_key 等。
is_group (bool) – is_group 将用于描述 handler_config 的键是否为分组
- 加载(instruments=无, start_time=无, end_time=无) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多级索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
- 参数:
instruments (str 或 dict) – 可以是市场名称,也可以是由 InstrumentProvider 生成的标的配置文件。如果 instruments 的值为 None,则表示不进行过滤。
start_time (str) – 时间范围的起始时间。
end_time (str) – 时间范围的结束时间。
- 返回:
从底层数据源加载数据
- 返回类型:
pd.DataFrame
- Raises:
KeyError: – 如果不支持 instruments 过滤,则抛出 KeyError
数据处理器
- 类 qlib.data.dataset.handler.DataHandler(instruments=无, start_time=无, end_time=无, data_loader: dict | str | DataLoader | 无 = 无, init_data=True, fetch_orig=True)
使用处理器的步骤:1. 初始化数据处理器(通过调用 init);2. 使用数据。
数据处理器尝试维护一个具有两级结构的处理器:datetime 和 instruments。
支持任意顺序的索引层级(顺序将在数据中体现)。当数据框的索引名称缺失时,将使用 <datetime, instruments> 的顺序。
数据示例:列的多级索引是可选的。
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
提升数据处理器性能的小技巧:使用 col_set=CS_RAW 获取数据将返回原始数据,可能避免在调用 loc 时 pandas 复制数据。
- __init__(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, init_data=True, fetch_orig=True)
- 参数:
instruments – 要获取的股票列表。
start_time – 原始数据的起始时间。
end_time – 原始数据的结束时间。
data_loader (Union[dict, str, DataLoader]) – 用于加载数据的数据加载器。
init_data – 在构造函数中初始化原始数据。
fetch_orig (bool) – 如果可能,返回原始数据而非副本。
- config(**kwargs)
数据的配置。# 从数据源加载哪些数据
当从数据集中加载已序列化的处理器时,将使用此方法。数据将使用不同的时间范围进行初始化。
- setup_data(enable_cache: 布尔值 = False)
在多次运行初始化的情况下设置数据
它负责维护以下变量:1) self._data
- 参数:
enable_cache (bool) –
默认值为 false:
如果 enable_cache == True:
处理后的数据将被保存到磁盘,当下次调用 init 时,处理器将直接从磁盘加载缓存的数据
- fetch(selector: 时间戳 | 切片 | 字符串 | 索引 = slice(None, None, None), level: 字符串 | 整数 = 'datetime', col_set: 字符串 | 列表[字符串] = '__all', squeeze: 布尔值 = False, proc_func: 可调用对象 | 无 = 无) DataFrame
从底层数据源获取数据
设计动机:- 为底层数据提供统一的接口。- 有可能使接口更加友好。- 用户可以通过这一额外层提升数据获取的性能
- 参数:
selector (Union[pd.Timestamp, slice, str]) –
描述如何通过索引选择数据,可分为以下几类
获取单个索引
获取一段索引范围
一个切片范围
pd.Index 用于指定特定索引
可能发生以下冲突
['20200101', '20210101'] 是表示选择这个切片范围,还是仅选择这两天?
切片具有更高的优先级
level (Union[str, int]) – 指定按哪个索引层级来选择数据
col_set (Union[str, List[str]]) –
if isinstance(col_set, str):
选择一组有意义的 pd.Index 列(例如特征、字段)
if col_set == CS_RAW:
将返回原始数据集。
if isinstance(col_set, List[str]):
选择多组有意义的列,返回的数据具有多层结构
proc_func (Callable) –
在获取数据前提供一个用于处理数据的钩子(hook)
一个示例说明该钩子的必要性:
某个数据集学习了一些处理器,用于处理与数据分割相关的数据。
每次准备数据时都会应用这些处理器。
这些学习到的处理器要求数据框在拟合和应用时保持相同的格式。
然而数据格式会根据参数变化而改变。
因此这些处理器应当应用于底层数据。
squeeze (bool) – 是否压缩列和索引
- 返回类型:
pd.DataFrame。
- get_cols(col_set='__all') list
获取列名
- 参数:
col_set (str) – 选择一组有意义的列。(例如:特征、列)
- 返回:
列名列表
- 返回类型:
list
- get_range_selector(cur_date: Timestamp | str, periods: int) slice
根据周期数获取范围选择器
- 参数:
cur_date (pd.Timestamp 或 str) – 当前日期
periods (int) – 周期数量
- get_range_iterator(periods: int, min_periods: int | None = None, **kwargs) Iterator[Tuple[Timestamp, DataFrame]]
获取指定周期的切片数据迭代器
- 参数:
periods (int) – 周期数量。
min_periods (int) – 切片数据框的最小周期数。
kwargs (dict) – 将被传递给 self.fetch。
- 类 qlib.data.dataset.handler.DataHandlerLP(instruments=无, start_time=无, end_time=无, data_loader: 字典 | 字符串 | DataLoader | 无 = 无, infer_processors: 列表 = [], learn_processors: 列表 = [], shared_processors: 列表 = [], process_type='append', drop_raw=False, **kwargs)
带有(可)学习(处)理器的 DataHandler
该处理器将以 pd.DataFrame 格式生成三组数据。
DK_R / self._data:从加载器中加载的原始数据
DK_I / self._infer:为推理处理过的数据
DK_L / self._learn:为模型学习处理过的数据
使用不同的处理器流程分别用于学习和推理的原因如下,以下是一些示例。
用于学习和推理的标的范围可能不同。
某些样本的处理可能依赖于标签(例如,某些触及涨跌停的样本可能需要额外处理或被剔除)。
这些处理器仅应用于学习阶段。
关于数据处理器的提示
为了降低内存开销
drop_raw=True:这将在原始数据上进行就地修改;
请注意,像 self._infer 或 self._learn 这样的已处理数据,与 Qlib 的 Dataset 中的 segments(如“train”和“test”)概念不同
像 self._infer 或 self._learn 这样的已处理数据,是指使用不同处理器处理得到的底层数据
Qlib 的 Dataset 中的 segments(如“train”和“test”)仅仅是在查询数据时的时间分段(“train”通常在时间序列上早于“test”)
例如,你可以在“train”时间段内查询由 infer_processors 处理过的 data._infer
- __init__(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, infer_processors: List = [], learn_processors: List = [], shared_processors: List = [], process_type='append', drop_raw=False, **kwargs)
- 参数:
infer_processors (列表) –
用于生成推理数据的处理器的<描述信息>列表
<描述信息>示例:
1) classname & kwargs: { "class": "MinMaxNorm", "kwargs": { "fit_start_time": "20080101", "fit_end_time": "20121231" } } 2) Only classname: "DropnaFeature" 3) object instance of Processor
learn_processors (列表) – 与 infer_processors 类似,但用于生成模型学习所需的数据
process_type (字符串) –
PTYPE_I = ‘independent’
self._infer 将由 infer_processors 处理
self._learn 将由 learn_processors 处理
PTYPE_A = ‘append’
self._infer 将由 infer_processors 处理
self._learn 将由 infer_processors + learn_processors 共同处理
(例如,self._infer 由 learn_processors 处理)
drop_raw (布尔值) – 是否丢弃原始数据
- fit()
在不处理数据的情况下进行拟合
- fit_process_data()
拟合并处理数据
fit 的输入将是前一个处理器的输出
- process_data(with_fit: 布尔值 = False)
处理数据。如有必要,调用 processor.fit
符号说明:(数据) [处理器]
# self.process_type == DataHandlerLP.PTYPE_I 时的数据处理流程
(self._data)-[shared_processors]-(_shared_df)-[learn_processors]-(_learn_df) \ -[infer_processors]-(_infer_df)
# self.process_type == DataHandlerLP.PTYPE_A 时的数据处理流程
(self._data)-[shared_processors]-(_shared_df)-[infer_processors]-(_infer_df)-[learn_processors]-(_learn_df)
- 参数:
with_fit (布尔值) – fit 的输入将是前一个处理器的输出
- config(processor_kwargs: dict | None = None, **kwargs)
数据的配置。# 从数据源加载哪些数据
当从数据集中加载已序列化的处理器时,将使用此方法。数据将使用不同的时间范围进行初始化。
- setup_data(init_type: str = 'fit_seq', **kwargs)
在多次初始化运行时设置数据
- 参数:
init_type (字符串) – 上述列出的 IT_* 类型。
enable_cache (bool) –
默认值为 false:
如果 enable_cache == True:
处理后的数据将被保存到磁盘,当下次调用 init 时,处理器将直接从磁盘加载缓存的数据
- fetch(selector: Timestamp | slice | str = slice(None, None, None), level: str | int = 'datetime', col_set='__all', data_key: Literal['raw', 'infer', 'learn'] = 'infer', squeeze: bool = False, proc_func: Callable | None = None) DataFrame
从底层数据源获取数据
- 参数:
selector (Union[pd.Timestamp, slice, str]) – 描述如何通过索引选择数据。
level (Union[str, int]) – 选择数据的索引层级。
col_set (str) – 选择一组有意义的列。(例如:特征、列)。
data_key (str) – 要获取的数据:DK_*。
proc_func (Callable) – 请参考 DataHandler.fetch 的文档。
- 返回类型:
pd.DataFrame
- get_cols(col_set='__all', data_key: Literal['raw', 'infer', 'learn'] = 'infer') list
获取列名
- 参数:
col_set (str) – 选择一组有意义的列。(例如:特征、列)。
data_key (DATA_KEY_TYPE) – 要获取的数据:DK_*。
- 返回:
列名列表
- 返回类型:
list
- 类方法 转换(处理器: DataHandlerLP) DataHandlerLP
动机
用户在其自定义包中创建了一个 datahandler。然后他希望与其他用户共享已处理的 handler,而无需引入包依赖和复杂的数据处理逻辑。
该类通过将类转换为 DataHandlerLP 并仅保留已处理的数据来实现这一目标。
- 参数:
handler (DataHandlerLP) – DataHandlerLP 的一个子类
- 返回:
转换后的已处理数据
- 返回类型:
- 类方法 从 DataFrame 创建(df: DataFrame) DataHandlerLP
动机:- 当用户想要快速获得一个数据处理器时。
创建的数据处理器将只有一个共享的 DataFrame,且不包含任何处理器。创建处理器后,用户通常希望将其导出以便重复使用。以下是一个典型的使用场景
from qlib.data.dataset import DataHandlerLP dh = DataHandlerLP.from_df(df) dh.to_pickle(fname, dump_all=True)
待办事项:- StaticDataLoader 非常慢。其实没有必要再次复制数据……
处理器
- qlib.data.dataset.processor.get_group_columns(df: DataFrame, group: str | None)
从具有多级索引列的 DataFrame 中获取一组列
- 参数:
df (pd.DataFrame) – 包含多级列的 DataFrame。
group (str) – 特征组的名称,即组索引第一级的值。
- 类 qlib.data.dataset.processor.处理器
- fit(df: DataFrame | None = None)
学习数据处理参数
- 参数:
df (pd.DataFrame) – 当我们逐个使用处理器进行拟合和处理数据时,fit 函数依赖于前一个处理器的输出,即 df。
- is_for_infer() 布尔值
该处理器是否可用于推理?某些处理器不能用于推理。
- 返回:
如果它可用于推理。
- 返回类型:
布尔值
- readonly() 布尔值
处理器在处理时是否将输入数据视为只读(即不修改输入数据)
了解只读信息有助于 Handler 避免不必要的数据复制
- config(**kwargs)
配置可序列化的对象
- 参数:
keys (kwargs 可能包含以下内容) –
- dump_allbool
对象是否会转储所有属性
- excludelist
哪些属性不会被转储
- includelist
哪些属性会被转储
recursive (bool) – 配置是否递归生效
- 类 qlib.data.dataset.processor.DropnaProcessor(fields_group=None)
- __init__(fields_group=None)
- readonly()
处理器在处理时是否将输入数据视为只读(即不修改输入数据)
了解只读信息有助于 Handler 避免不必要的数据复制
- 类 qlib.data.dataset.processor.DropnaLabel(fields_group='label')
- __init__(fields_group='label')
- is_for_infer() 布尔值
样本根据标签被丢弃,因此不能用于推理
- 类 qlib.data.dataset.processor.DropCol(col_list=[])
- __init__(col_list=[])
- readonly()
处理器在处理时是否将输入数据视为只读(即不修改输入数据)
了解只读信息有助于 Handler 避免不必要的数据复制
- 类 qlib.data.dataset.processor.FilterCol(fields_group='feature', col_list=[])
- __init__(fields_group='feature', col_list=[])
- readonly()
处理器在处理时是否将输入数据视为只读(即不修改输入数据)
了解只读信息有助于 Handler 避免不必要的数据复制
- 类 qlib.data.dataset.processor.TanhProcess
使用 tanh 处理噪声数据
- 类 qlib.data.dataset.processor.ProcessInf
处理无穷值
- 类 qlib.data.dataset.processor.Fillna(fields_group=None, fill_value=0)
处理 NaN
- __init__(fields_group=None, fill_value=0)
- 类 qlib.data.dataset.processor.MinMaxNorm(fit_start_time, fit_end_time, fields_group=None)
- __init__(fit_start_time, fit_end_time, fields_group=None)
- fit(df: DataFrame | None = None)
学习数据处理参数
- 参数:
df (pd.DataFrame) – 当我们逐个使用处理器进行拟合和处理数据时,fit 函数依赖于前一个处理器的输出,即 df。
- 类 qlib.data.dataset.processor.ZScoreNorm(fit_start_time, fit_end_time, fields_group=None)
ZScore 归一化
- __init__(fit_start_time, fit_end_time, fields_group=None)
- fit(df: DataFrame | None = None)
学习数据处理参数
- 参数:
df (pd.DataFrame) – 当我们逐个使用处理器进行拟合和处理数据时,fit 函数依赖于前一个处理器的输出,即 df。
- 类 qlib.data.dataset.processor.RobustZScoreNorm(fit_start_time, fit_end_time, fields_group=None, clip_outlier=True)
稳健 ZScore 归一化
- 使用稳健统计量进行 Z-Score 归一化:
mean(x) = median(x) std(x) = MAD(x) * 1.4826
- 参考:
- __init__(fit_start_time, fit_end_time, fields_group=None, clip_outlier=True)
- fit(df: DataFrame | None = None)
学习数据处理参数
- 参数:
df (pd.DataFrame) – 当我们逐个使用处理器进行拟合和处理数据时,fit 函数依赖于前一个处理器的输出,即 df。
- 类 qlib.data.dataset.processor.CSZScoreNorm(fields_group=None, method='zscore')
横截面 ZScore 标准化
- __init__(fields_group=None, method='zscore')
- 类 qlib.data.dataset.processor.CSRankNorm(fields_group=None)
横截面排序标准化。“横截面”常用于描述数据操作,对不同股票进行的操作通常称为横截面操作。
例如,CSRankNorm 是一种按天分组并对每天所有股票的数据进行横向排序的操作。
关于 3.46 和 0.5 的说明
import numpy as np import pandas as pd x = np.random.random(10000) # for any variable x_rank = pd.Series(x).rank(pct=True) # if it is converted to rank, it will be a uniform distributed x_rank_norm = (x_rank - x_rank.mean()) / x_rank.std() # Normally, we will normalize it to make it like normal distribution x_rank.mean() # accounts for 0.5 1 / x_rank.std() # accounts for 3.46
- __init__(fields_group=None)
- 类 qlib.data.dataset.processor.HashStockFormat
将数据从 DataFrame 处理为哈希股票格式的存储
- 类 qlib.data.dataset.processor.TimeRangeFlt(start_time: Timestamp | str | None = None, end_time: Timestamp | str | None = None, freq: str = 'day')
这是一个用于筛选股票的过滤器。仅保留从 start_time 到 end_time 期间存在的数据(不检查期间中间是否存在)。警告:可能会导致数据泄露!!!
- __init__(start_time: Timestamp | str | None = None, end_time: Timestamp | str | None = None, freq: str = 'day')
- 参数:
start_time (Optional[Union[pd.Timestamp, str]]) – 数据的起始时间必须早于或等于 start_time;若为 None,则表示不会根据 start_time 过滤数据
end_time (Optional[Union[pd.Timestamp, str]]) – 与 start_time 类似
freq (str) – 日历的频率
贡献者
模型
- 类 qlib.model.base.模型
可学习的模型
- fit(dataset: Dataset, reweighter: Reweighter)
从基础模型中学习模型
注意
已学习模型的属性名不应以‘_’开头,以便模型可以被保存到磁盘。
以下代码示例展示了如何从 dataset 中获取 x_train、y_train 和 w_train:
# get features and labels df_train, df_valid = dataset.prepare( ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L ) x_train, y_train = df_train["feature"], df_train["label"] x_valid, y_valid = df_valid["feature"], df_valid["label"] # get weights try: wdf_train, wdf_valid = dataset.prepare(["train", "valid"], col_set=["weight"], data_key=DataHandlerLP.DK_L) w_train, w_valid = wdf_train["weight"], wdf_valid["weight"] except KeyError as e: w_train = pd.DataFrame(np.ones_like(y_train.values), index=y_train.index) w_valid = pd.DataFrame(np.ones_like(y_valid.values), index=y_valid.index)
- 参数:
dataset (Dataset) – 该数据集将生成用于模型训练的已处理数据。
- 类 qlib.model.base.微调模型
模型(F)ine(t)unable(可微调)
- 抽象 finetune(dataset: 数据集)
基于给定数据集对模型进行微调
使用 qlib.workflow.R 对模型进行微调的典型用例
# start exp to train init model with R.start(experiment_name="init models"): model.fit(dataset) R.save_objects(init_model=model) rid = R.get_recorder().id # Finetune model based on previous trained model with R.start(experiment_name="finetune model"): recorder = R.get_recorder(recorder_id=rid, experiment_name="init models") model = recorder.load_object("init_model") model.finetune(dataset, num_boost_round=10)
- 参数:
dataset (Dataset) – 该数据集将生成用于模型训练的已处理数据集。
策略
- 类 qlib.contrib.strategy.TopkDropoutStrategy(*, topk, n_drop, method_sell='bottom', method_buy='top', hold_thresh=1, only_tradable=False, forbid_all_trade_at_limit=True, **kwargs)
- __init__(*, topk, n_drop, method_sell='bottom', method_buy='top', hold_thresh=1, only_tradable=False, forbid_all_trade_at_limit=True, **kwargs)
- 参数:
topk (int) – 投资组合中股票的数量。
n_drop (int) – 每个交易日需要替换的股票数量。
method_sell (str) – 卖出股票的淘汰方法,可选 random(随机)或 bottom(末位)。
method_buy (str) – 买入股票的筛选方法,可选 random(随机)或 top(首位)。
hold_thresh (int) – 卖出股票前的最小持有天数,将检查 current.get_stock_count(order.stock_id) >= self.hold_thresh。
only_tradable (bool) –
策略在买卖时是否仅考虑可交易的股票。
如果 only_tradable 为 True:
策略将根据股票的可交易状态进行决策,并避免买卖不可交易的股票。
否则:
策略在做出买卖决策时不会检查股票的可交易状态。
forbid_all_trade_at_limit (bool) –
当股票达到涨停或跌停时,是否禁止所有交易。
如果 forbid_all_trade_at_limit 为 True:
当股价达到涨停或跌停时,策略将不进行任何交易,即使现实中允许,也不会在涨停时卖出或跌停时买入。
否则:
策略将在涨停时卖出,在跌停时买入。
- generate_trade_decision(execute_result=None)
在每个交易周期生成交易决策
- 参数:
execute_result (List[object], 可选) –
交易决策的执行结果,默认为 None
首次调用 generate_trade_decision 时,execute_result 可以为 None
- 类 qlib.contrib.strategy.WeightStrategyBase(*, order_generator_cls_or_obj=<类 'qlib.contrib.strategy.order_generator.OrderGenWOInteract'>, **kwargs)
- __init__(*, order_generator_cls_or_obj=<类 'qlib.contrib.strategy.order_generator.OrderGenWOInteract'>, **kwargs)
- signal(信号):
用于描述一个信号的信息。请参考 qlib.backtest.signal.create_signal_from 的文档,策略的决策将基于给定的信号。
- trade_exchange交易所
提供市场信息的交易所,用于处理订单并生成报告
如果 trade_exchange 为 None,则 self.trade_exchange 将被设置为 common_infra
它允许在不同的执行过程中使用不同的 trade_exchange。
例如:
在日频执行中,日频交易所和分钟频交易所都可用,但推荐使用日频交易所,因为它运行更快。
在分钟频执行中,日频交易所不可用,仅推荐使用分钟频交易所。
- generate_target_weight_position(score, current, trade_start_time, trade_end_time)
根据当日的得分和当前持仓生成目标持仓,生成时不考虑现金部分。
- 参数:
score (pd.Series) – 当日的预测得分,索引为股票代码,包含 'score' 列。
current (Position()) – 当前持仓。
trade_start_time (pd.Timestamp) – 交易开始时间。
trade_end_time (pd.Timestamp) – 交易结束时间。
- generate_trade_decision(execute_result=None)
在每个交易周期生成交易决策
- 参数:
execute_result (List[object], 可选) –
交易决策的执行结果,默认为 None
首次调用 generate_trade_decision 时,execute_result 可以为 None
- 类 qlib.contrib.strategy.EnhancedIndexingStrategy(*, riskmodel_root, market='csi500', turn_limit=None, name_mapping={}, optimizer_kwargs={}, verbose=False, **kwargs)
增强型指数策略
增强型指数投资结合了主动管理和被动管理的优点,旨在使投资组合收益超越基准指数(例如标普 500),同时控制风险敞口(即跟踪误差)。
用户需要准备如下所示的风险模型数据:
├── /path/to/riskmodel ├──── 20210101 ├────── factor_exp.{csv|pkl|h5} ├────── factor_cov.{csv|pkl|h5} ├────── specific_risk.{csv|pkl|h5} ├────── blacklist.{csv|pkl|h5} # optional
风险模型数据可从风险数据提供商处获取。您也可以使用 qlib.model.riskmodel.structured.StructuredCovEstimator 来生成这些数据。
- 参数:
riskmodel_path (str) – 风险模型路径
name_mapping (dict) – 替代的文件名映射
- __init__(*, riskmodel_root, market='csi500', turn_limit=None, name_mapping={}, optimizer_kwargs={}, verbose=False, **kwargs)
- signal(信号):
用于描述一个信号的信息。请参考 qlib.backtest.signal.create_signal_from 的文档,策略的决策将基于给定的信号。
- trade_exchange交易所
提供市场信息的交易所,用于处理订单并生成报告
如果 trade_exchange 为 None,则 self.trade_exchange 将被设置为 common_infra
它允许在不同的执行过程中使用不同的 trade_exchange。
例如:
在日频执行中,日频交易所和分钟频交易所都可用,但推荐使用日频交易所,因为它运行更快。
在分钟频执行中,日频交易所不可用,仅推荐使用分钟频交易所。
- generate_target_weight_position(score, current, trade_start_time, trade_end_time)
根据当日的得分和当前持仓生成目标持仓,生成时不考虑现金部分。
- 参数:
score (pd.Series) – 当日的预测得分,索引为股票代码,包含 'score' 列。
current (Position()) – 当前持仓。
trade_start_time (pd.Timestamp) – 交易开始时间。
trade_end_time (pd.Timestamp) – 交易结束时间。
- 类 qlib.contrib.strategy.TWAPStrategy(outer_trade_decision: BaseTradeDecision = None, level_infra: LevelInfrastructure = None, common_infra: CommonInfrastructure = None, trade_exchange: Exchange = None)
用于交易的 TWAP 策略
注意
该 TWAP 策略在交易时会向上取整。当总交易单位金额小于交易步长时,这将使 TWAP 交易策略更早地生成订单。
- reset(outer_trade_decision: BaseTradeDecision | None = None, **kwargs)
- 参数:
outer_trade_decision (BaseTradeDecision,可选) –
- generate_trade_decision(execute_result=None)
在每个交易周期生成交易决策
- 参数:
execute_result (List[object], 可选) –
交易决策的执行结果,默认为 None
首次调用 generate_trade_decision 时,execute_result 可以为 None
- 类 qlib.contrib.strategy.SBBStrategyBase(outer_trade_decision: BaseTradeDecision = None, level_infra: LevelInfrastructure = None, common_infra: CommonInfrastructure = None, trade_exchange: Exchange = None)
在每两个相邻的交易 K 线中选择(S)表现更优者进行买入或卖出(B)
- reset(outer_trade_decision: BaseTradeDecision | None = None, **kwargs)
- 参数:
outer_trade_decision (BaseTradeDecision,可选) –
- generate_trade_decision(execute_result=None)
在每个交易周期生成交易决策
- 参数:
execute_result (List[object], 可选) –
交易决策的执行结果,默认为 None
首次调用 generate_trade_decision 时,execute_result 可以为 None
- 类 qlib.contrib.strategy.SBBStrategyEMA(outer_trade_decision: BaseTradeDecision | None = None, instruments: List | str = 'csi300', freq: str = 'day', trade_exchange: Exchange | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs)
在每两个相邻的交易柱中,根据(EMA)信号选择(S)更优的一个进行卖出或买入。
- __init__(outer_trade_decision: BaseTradeDecision | None = None, instruments: List | str = 'csi300', freq: str = 'day', trade_exchange: Exchange | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs)
- 参数:
instruments (Union[List, str], 可选) – EMA 信号的标的, 默认为 “csi300”
freq (str, 可选) – EMA 信号的频率,默认为 “day”。注意:freq 可能与 time_per_step 不同
- reset_level_infra(level_infra)
重置层级共享的基础设施 - 重置交易日历后,信号将会发生变化
- 类 qlib.contrib.strategy.SoftTopkStrategy(model, dataset, topk, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWInteract'>, max_sold_weight=1.0, risk_degree=0.95, buy_method='first_fill', trade_exchange=None, level_infra=None, common_infra=None, **kwargs)
- __init__(model, dataset, topk, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWInteract'>, max_sold_weight=1.0, risk_degree=0.95, buy_method='first_fill', trade_exchange=None, level_infra=None, common_infra=None, **kwargs)
- 参数:
topk (int) – 要买入的前 N 只股票
risk_degree (float) –
投资金额占总资金的比例 buy_method:
rank_fill:优先为排名靠前的股票分配权重(1/topk 最大);average_fill:平均地为排名靠前的股票分配权重。
- get_risk_degree(trade_step=None)
返回投资时将使用的资金占总资金的比例。动态的 risk_degree 会导致市场择时。
- generate_target_weight_position(score, current, trade_start_time, trade_end_time)
- 参数:
score – 当前交易日的预测得分,pd.Series,索引为 stock_id,包含 'score' 列
current – 当前持仓,使用 Position() 类
trade_date –
交易日期
根据当前日期的得分和现有持仓生成目标持仓
持仓计算中未考虑缓存
评估
- qlib.contrib.evaluate.risk_analysis(r, N: int | None = None, freq: str = 'day')
风险分析说明:年化收益的计算方式与传统定义不同,这是设计上的实现。Qlib 采用累加而非累乘的方式来累积收益,以避免累积曲线呈指数级偏斜。Qlib 中所有年化收益的计算均遵循此原则。
待办事项:添加一个参数,以支持使用收益累乘方式计算指标。
- 参数:
r (pandas.Series) – 日收益率序列。
N (int) – 年化信息比率的缩放系数(日:252,周:50,月:12),N 和 freq 至少提供其一
freq (str) – 用于计算缩放系数的分析频率,N 和 freq 至少提供其一
- qlib.contrib.evaluate.indicator_analysis(df, method='mean')
分析交易的统计时间序列指标
- 参数:
df (pandas.DataFrame) –
- 列:如 [‘pa’, ‘pos’, ‘ffr’, ‘deal_amount’, ‘value’]。
- 必需字段:
’pa’ 表示交易指标中的价格优势
’pos’ 表示交易指标中的正向率
’ffr’ 表示交易指标中的成交率
- 可选字段:
’deal_amount’ 表示总成交数量,仅在 method 为 ‘amount_weighted’ 时需要
’value’ 表示总交易价值,仅在 method 为 ‘value_weighted’ 时需要
索引:Index(datetime)
method (str, 可选) –
pa/ffr 的统计方法,默认为 “mean”
若 method 为 ‘mean’,则计算每个交易指标的均值统计结果
若 method 为 ‘amount_weighted’,则按成交数量加权计算每个交易指标的均值统计结果
若 method 为 ‘value_weighted’,则按交易价值加权计算每个交易指标的均值统计结果
注意:pos 的统计方法始终为 “mean”
- 返回:
各交易指标的统计值
- 返回类型:
pd.DataFrame
- qlib.contrib.evaluate.backtest_daily(start_time: str | Timestamp, end_time: str | Timestamp, strategy: str | dict | BaseStrategy, executor: str | dict | BaseExecutor | None = None, account: float | int | Position = 100000000.0, benchmark: str = 'SH000300', exchange_kwargs: dict | None = None, pos_type: str = 'Position')
初始化策略和执行器,然后执行日频回测
- 参数:
start_time (Union[str, pd.Timestamp]) – 回测的闭区间开始时间 注意:该时间将应用于最外层执行器的日历。
end_time (Union[str, pd.Timestamp]) – 回测的闭区间结束时间 注意:该时间将应用于最外层执行器的日历。例如 Executor[day](Executor[1min]),设置 end_time == 20XX0301 将包含 20XX0301 当天的所有分钟数据
strategy (Union[str, dict, BaseStrategy]) –
用于初始化最外层投资组合策略。更多信息请参考 init_instance_by_config 的文档。
例如
# dict strategy = { "class": "TopkDropoutStrategy", "module_path": "qlib.contrib.strategy.signal_strategy", "kwargs": { "signal": (model, dataset), "topk": 50, "n_drop": 5, }, } # BaseStrategy pred_score = pd.read_pickle("score.pkl")["score"] STRATEGY_CONFIG = { "topk": 50, "n_drop": 5, "signal": pred_score, } strategy = TopkDropoutStrategy(**STRATEGY_CONFIG) # str example. # 1) specify a pickle object # - path like 'file:///<path to pickle file>/obj.pkl' # 2) specify a class name # - "ClassName": getattr(module, "ClassName")() will be used. # 3) specify module path with class name # - "a.b.c.ClassName" getattr(<a.b.c.module>, "ClassName")() will be used.
executor (Union[str, dict, BaseExecutor]) – 用于初始化最外层执行器。
benchmark (str) – 用于生成报告的基准指数。
account (Union[float, int, Position]) –
描述如何创建账户的信息
对于 float 或 int:
使用仅含初始现金的账户
对于 Position:
使用包含已有持仓的账户
exchange_kwargs (dict) –
用于初始化交易所的参数。例如
exchange_kwargs = { "freq": freq, "limit_threshold": None, # limit_threshold is None, using C.limit_threshold "deal_price": None, # deal_price is None, using C.deal_price "open_cost": 0.0005, "close_cost": 0.0015, "min_cost": 5, }
pos_type (str) – 持仓的类型。
- 返回:
report_normal (pd.DataFrame) – 回测报告
positions_normal (pd.DataFrame) – 回测持仓
- qlib.contrib.evaluate.long_short_backtest(pred, topk=50, shift=1, open_cost=0, close_cost=0, trade_unit=None, limit_threshold=None, min_cost=5, subscribe_fields=[], extract_codes=False)
多空策略的回测
- 参数:
pred – 在第 T 天生成的交易信号。
topk – 做空排名前 topk 的证券,做多排名前 topk 的证券。
deal_price – 用于成交的交易价格。
shift – 是否将预测结果向后移动一天。若 shift==1,则交易日为 T+1。
open_cost – 开仓交易成本。
close_cost – 平仓交易成本。
trade_unit – 中国 A 股为 100。
limit_threshold – 涨跌停限制,例如 0.1(即 10%),做多和做空采用相同的限制。
min_cost – 最小交易成本。
subscribe_fields – 订阅字段。
extract_codes – 布尔值。是否将从 pred 中提取的代码传递给交易所。注意:使用离线 Qlib 时速度更快。
- 返回:
回测结果,以字典形式表示:{ “long”: 多头收益(超额), “short”: 空头收益(超额), “long_short”: 多空收益 }
报告
工作流
实验管理器
- 类 qlib.workflow.expm.ExpManager(uri: str, default_exp_name: str | None)
这是用于管理实验的 ExpManager 类,其 API 设计参考了 mlflow(链接:https://mlflow.org/docs/latest/python_api/mlflow.html)。
ExpManager 被设计为单例模式(顺便提一下,我们可以使用不同的 uri 拥有多个 Experiment,用户可以从不同 uri 获取不同的实验,并比较它们的记录)。全局配置(即 C)也是一个单例。
因此我们尝试将它们保持一致。它们共享同一个变量,称为 默认 uri。有关变量共享的详细信息,请参见 ExpManager.default_uri。
当用户启动一个实验时,可能希望将 uri 设置为特定的 uri(在此期间会覆盖 默认 uri),之后再取消设置该 特定 uri,并恢复使用 默认 uri。ExpManager._active_exp_uri 就是这个 特定 uri。
- __init__(uri: str, default_exp_name: str | None)
- start_exp(*, experiment_id: str | None = None, experiment_name: str | None = None, recorder_id: str | None = None, recorder_name: str | None = None, uri: str | None = None, resume: bool = False, **kwargs) Experiment
启动一个实验。该方法首先调用 get_or_create 来获取或创建一个实验,然后将其设置为激活状态。
在 start_exp 中包含了对 _active_exp_uri 的维护,其余实现应在子类的 _end_exp 中完成。
- 参数:
experiment_id (str) – 当前激活实验的 ID。
experiment_name (str) – 当前激活实验的名称。
recorder_id (str) – 要启动的记录器的 ID。
recorder_name (str) – 要启动的记录器的名称。
uri (str) – 当前的跟踪 URI。
resume (boolean) – 是否恢复实验和记录器。
- 返回类型:
一个激活状态的实验。
- end_exp(recorder_status: str = 'SCHEDULED', **kwargs)
结束一个激活状态的实验。
在 end_exp 中包含了对 _active_exp_uri 的维护,其余实现应在子类的 _end_exp 中完成。
- 参数:
experiment_name (str) – 当前激活实验的名称。
recorder_status (str) – 实验中激活记录器的状态。
- create_exp(experiment_name: str | None = None)
创建一个实验。
- 参数:
experiment_name (str) – 实验名称,必须唯一。
- 返回类型:
一个实验对象。
- Raises:
ExpAlreadyExistError –
- search_records(experiment_ids=None, **kwargs)
获取符合实验搜索条件的记录的 pandas DataFrame。输入为用户希望应用的搜索条件。
- 返回:
一个 pandas.DataFrame 格式的记录,其中每个指标(metric)、参数(parameter)和标签(tag)
都被展开为独立的列,列名分别为 metrics.*, params.*, 和 tags.*
对于没有特定指标、参数或标签的记录,其值将分别为 (NumPy) NaN、None 或 None。
值将分别为 (NumPy) NaN、None 或 None。
- get_exp(*, experiment_id=None, experiment_name=None, create: bool = True, start: bool = False)
检索一个实验。该方法包括获取一个活跃的实验,或根据指定条件获取或创建一个特定的实验。
当用户指定了实验的 ID 和名称时,该方法会尝试返回对应的特定实验。当用户未提供实验 ID 或名称时,该方法会尝试返回当前活跃的实验。create 参数决定了如果该实验尚未创建,是否根据用户的指定自动创建一个新实验。
如果 create 为 True:
如果存在活跃实验:
未指定 ID 或名称,则返回当前活跃实验。
如果指定了 ID 或名称,则返回指定的实验;若未找到该实验,则创建一个具有给定 ID 或名称的新实验。如果将 start 设置为 True,则该实验会被设为活跃状态。
如果不存在活跃实验:
未指定 ID 或名称,则创建一个默认实验。
如果指定了 ID 或名称,则返回指定的实验;若未找到该实验,则创建一个具有给定 ID 或名称的新实验。如果将 start 设置为 True,则该实验会被设为活跃状态。
否则,如果 create 为 False:
如果存在活跃实验:
未指定 ID 或名称,则返回当前活跃实验。
如果指定了 ID 或名称,则返回指定的实验;若未找到该实验,则抛出错误。
如果不存在活跃实验:
未指定 ID 或名称。如果默认实验存在,则返回它;否则抛出错误。
如果指定了 ID 或名称,则返回指定的实验;若未找到该实验,则抛出错误。
- 参数:
experiment_id (str) – 要返回的实验的 ID。
experiment_name (str) – 要返回的实验的名称。
create (boolean) – 如果实验尚未创建,则自动创建。
start (boolean) – 如果创建了新实验,则启动该实验。
- 返回类型:
一个实验对象。
- delete_exp(experiment_id=None, experiment_name=None)
删除一个实验。
- 参数:
experiment_id (str) – 实验的 ID。
experiment_name (str) – 实验的名称。
- 属性 default_uri
从 qlib.config.C 获取默认的跟踪 URI
- 属性 uri
获取默认或当前的跟踪 URI。
- 返回类型:
跟踪 URI 字符串。
- list_experiments()
列出所有现有的实验。
- 返回类型:
一个字典(名称 -> 实验),包含存储的所有实验信息。
实验
- class qlib.workflow.exp.Experiment(id, name)
这是每个运行实验所对应的实验类。该 API 的设计参考了 mlflow。(链接:https://mlflow.org/docs/latest/python_api/mlflow.html)
- __init__(id, name)
- start(*, recorder_id=None, recorder_name=None, resume=False)
启动实验并将其设置为激活状态。此方法还将启动一个新的记录器(recorder)。
- 参数:
recorder_id (str) – 要创建的记录器的 ID。
recorder_name (str) – 要创建的记录器的名称。
resume (bool) – 是否恢复第一个记录器
- 返回类型:
一个活跃的记录器。
- 结束(记录器状态='SCHEDULED')
结束实验。
- 参数:
recorder_status (str) – 结束时设置记录器的状态(SCHEDULED、RUNNING、FINISHED、FAILED)。
- create_recorder(recorder_name=None)
为每个实验创建一个记录器。
- 参数:
recorder_name (str) – 要创建的记录器的名称。
- 返回类型:
一个记录器对象。
- search_records(**kwargs)
获取符合实验搜索条件的记录的 pandas DataFrame。输入为用户希望应用的搜索条件。
- 返回:
一个 pandas.DataFrame 格式的记录,其中每个指标(metric)、参数(parameter)和标签(tag)
都被展开为独立的列,列名分别为 metrics.*, params.*, 和 tags.*
对于没有特定指标、参数或标签的记录,其值将分别为 (NumPy) NaN、None 或 None。
值将分别为 (NumPy) NaN、None 或 None。
- delete_recorder(recorder_id)
为每个实验创建一个记录器。
- 参数:
recorder_id (str) – 要删除的记录器的 ID。
- get_recorder(recorder_id=None, recorder_name=None, create: bool = True, start: bool = False) Recorder
获取用户的记录器。当用户提供 recorder_id 或 recorder_name 时,该方法会尝试返回指定的记录器;当用户未提供 recorder_id 或 recorder_name 时,该方法会尝试返回当前活跃的记录器。create 参数决定当记录器尚未创建时,是否根据用户指定自动创建新的记录器。
如果 create 为 True:
如果存在活跃记录器:
未指定 ID 或名称,返回活跃记录器。
若指定了 ID 或名称,返回对应的记录器。若未找到对应记录器,则创建一个具有给定 ID 或名称的新记录器。如果 start 设置为 True,则将该记录器设为活跃状态。
如果不存在活跃记录器:
未指定 ID 或名称,创建一个新的记录器。
若指定了 ID 或名称,返回对应的记录器。若未找到对应记录器,则创建一个具有给定 ID 或名称的新记录器。如果 start 设置为 True,则将该记录器设为活跃状态。
否则,如果 create 为 False:
如果存在活跃记录器:
未指定 ID 或名称,返回活跃记录器。
若指定了 ID 或名称,返回对应的记录器。若未找到对应记录器,则抛出错误。
如果不存在活跃记录器:
未指定 ID 或名称,抛出错误。
若指定了 ID 或名称,返回对应的记录器。若未找到对应记录器,则抛出错误。
- 参数:
recorder_id (str) – 要删除的记录器的 ID。
recorder_name (str) – 要删除的记录器名称。
create (boolean) – 如果记录器尚未创建,则创建该记录器。
start (boolean) – 如果创建了新的记录器,则启动它。
- 返回类型:
一个记录器对象。
- list_recorders(rtype: Literal['dict', 'list'] = 'dict', **flt_kwargs) List[Recorder] | Dict[str, Recorder]
列出此实验中所有现有的记录器。调用此方法前,请先获取实验实例。如果用户想使用方法 R.list_recorders(),请参考 QlibRecorder 中的相关 API 文档。
- flt_kwargsdict
根据条件过滤记录器,例如 list_recorders(status=Recorder.STATUS_FI)
- 返回:
- 如果 rtype == “dict”:
一个字典(id -> 记录器),包含所存储的记录器信息。
- elif rtype == “list”:
一个 Recorder 列表。
- 返回类型:
返回类型取决于 rtype。
Recorder
- class qlib.workflow.recorder.Recorder(experiment_id, name)
这是用于记录实验的 Recorder 类。其 API 设计类似于 mlflow。(链接:https://mlflow.org/docs/latest/python_api/mlflow.html)
记录器(recorder)的状态可以是 SCHEDULED(已调度)、RUNNING(运行中)、FINISHED(已完成)或 FAILED(失败)。
- __init__(experiment_id, name)
- save_objects(local_path=None, artifact_path=None, **kwargs)
将预测文件或模型检查点等对象保存到产物 URI 中。用户可以通过关键字参数(name:value)的方式保存对象。
请参考 qlib.workflow:R.save_objects 的文档。
- 参数:
local_path (str) – 如果提供该参数,则将文件或目录保存到产物 URI 中。
artifact_path=None (str) – 产物在 URI 中存储的相对路径。
- load_object(名称)
加载预测文件或模型检查点等对象。
- 参数:
name (str) – 要加载的文件名称。
- 返回类型:
已保存的对象。
- start_run()
开始运行或恢复 Recorder。返回值可用作 with 语句块中的上下文管理器;否则,必须调用 end_run() 来终止当前运行。(参见 mlflow 中的 ActiveRun 类)
- 返回类型:
一个正在运行的活动对象(例如 mlflow.ActiveRun 对象)。
- end_run()
结束一个活跃的 Recorder。
- log_params(**kwargs)
为当前运行记录一批参数。
- 参数:
arguments (keyword) – 要记录为参数的键值对。
- log_metrics(step=None, **kwargs)
为当前运行记录多个指标。
- 参数:
arguments (keyword) – 要记录为指标的键值对。
- log_artifact(local_path: str, artifact_path: str | None = None)
将本地文件或目录记录为当前活跃运行的产物。
- 参数:
local_path (str) – 要写入的文件路径。
artifact_path (Optional[str]) – 如果提供,表示要写入的
artifact_uri
中的目录。
- set_tags(**kwargs)
为当前运行记录一批标签。
- 参数:
arguments (keyword) – 要作为标签记录的键值对。
- delete_tags(*keys)
从一次运行中删除若干标签。
- 参数:
keys (series of strs of the keys) – 所有要删除的标签名称。
- list_artifacts(artifact_path: str | None = None)
列出记录器的所有产物。
- 参数:
artifact_path (str) – 产物在 URI 中存储的相对路径。
- 返回类型:
被存储的产物信息列表(名称、路径等)。
- download_artifact(path: str, dst_path: str | None = None) str
从一次运行中下载一个产物文件或目录到本地目录(如适用),并返回其本地路径。
- 参数:
path (str) – 所需产物的相对源路径。
dst_path (可选[str]) – 要下载指定产物的本地文件系统目标目录的绝对路径。该目录必须已存在。如果未指定,产物将被下载到本地文件系统上一个新创建的、唯一命名的目录中。
- 返回:
所需产物的本地路径。
- 返回类型:
str
- list_metrics()
列出记录器的所有指标。
- 返回类型:
正在存储的指标字典。
- list_params()
列出记录器的所有参数。
- 返回类型:
正在存储的参数字典。
- list_tags()
列出记录器的所有标签。
- 返回类型:
正在存储的标签字典。
记录模板
- 类 qlib.workflow.record_temp.记录模板(recorder)
这是记录模板类,允许用户以特定格式生成实验结果,例如 IC 和回测结果。
- save(**kwargs)
其行为与 self.recorder.save_objects 相同。但它提供了更简便的接口,因为用户无需关心 get_path 和 artifact_path。
- __init__(recorder)
- generate(**kwargs)
生成某些记录(如 IC、回测等),并将其保存。
- 参数:
kwargs –
- load(name: str, parents: bool = True)
其行为与 self.recorder.load_object 相同。但它提供了更简便的接口,因为用户无需关心 get_path 和 artifact_path。
- 参数:
name (str) – 要加载的文件的名称。
parents (bool) – 每个记录器都有不同的 artifact_path。因此当设置为 True 时,会递归地在父类中查找路径,子类中的路径优先级更高。
- 返回类型:
已存储的记录。
- list()
列出支持的产物。用户无需考虑 self.get_path。
- 返回类型:
所有支持的产物的列表。
- check(include_self: bool = False, parents: bool = True)
检查记录是否正确生成并保存。它在以下示例中非常有用
在生成新内容之前,检查依赖文件是否完整。
检查最终文件是否已完成。
- 参数:
include_self (bool) – 是否包含由自身生成的文件
parents (bool) – 是否检查父级
- Raises:
FileNotFoundError – 记录是否已正确存储。
- class qlib.workflow.record_temp.SignalRecord(model=None, dataset=None, recorder=None)
这是生成信号预测的信号记录类。该类继承自
RecordTemp
类。- __init__(model=None, dataset=None, recorder=None)
- generate(**kwargs)
生成某些记录(如 IC、回测等),并将其保存。
- 参数:
kwargs –
- list()
列出支持的产物。用户无需考虑 self.get_path。
- 返回类型:
所有支持的产物的列表。
- 类 qlib.workflow.record_temp.ACRecordTemp(recorder, skip_existing=False)
自动检查记录模板
- __init__(recorder, skip_existing=False)
- generate(*args, **kwargs)
自动检查文件并运行具体的生成任务
- 类 qlib.workflow.record_temp.HFSignalRecord(recorder, **kwargs)
这是信号分析记录类,用于生成 IC、IR 等分析结果。该类继承自
RecordTemp
类。- depend_cls
的别名
SignalRecord
- __init__(recorder, **kwargs)
- generate()
生成某些记录(如 IC、回测等),并将其保存。
- 参数:
kwargs –
- list()
列出支持的产物。用户无需考虑 self.get_path。
- 返回类型:
所有支持的产物的列表。
- 类 qlib.workflow.record_temp.SigAnaRecord(recorder, ana_long_short=False, ann_scaler=252, label_col=0, skip_existing=False)
这是信号分析记录类,用于生成 IC、IR 等分析结果。该类继承自
RecordTemp
类。- depend_cls
的别名
SignalRecord
- __init__(recorder, ana_long_short=False, ann_scaler=252, label_col=0, skip_existing=False)
- list()
列出支持的产物。用户无需考虑 self.get_path。
- 返回类型:
所有支持的产物的列表。
- 类 qlib.workflow.record_temp.PortAnaRecord(recorder, config=None, risk_analysis_freq: List | str | None = None, indicator_analysis_freq: List | str | None = None, indicator_analysis_method=None, skip_existing=False, **kwargs)
这是一个投资组合分析记录类,用于生成回测等分析结果。该类继承自
RecordTemp
类。以下文件将被存储在 recorder 中
report_normal.pkl & positions_normal.pkl:
回测的收益报告和详细持仓,由 qlib/contrib/evaluate.py:backtest 返回
port_analysis.pkl :投资组合的风险分析结果,由 qlib/contrib/evaluate.py:risk_analysis 返回
- depend_cls
的别名
SignalRecord
- __init__(recorder, config=None, risk_analysis_freq: List | str | None = None, indicator_analysis_freq: List | str | None = None, indicator_analysis_method=None, skip_existing=False, **kwargs)
- config[“strategy”]字典
定义策略类及其参数(kwargs)。
- config[“executor”]字典
定义执行器类及其参数(kwargs)。
- config[“backtest”]字典
定义回测所需的参数(kwargs)。
- risk_analysis_freqstr|List[str]
报告中的风险分析频率
- indicator_analysis_freqstr|List[str]
报告中的指标分析频率
- indicator_analysis_methodstr,可选,默认为 None
可选值包括 'mean'、'amount_weighted'、'value_weighted'
- list()
列出支持的产物。用户无需考虑 self.get_path。
- 返回类型:
所有支持的产物的列表。
- 类 qlib.workflow.record_temp.MultiPassPortAnaRecord(recorder, pass_num=10, shuffle_init_score=True, **kwargs)
这是多次运行回测并生成分析结果(如回测结果)的多轮投资组合分析记录类。该类继承自
PortAnaRecord
类。如果启用了 shuffle_init_score,则第一次回测日期的预测得分将被随机打乱,从而使初始持仓为随机状态。shuffle_init_score 仅在信号用作 <PRED> 占位符时生效。该占位符将被 recorder 中保存的 pred.pkl 文件替换。
- 参数:
recorder (Recorder) – 用于保存回测结果的记录器。
pass_num (int) – 回测的轮数。
shuffle_init_score (bool) – 是否对第一次回测日期的预测得分进行打乱。
- depend_cls
的别名
SignalRecord
- __init__(recorder, pass_num=10, shuffle_init_score=True, **kwargs)
- 参数:
recorder (Recorder) – 用于保存回测结果的记录器。
pass_num (int) – 回测的轮数。
shuffle_init_score (bool) – 是否对第一次回测日期的预测得分进行打乱。
- list()
列出支持的产物。用户无需考虑 self.get_path。
- 返回类型:
所有支持的产物的列表。
任务管理
任务生成器
TaskGenerator 模块可以根据 TaskGen 和一些任务模板生成大量任务。
- qlib.workflow.task.gen.task_generator(tasks, generators) list
使用 TaskGen 列表和任务模板列表生成不同的任务。
例如:
有 3 个任务模板 a、b、c 和 2 个 TaskGen A、B。A 会从一个模板生成 2 个任务,B 会从一个模板生成 3 个任务。task_generator([a, b, c], [A, B]) 最终将生成 3*2*3 = 18 个任务。
- 类 qlib.workflow.task.gen.任务生成器
用于生成不同任务的基类
示例 1:
输入:特定任务模板和滚动步数
输出:任务的滚动版本
示例 2:
输入:特定任务模板和损失函数列表
输出:一组具有不同损失函数的任务
- 抽象 generate(task: dict) 列表[dict]
基于任务模板生成不同的任务
- 参数:
task (dict) – 一个任务模板
- 返回:
任务列表
- 返回类型:
List[dict]
- qlib.workflow.task.gen.handler_mod(task: dict, rolling_gen)
在使用 RollingGen 时帮助修改 handler 的结束时间,用于处理以下情况
Handler 的数据结束时间早于数据集 test_data 的时间段。
为了解决这个问题,会扩展 handler 数据的结束时间。
如果 handler 的 end_time 为 None,则无需更改其结束时间。
- 参数:
task (dict) – 一个任务模板
rg (RollingGen) – RollingGen 的一个实例
- qlib.workflow.task.gen.trunc_segments(ta: TimeAdjuster, segments: Dict[str, Timestamp], days, test_key='test')
为了避免未来信息泄露,应根据测试开始时间对时间段进行截断
注意
该函数将原地修改segments
- class qlib.workflow.task.gen.RollingGen(step: int = 40, rtype: str = 'expanding', ds_extra_mod_func: None | ~typing.Callable = <function handler_mod>, test_key='test', train_key='train', trunc_days: int | None = None, task_copy_func: ~typing.Callable = <function deepcopy>)
- __init__(step: int = 40, rtype: str = 'expanding', ds_extra_mod_func: None | ~typing.Callable = <函数 handler_mod>, test_key='test', train_key='train', trunc_days: int | None = None, task_copy_func: ~typing.Callable = <函数 deepcopy>)
生成用于滚动的数据任务
- 参数:
step (int) – 滚动的步长
rtype (str) – 滚动类型(expanding:扩展型,sliding:滑动窗口)
ds_extra_mod_func (Callable) – 一个类似 handler_mod(task: dict, rg: RollingGen) 的方法,在生成任务后执行额外操作。例如,使用
handler_mod
修改数据集处理器的结束时间。trunc_days (int) – 截去部分数据以避免未来信息泄露
task_copy_func (Callable) – 用于复制整个任务的函数。当用户希望在多个任务之间共享某些内容时,这非常有用
- gen_following_tasks(task: dict, test_end: Timestamp) List[dict]
为task生成从当前到 test_end 的后续滚动任务
- 参数:
task (dict) – Qlib 任务格式
test_end (pd.Timestamp) – 最新的滚动任务包含 test_end
- 返回:
task`(`task 本身不包括在内) 的后续任务
- 返回类型:
List[dict]
- generate(task: dict) 列表[dict]
将任务转换为滚动任务。
- 参数:
task (dict) –
描述一个任务的字典。例如。
DEFAULT_TASK = { "model": { "class": "LGBModel", "module_path": "qlib.contrib.model.gbdt", }, "dataset": { "class": "DatasetH", "module_path": "qlib.data.dataset", "kwargs": { "handler": { "class": "Alpha158", "module_path": "qlib.contrib.data.handler", "kwargs": { "start_time": "2008-01-01", "end_time": "2020-08-01", "fit_start_time": "2008-01-01", "fit_end_time": "2014-12-31", "instruments": "csi100", }, }, "segments": { "train": ("2008-01-01", "2014-12-31"), "valid": ("2015-01-01", "2016-12-20"), # Please avoid leaking the future test data into validation "test": ("2017-01-01", "2020-08-01"), }, }, }, "record": [ { "class": "SignalRecord", "module_path": "qlib.workflow.record_temp", }, ] }
- 返回:
List[dict]
- 返回类型:
任务列表
- class qlib.workflow.task.gen.MultiHorizonGenBase(horizon: List[int] = [5], label_leak_n=2)
- __init__(horizon: List[int] = [5], label_leak_n=2)
该任务生成器尝试基于已有任务为不同预测周期生成新任务
- 参数:
horizon (List[int]) – 任务可能的预测周期(horizon)
label_leak_n (int) – 在做出预测后,需要多少天才能获得完整的标签信息。例如:- 用户在第 T 天收盘后做出预测 - 标签是第 T + 1 天买入股票并在第 T + 2 天卖出的收益 - 此时 label_leak_n 为 2(即利用该样本时会泄露未来两天的信息)
- 抽象 set_horizon(task: dict, hr: int)
该方法旨在就地修改任务 in place
- 参数:
task (dict) – Qlib 的任务
hr (int) – 任务的 horizon(预测周期)
- generate(task: dict)
基于任务模板生成不同的任务
- 参数:
task (dict) – 一个任务模板
- 返回:
任务列表
- 返回类型:
List[dict]
TaskManager
TaskManager 能够自动获取未使用的任务,并管理一组任务的生命周期,包括错误处理。这些功能支持任务并发运行,并确保每个任务仅被使用一次。Task Manager 将所有任务存储在 MongoDB 中。用户在使用此模块时MUST先完成 MongoDB 的配置。
TaskManager 中的一个任务包含三个部分:任务描述(desc,用于定义任务)、任务状态(status)和任务结果(result)。用户可以通过任务描述和任务结果来获取任务。
- 类 qlib.workflow.task.manage.任务管理器(task_pool: str)
以下是 TaskManager 创建任务时的任务格式示例
{ 'def': pickle serialized task definition. using pickle will make it easier 'filter': json-like data. This is for filtering the tasks. 'status': 'waiting' | 'running' | 'done' 'res': pickle serialized task result, }
任务管理器假设你只会更新已获取的任务。通过 MongoDB 的获取与更新操作,可保证数据更新的安全性。
该类可用作命令行工具。以下是一些示例。你可以通过以下命令查看 manage 模块的帮助信息:python -m qlib.workflow.task.manage -h # 显示 manage 模块 CLI 的使用手册;python -m qlib.workflow.task.manage wait -h # 显示 manage 模块中 wait 命令的使用手册
python -m qlib.workflow.task.manage -t <pool_name> wait python -m qlib.workflow.task.manage -t <pool_name> task_stat
注意
假设:MongoDB 中的数据是经过编码的,而从 MongoDB 输出的数据是已解码的
共有四种状态,分别是:
STATUS_WAITING:等待训练
STATUS_RUNNING:正在训练
STATUS_PART_DONE:已完成部分步骤,等待下一步
STATUS_DONE:所有工作已完成
- __init__(task_pool: str)
初始化 Task Manager 时,请务必首先配置 MongoDB 的 URL 和数据库名称。一个 TaskManager 实例服务于特定的任务池。该模块的静态方法则作用于整个 MongoDB。
- 参数:
task_pool (str) – MongoDB 中的集合(Collection)名称
- 静态 list() list
列出数据库中所有的集合(task_pool)。
- 返回:
list
- replace_task(task, new_task)
使用一个新任务替换旧任务
- 参数:
task – 旧任务
new_task – 新任务
- insert_task(task)
插入一个任务。
- 参数:
task – 待插入的任务
- 返回:
pymongo.results.InsertOneResult
- insert_task_def(task_def)
向 task_pool 中插入一个任务
- 参数:
task_def (dict) – 任务定义
- 返回类型:
pymongo.results.InsertOneResult
- create_task(task_def_l, dry_run=False, print_nt=False) List[str]
如果 task_def_l 中的任务是新的,则将新任务插入 task_pool,并记录 inserted_id;如果任务已存在,则仅查询其 _id。
- 参数:
task_def_l (list) – 任务列表
dry_run (bool) – 是否将这些新任务插入到任务池中
print_nt (bool) – 是否打印新任务
- 返回:
task_def_l 中各个任务的 _id 列表
- 返回类型:
List[str]
- fetch_task(query={}, status='waiting') dict
使用 query 来获取任务。
- 参数:
query (dict,可选) – 查询字典,默认为 {}。
status (str,可选) – 状态描述,默认为 STATUS_WAITING。
- 返回:
解码后的任务(集合中的文档)
- 返回类型:
dict
- safe_fetch_task(query={}, status='waiting')
使用带有 contextmanager 的 query 从 task_pool 中获取任务
- 参数:
query (dict) – 查询条件字典
- 返回:
dict
- 返回类型:
解码后的任务(集合中的文档)
- query(query={}, decode=True)
查询集合中的任务。如果遍历生成器耗时过长,此函数可能会抛出异常 pymongo.errors.CursorNotFound: cursor id not found
python -m qlib.workflow.task.manage -t <你的任务池> query ‘{“_id”: “615498be837d0053acbc5d58”}’
- 参数:
query (dict) – 查询条件字典
decode (bool) –
- 返回:
dict
- 返回类型:
解码后的任务(集合中的文档)
- re_query(_id) dict
使用 _id 查询任务。
- 参数:
_id (str) – 文档的 _id
- 返回:
解码后的任务(集合中的文档)
- 返回类型:
dict
- commit_task_res(task, res, status='done')
将结果提交到 task['res']。
- 参数:
task ([类型]) – [描述]
res (object) – 你想要保存的结果
status (str, 可选) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_DONE。
- return_task(task, status='waiting')
将任务重置为指定状态。通常用于错误处理。
- 参数:
task ([类型]) – [描述]
status (str, 可选) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_WAITING。
- remove(query={})
使用查询条件删除任务
- 参数:
query (dict) – 查询条件字典
- task_stat(query={}) dict
统计每种状态下的任务数量。
- 参数:
query (dict, 可选) – 查询字典。默认为 {}。
- 返回:
dict
- reset_waiting(query={})
将所有正在运行的任务重置为等待状态。可用于某些运行中的任务意外退出后的恢复。
- 参数:
query (dict, 可选) – 查询字典。默认为 {}。
- 设置优先级(任务, 优先级: 整数)
为任务设置优先级
- 参数:
任务 (字典) – 来自数据库的任务查询
优先级 (整数) – 目标优先级
- 等待(query={})
在多进程环境下,主进程可能无法从 TaskManager 获取任何任务,因为仍有一些任务正在运行。因此,主进程应等待,直到其他进程或机器完成所有任务的训练。
- 参数:
query (dict, 可选) – 查询字典。默认为 {}。
- qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)
当任务池非空(存在 WAITING 状态的任务)时,使用 task_func 来获取并运行 task_pool 中的任务
调用此方法后,会出现以下四种情况(before_status -> after_status):
STATUS_WAITING -> STATUS_DONE:将 task[“def”] 作为 task_func 的参数,表示该任务尚未开始
STATUS_WAITING -> STATUS_PART_DONE:将 task[“def”] 作为 task_func 的参数
STATUS_PART_DONE -> STATUS_PART_DONE:将 task[“res”] 作为 task_func 的参数,表示该任务已开始但未完成
STATUS_PART_DONE -> STATUS_DONE:将 task[“res”] 作为 task_func 的参数
- 参数:
task_func (Callable) –
def (task_def, **kwargs) -> <将被提交的结果>
用于执行任务的函数
task_pool (str) – 任务池的名称(MongoDB 中的 Collection)
query (dict) – 获取任务时将使用该字典查询 task_pool
force_release (bool) – 程序是否会强制释放资源
before_status (str:) – 处于 before_status 的任务将被获取并进行训练。可选值为 STATUS_WAITING、STATUS_PART_DONE。
after_status (str:) – 训练完成后的任务将变为 after_status。可选值为 STATUS_WAITING、STATUS_PART_DONE。
kwargs – task_func 的参数
Trainer
Trainer 将训练一系列任务并返回一系列模型记录器。每个 Trainer 包含两个步骤:train
(创建模型记录器)和 end_train
(修改模型记录器)。
这是一种称为 DelayTrainer
的概念,可用于在线模拟中的并行训练。在 DelayTrainer
中,第一步仅将必要的信息保存到模型记录器中,而第二步将在最后执行,用于完成一些并发且耗时的操作,例如模型拟合。
Qlib
提供了两种 Trainer:TrainerR
是最简单的方式,TrainerRM
则基于 TaskManager,可自动管理任务的生命周期。
- qlib.model.trainer.begin_task_train(task_config: dict, experiment_name: str, recorder_name: str | None = None) Recorder
开始任务训练,启动一个记录器并保存任务配置。
- 参数:
task_config (dict) – 任务的配置
experiment_name (str) – 实验的名称
recorder_name (str) – 指定的名称将作为记录器名称。若为 None,则使用 rid。
- 返回:
模型记录器
- 返回类型:
- qlib.model.trainer.end_task_train(rec: Recorder, experiment_name: str) Recorder
完成任务训练,包括实际的模型拟合与保存。
- qlib.model.trainer.task_train(task_config: dict, experiment_name: str, recorder_name: str | None = None) Recorder
基于任务的训练,将分为两个步骤进行。
- 参数:
task_config (dict) – 任务的配置。
experiment_name (str) – 实验的名称
recorder_name (str) – 记录器的名称
- 返回:
记录器
- 返回类型:
记录器实例
- 类 qlib.model.trainer.训练器
训练器可以训练一组模型。Trainer 和 DelayTrainer 的区别在于实际训练何时完成。
- __init__()
- train(tasks: list, *args, **kwargs) list
给定任务定义的列表,开始训练并返回模型。
对于 Trainer,实际的训练在此方法中完成;对于 DelayTrainer,此方法仅进行一些准备工作。
- 参数:
tasks – 任务列表
- 返回:
模型列表
- 返回类型:
list
- end_train(models: list, *args, **kwargs) list
给定一个模型列表,在训练结束时完成一些必要的收尾工作。这些模型可能是 Recorder、文本文件、数据库等。
对于 Trainer,此方法中执行一些收尾操作;对于 DelayTrainer,实际的训练在此方法中完成。
- 参数:
models – 模型列表
- 返回:
模型列表
- 返回类型:
list
- is_delay() 布尔值
指示 Trainer 是否会延迟执行 end_train。
- 返回:
如果是 DelayTrainer
- 返回类型:
布尔值
- has_worker() 布尔值
某些训练器具有后端工作进程以支持并行训练。此方法可用于判断工作进程是否已启用。
- 返回:
如果工作进程已启用
- 返回类型:
布尔值
- worker()
启动工作进程
- Raises:
NotImplementedError: – 如果不支持工作进程
- 类 qlib.model.trainer.TrainerR(experiment_name: str | None = None, train_func: ~typing.Callable = <函数 task_train>, call_in_subproc: bool = False, default_rec_name: str | None = None)
基于 (R)ecorder 的训练器。它将按线性顺序训练一系列任务,并返回一组模型记录器。
假设:模型由 task 定义,结果将保存到 Recorder 中。
- __init__(experiment_name: str | None = None, train_func: ~typing.Callable = <函数 task_train>, call_in_subproc: bool = False, default_rec_name: str | None = None)
初始化 TrainerR。
- 参数:
experiment_name (str,可选) – 实验的默认名称。
train_func (Callable,可选) – 默认训练方法。默认为 task_train。
call_in_subproc (bool) – 在子进程中调用该过程,以强制释放内存
- train(tasks: list, train_func: Callable | None = None, experiment_name: str | None = None, **kwargs) List[Recorder]
给定一个tasks列表,返回一个训练好的 Recorder 列表,顺序可以保证。
- 参数:
tasks (list) – 基于task字典定义的列表
train_func (Callable) – 训练方法,至少需要tasks和experiment_name参数。若为 None,则使用默认训练方法。
experiment_name (str) – 实验名称,若为 None 则使用默认名称。
kwargs – 传递给 train_func 的参数。
- 返回:
Recorder 的列表
- 返回类型:
List[Recorder]
- class qlib.model.trainer.DelayTrainerR(experiment_name: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, **kwargs)
基于 TrainerR 的延迟实现,意味着 train 方法可能仅执行一些准备工作,而真正的模型训练将在 end_train 方法中进行。
- __init__(experiment_name: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, **kwargs)
初始化 TrainerRM。
- 参数:
experiment_name (str) – 实验的默认名称。
train_func (Callable, 可选) – 默认的训练方法。默认为 begin_task_train。
end_train_func (Callable, 可选) – 默认的 end_train 方法。默认为 end_task_train。
- end_train(models, end_train_func=None, experiment_name: str | None = None, **kwargs) List[Recorder]
接收一个 Recorder 列表并返回一个训练完成的 Recorder 列表。该类将完成真实数据加载和模型拟合。
- 参数:
models (list) – Recorder 的列表,任务已保存在其中
end_train_func (Callable,可选) – 需要至少包含 recorders 和 experiment_name 参数的 end_train 方法。默认为 None,表示使用 self.end_train_func。
experiment_name (str) – 实验名称,若为 None 则使用默认名称。
kwargs – 传递给 end_train_func 的参数。
- 返回:
Recorder 的列表
- 返回类型:
List[Recorder]
- 类 qlib.model.trainer.TrainerRM(experiment_name: str | None = None, task_pool: str | None = None, train_func=<函数 task_train>, skip_run_task: bool = False, default_rec_name: str | None = None)
基于 (R)ecorder 和 Task(M)anager 的训练器。它可以通过多进程方式训练一系列任务,并返回一系列模型记录器。
假设:task 将被保存到 TaskManager 中,并且 task 会从 TaskManager 中被取出并进行训练。
- __init__(experiment_name: str | None = None, task_pool: str | None = None, train_func=<函数 task_train>, skip_run_task: bool = False, default_rec_name: str | None = None)
初始化 TrainerR。
- 参数:
experiment_name (str) – 实验的默认名称。
task_pool (str) – TaskManager 中的任务池名称。若为 None,则使用与 experiment_name 相同的名称。
train_func (Callable,可选) – 默认训练方法。默认为 task_train。
skip_run_task (bool) – 如果 skip_run_task == True:仅在工作进程中运行 run_task;否则跳过 run_task。
- train(tasks: list, train_func: Callable | None = None, experiment_name: str | None = None, before_status: str = 'waiting', after_status: str = 'done', default_rec_name: str | None = None, **kwargs) List[Recorder]
给定一个tasks列表,返回一个训练好的 Recorder 列表,顺序可以保证。
该方法默认使用单个进程,但 TaskManager 提供了一种并行训练的优秀方式。用户可以自定义他们的 train_func 以实现多进程甚至多机训练。
- 参数:
tasks (list) – 基于task字典定义的列表
train_func (Callable) – 训练方法,至少需要tasks和experiment_name参数。若为 None,则使用默认训练方法。
experiment_name (str) – 实验名称,若为 None 则使用默认名称。
before_status (str) – 将获取并训练处于 before_status 状态的任务。可选值为 STATUS_WAITING、STATUS_PART_DONE。
after_status (str) – 训练完成后任务将变为 after_status 状态。可选值为 STATUS_WAITING、STATUS_PART_DONE。
kwargs – 传递给 train_func 的参数。
- 返回:
Recorder 的列表
- 返回类型:
List[Recorder]
- end_train(recs: list, **kwargs) List[Recorder]
为记录器设置 STATUS_END 标签。
- 参数:
recs (list) – 已训练的 recorder 列表。
- 返回:
与参数相同的列表。
- 返回类型:
List[Recorder]
- worker(train_func: Callable | None = None, experiment_name: str | None = None)
train 的多进程方法。它可以与 train 共享同一个任务池,并可在其他进程或其他机器上运行。
- 参数:
train_func (Callable) – 训练方法,至少需要tasks和experiment_name参数。若为 None,则使用默认训练方法。
experiment_name (str) – 实验名称,若为 None 则使用默认名称。
- has_worker() 布尔值
某些训练器具有后端工作进程以支持并行训练。此方法可用于判断工作进程是否已启用。
- 返回:
如果工作进程已启用
- 返回类型:
布尔值
- 类 qlib.model.trainer.DelayTrainerRM(experiment_name: str | None = None, task_pool: str | None = None, train_func=<函数 begin_task_train>, end_train_func=<函数 end_task_train>, skip_run_task: bool = False, **kwargs)
基于 TrainerRM 的延迟实现,意味着 train 方法可能仅执行一些准备工作,而真正的模型训练将在 end_train 方法中进行。
- __init__(experiment_name: str | None = None, task_pool: str | None = None, train_func=<函数 begin_task_train>, end_train_func=<函数 end_task_train>, skip_run_task: bool = False, **kwargs)
初始化 DelayTrainerRM。
- 参数:
experiment_name (str) – 实验的默认名称。
task_pool (str) – TaskManager 中的任务池名称。若为 None,则使用与 experiment_name 相同的名称。
train_func (Callable, 可选) – 默认的训练方法。默认为 begin_task_train。
end_train_func (Callable, 可选) – 默认的 end_train 方法。默认为 end_task_train。
skip_run_task (bool) – 如果 skip_run_task == True:仅在工作进程中运行 run_task;否则跳过 run_task。例如,在 CPU 虚拟机上启动训练器,然后等待任务在 GPU 虚拟机上完成。
- train(tasks: list, train_func=None, experiment_name: str | None = None, **kwargs) List[Recorder]
与 TrainerRM 的 train 方法相同,after_status 将为 STATUS_PART_DONE。
- 参数:
tasks (list) – 基于 task 字典定义的列表
train_func (Callable) – 训练方法,至少需要 tasks 和 experiment_name 参数。默认为 None,表示使用 self.train_func。
experiment_name (str) – 实验名称,若为 None 则使用默认名称。
- 返回:
Recorder 的列表
- 返回类型:
List[Recorder]
- end_train(recs, end_train_func=None, experiment_name: str | None = None, **kwargs) List[Recorder]
接收一个 Recorder 列表并返回一个训练完成的 Recorder 列表。该类将完成真实数据加载和模型拟合。
- 参数:
recs (列表) – Recorder 的列表,任务已保存到其中。
end_train_func (可调用对象,可选) – 需要至少包含 recorders 和 experiment_name 参数的 end_train 方法。默认为 None,表示使用 self.end_train_func。
experiment_name (str) – 实验名称,若为 None 则使用默认名称。
kwargs – 传递给 end_train_func 的参数。
- 返回:
Recorder 的列表
- 返回类型:
List[Recorder]
- worker(end_train_func=None, experiment_name: str | None = None)
end_train 的多进程方法。它可以与 end_train 共享同一个任务池,并可在其他进程或其他机器上运行。
- 参数:
end_train_func (可调用对象,可选) – 需要至少包含 recorders 和 experiment_name 参数的 end_train 方法。默认为 None,表示使用 self.end_train_func。
experiment_name (str) – 实验名称,若为 None 则使用默认名称。
- has_worker() 布尔值
某些训练器具有后端工作进程以支持并行训练。此方法可用于判断工作进程是否已启用。
- 返回:
如果工作进程已启用
- 返回类型:
布尔值
Collector
Collector 模块可以从各处收集对象并进行处理,例如合并、分组、取平均值等。
- 类 qlib.workflow.task.collect.收集器(process_list=[])
用于收集不同结果的收集器。
- __init__(process_list=[])
初始化 Collector。
- 参数:
process_list (列表 或 可调用对象) – 处理器的列表或一个处理器实例,用于处理字典。
- collect() dict
收集结果并返回一个形如 {key: things} 的字典
- 返回:
收集完成后的字典。
例如:
{“prediction”: pd.Series}
{“IC”: {“Xgboost”: pd.Series, “LSTM”: pd.Series}}
…
- 返回类型:
dict
- 静态 process_collect(collected_dict, process_list=[], *args, **kwargs) dict
对 collect 返回的字典进行一系列处理,并返回形如 {key: things} 的字典。例如,你可以进行分组和集成。
- 参数:
collected_dict (dict) – 由 collect 返回的字典
process_list (list 或 Callable) – 处理器列表或单个处理器实例,用于处理字典。处理器的执行顺序与列表顺序一致。例如:[Group1(…, Ensemble1()), Group2(…, Ensemble2())]
- 返回:
处理完成后的字典。
- 返回类型:
dict
- 类 qlib.workflow.task.collect.MergeCollector(collector_dict: Dict[str, Collector], process_list: List[Callable] = [], merge_func=None)
一个用于收集其他收集器结果的收集器
例如:
我们有两个收集器,分别命名为 A 和 B。A 可以收集 {“prediction”: pd.Series},B 可以收集 {“IC”: {“Xgboost”: pd.Series, “LSTM”: pd.Series}}。经过该类的 collect 操作后,我们可以收集到 {“A_prediction”: pd.Series, “B_IC”: {“Xgboost”: pd.Series, “LSTM”: pd.Series}}
…
- __init__(collector_dict: Dict[str, Collector], process_list: List[Callable] = [], merge_func=None)
初始化 MergeCollector。
- 参数:
collector_dict (Dict[str,Collector]) – 形如 {collector_key, Collector} 的字典
process_list (List[Callable]) – 处理器的列表或处理器实例,用于处理字典。
merge_func (Callable) – 用于生成最外层键的方法。传入的参数为来自 collector_dict 的
collector_key
和每个收集器收集后的key
。若为 None,则使用元组连接它们,例如 “ABC”+(“a”,”b”) -> (“ABC”, (“a”,”b”))。
- collect() dict
收集 collector_dict 中所有收集器的结果,并将最外层的键更改为重新组合后的键。
- 返回:
收集完成后的字典。
- 返回类型:
dict
- 类 qlib.workflow.task.collect.RecorderCollector(experiment, process_list=[], rec_key_func=None, rec_filter_func=None, artifacts_path={'pred': 'pred.pkl'}, artifacts_key=None, list_kwargs={}, status: Iterable = {'FINISHED'})
- __init__(experiment, process_list=[], rec_key_func=None, rec_filter_func=None, artifacts_path={'pred': 'pred.pkl'}, artifacts_key=None, list_kwargs={}, status: Iterable = {'FINISHED'})
初始化 RecorderCollector。
- 参数:
experiment – (Experiment 或 str):一个 Experiment 实例,或 Experiment 的名称(可调用对象):一个可调用的函数,返回一个实验列表
process_list (列表 或 可调用对象) – 处理器的列表或一个处理器实例,用于处理字典。
rec_key_func (Callable) – 用于获取记录器(recorder)键的函数。如果为 None,则使用 recorder 的 ID。
rec_filter_func (Callable, 可选) – 通过返回 True 或 False 来过滤 recorder。默认为 None。
artifacts_path (dict, 可选) – Recorder 中产物的名称及其路径。默认为 {“pred”: “pred.pkl”, “IC”: “sig_analysis/ic.pkl”}。
artifacts_key (str 或 List, 可选) – 指定要获取的产物键。如果为 None,则获取所有产物。
list_kwargs (str) – list_recorders 函数的参数。
status (Iterable) – 仅收集具有特定状态的 recorder。None 表示收集所有 recorder。
- collect(artifacts_key=None, rec_filter_func=None, only_exist=True) dict
根据记录器在过滤后收集不同的产物。
- 参数:
artifacts_key (str 或 List,可选) – 要获取的产物键。如果为 None,则使用默认值。
rec_filter_func (Callable,可选) – 通过返回 True 或 False 来过滤记录器。如果为 None,则使用默认值。
only_exist (bool,可选) – 是否仅在记录器实际存在时才收集产物。如果为 True,则加载时出现异常的记录器将不会被收集;但如果为 False,则会抛出异常。
- 返回:
收集后的字典,格式如 {artifact: {rec_key: object}}
- 返回类型:
dict
- get_exp_name() str
获取实验名称
- 返回:
实验名称
- 返回类型:
str
分组
分组(Group)可以根据 group_func 将一组对象进行分组并转换为字典。分组后,我们提供了一种方法来对它们进行归约。
例如:
分组示例:{(A,B,C1): object, (A,B,C2): object} → {(A,B): {C1: object, C2: object}};归约示例:{(A,B): {C1: object, C2: object}} → {(A,B): object}
- class qlib.model.ens.group.Group(group_func=None, ens: Ensemble | None = None)
基于字典对对象进行分组
- __init__(group_func=None, ens: Ensemble | None = None)
初始化分组。
- 参数:
group_func (Callable,可选) –
接收一个字典,并返回分组键以及该组中的一个元素。
例如:{(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}
None. (默认值为)–
ens (Ensemble,可选) – 如果不为 None,则在分组后对分组的值进行集成处理。
- group(*args, **kwargs) dict
将一组对象进行分组并转换为字典。
例如:{(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}
- 返回:
分组后的字典
- 返回类型:
dict
- reduce(*args, **kwargs) dict
对已分组的字典进行归约。
例如:{(A,B): {C1: object, C2: object}} -> {(A,B): object}
- 返回:
归约后的字典
- 返回类型:
dict
集成
集成模块可以将集成中的对象进行合并。例如,如果有多个子模型的预测结果,我们可能需要将它们合并为一个集成预测。
- 类 qlib.model.ens.ensemble.集成器
将 ensemble_dict 合并为一个集成对象。
例如:{Rollinga_b: object, Rollingb_c: object} -> object
调用此类时:
- 参数:
ensemble_dict (dict):等待合并的集成字典,格式如 {name: things}
- 返回值:
object:集成后的对象
- 类 qlib.model.ens.ensemble.单键集成器
如果字典中只有一个键值对,则提取该对象,使结果更易读。{唯一键: 唯一值} -> 唯一值
如果键的数量多于 1 个或少于 1 个,则不进行任何操作。即使可以递归运行此操作以使字典更易读。
注意:默认情况下会递归执行。
调用此类时:
- 参数:
ensemble_dict (dict):字典。字典的键将被忽略。
- 返回值:
dict:更易读的字典。
- 类 qlib.model.ens.ensemble.滚动集成器
将类似 prediction 或 IC 的滚动数据框字典合并为一个集成。
注意:字典的值必须是 pd.DataFrame,并且索引中包含“datetime”。
调用此类时:
- 参数:
ensemble_dict (dict):形如 {“A”: pd.DataFrame, “B”: pd.DataFrame} 的字典。字典的键将被忽略。
- 返回值:
pd.DataFrame:滚动操作的完整结果。
- 类 qlib.model.ens.ensemble.平均集成器
对形状相同的 DataFrame 字典(如 prediction 或 IC)进行平均和标准化,形成集成。
注意:字典的值必须是 pd.DataFrame,并且索引中包含“datetime”。如果是嵌套字典,则先将其展平。
调用此类时:
- 参数:
ensemble_dict (dict):形如 {“A”: pd.DataFrame, “B”: pd.DataFrame} 的字典。字典的键将被忽略。
- 返回值:
pd.DataFrame:平均和标准化后的完整结果。
工具
一些用于任务管理的工具。
- qlib.workflow.task.utils.获取 MongoDB() 数据库
获取 MongoDB 中的数据库,这意味着你需要首先声明数据库的地址和名称。
例如:
使用 qlib.init():
mongo_conf = { "task_url": task_url, # your MongoDB url "task_db_name": task_db_name, # database name } qlib.init(..., mongo=mongo_conf)
在调用 qlib.init() 之后:
C["mongo"] = { "task_url" : "mongodb://localhost:27017/", "task_db_name" : "rolling_db" }
- 返回:
数据库实例
- 返回类型:
数据库
- qlib.workflow.task.utils.list_recorders(experiment, rec_filter_func=None)
列出实验中所有能通过过滤器的记录器。
- 参数:
experiment (str 或 Experiment) – 实验的名称或实例
rec_filter_func (Callable,可选) – 若返回 True,则保留该记录器。默认为 None。
- 返回:
过滤后的字典 {rid: recorder}。
- 返回类型:
dict
- 类 qlib.workflow.task.utils.TimeAdjuster(future=True, end_time=None)
查找合适的日期并调整日期。
- __init__(future=True, end_time=None)
- set_end_time(end_time=None)
设置结束时间。若为 None,则使用日历的结束时间。
- 参数:
end_time —
- get(idx: int)
根据索引获取日期时间。
- 参数:
idx (int) – 日历的索引
- max() Timestamp
返回日历中的最大日期时间
- align_idx(time_point, tp_type='start') int
将 time_point 在日历中的索引对齐。
- 参数:
time_point –
tp_type (str) –
- 返回:
索引
- 返回类型:
int
- cal_interval(time_point_A, time_point_B) int
计算交易日间隔(time_point_A - time_point_B)
- 参数:
time_point_A – time_point_A
time_point_B – time_point_B(是 time_point_A 的过去时间点)
- 返回:
A 和 B 之间的时间间隔
- 返回类型:
int
- align_time(time_point, tp_type='start') Timestamp
将 time_point 对齐到日历中的交易日期
- 参数:
time_point – 时间点
tp_type – 字符串类型的时间点(“start”,“end”)
- 返回:
pd.Timestamp
- align_seg(segment: dict | tuple) dict | tuple
将给定日期对齐到交易日期
例如:
input: {'train': ('2008-01-01', '2014-12-31'), 'valid': ('2015-01-01', '2016-12-31'), 'test': ('2017-01-01', '2020-08-01')} output: {'train': (Timestamp('2008-01-02 00:00:00'), Timestamp('2014-12-31 00:00:00')), 'valid': (Timestamp('2015-01-05 00:00:00'), Timestamp('2016-12-30 00:00:00')), 'test': (Timestamp('2017-01-03 00:00:00'), Timestamp('2020-07-31 00:00:00'))}
- 参数:
segment –
- 返回:
Union[dict, tuple]
- 返回类型:
在给定的起止日期之间,对应的起始和结束交易日期(pd.Timestamp)。
- truncate(segment: tuple, test_start, days: int) tuple
根据 test_start 日期截断时间段
- 参数:
segment (tuple) – 时间段
test_start –
days (int) – 需要截断的交易日数,该时间段内的数据可能需要基于 test_start 的 days 天数据。例如,如果标签包含未来 2 天的信息,预测周期为 1 天(例如预测目标是 Ref($close, -2)/Ref($close, -1) - 1),则 days 应为 2 + 1 == 3 天。
- 返回:
tuple
- 返回类型:
新的时间段
- shift(seg: 元组, step: 整数, rtype='sliding') 元组
移动时间段的日期时间
如果段中存在 None(表示无界索引),此方法将返回 None。
- 参数:
seg – 日期时间段
step (int) – 滚动步长
rtype (str) – 滚动类型(“sliding” 或 “expanding”)
- 返回:
tuple
- 返回类型:
新的时间段
- Raises:
KeyError: – 如果索引(包括起始和结束)超出 self.cal 范围,shift 将引发错误
- qlib.workflow.task.utils.replace_task_handler_with_cache(task: 字典, cache_dir: 字符串 | 路径 = '.') 字典
将任务中的处理器替换为缓存处理器。它会自动缓存文件并将其保存在 cache_dir 中。
>>> import qlib >>> qlib.auto_init() >>> import datetime >>> # it is simplified task >>> task = {"dataset": {"kwargs":{'handler': {'class': 'Alpha158', 'module_path': 'qlib.contrib.data.handler', 'kwargs': {'start_time': datetime.date(2008, 1, 1), 'end_time': datetime.date(2020, 8, 1), 'fit_start_time': datetime.date(2008, 1, 1), 'fit_end_time': datetime.date(2014, 12, 31), 'instruments': 'CSI300'}}}}} >>> new_task = replace_task_handler_with_cache(task) >>> print(new_task) {'dataset': {'kwargs': {'handler': 'file...Alpha158.3584f5f8b4.pkl'}}}
在线服务
在线管理器
OnlineManager 可以管理一组 在线策略 并动态运行它们。
随着时间的推移,决策模型也可能发生变化。在本模块中,我们将这些持续贡献的模型称为 在线 模型。在每个周期(例如每天或每分钟)中,在线 模型可能会更新,其预测结果也需要相应刷新。因此,本模块提供了一系列方法来控制这一过程。
该模块还提供了一种在历史数据中模拟在线策略的方法,这意味着你可以验证自己的策略或寻找更优的策略。
在不同场景下使用不同训练器共有四种情况:
场景 |
描述 |
---|---|
在线 + 训练器(Trainer) |
当你希望执行一个真实的运行流程时,训练器将帮助你训练模型。它会按任务和策略逐一进行模型训练。 |
在线 + 延迟训练器(DelayTrainer) |
延迟训练器会跳过具体的训练步骤,直到所有策略都准备好了各自的任务。这使得用户可以在 routine 或 first_train 的末尾集中并行训练所有任务。否则,每当各个策略准备任务时,这些函数可能会被阻塞。 |
模拟 + 训练器(Trainer) |
其行为方式与 在线 + 训练器 相同,唯一的区别是它用于模拟/回测,而非在线交易。 |
模拟 + 延迟训练器(DelayTrainer) |
当你的模型没有时间依赖性时,可以使用延迟训练器以实现多任务并行处理。这意味着在模拟结束时,所有流程中的任务都可以真正地完成训练。信号将在不同的时间段内被妥善准备(取决于是否有新模型上线)。 |
以下是一些伪代码,用于展示每种情况的工作流程
- 为简化说明
策略中仅使用了一个策略
update_online_pred 仅在在线模式下调用,在其他情况下被忽略
在线 + 训练器(Trainer)
tasks = first_train()
models = trainer.train(tasks)
trainer.end_train(models)
for day in online_trading_days:
# OnlineManager.routine
models = trainer.train(strategy.prepare_tasks()) # for each strategy
strategy.prepare_online_models(models) # for each strategy
trainer.end_train(models)
prepare_signals() # prepare trading signals daily
在线 + 延迟训练器:工作流程与 在线 + 训练器 相同。
模拟 + 延迟训练器(DelayTrainer)
# simulate
tasks = first_train()
models = trainer.train(tasks)
for day in historical_calendars:
# OnlineManager.routine
models = trainer.train(strategy.prepare_tasks()) # for each strategy
strategy.prepare_online_models(models) # for each strategy
# delay_prepare()
# FIXME: Currently the delay_prepare is not implemented in a proper way.
trainer.end_train(<for all previous models>)
prepare_signals()
我们能否简化当前的工作流程?
能否减少任务的状态数量?
对于每个任务,我们有三个阶段(即:任务、部分训练后的任务、最终训练完成的任务)
- 类 qlib.workflow.online.manager.OnlineManager(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')
OnlineManager 可以通过在线策略管理在线模型。它还提供了历史记录功能,用于记录哪些模型在何时处于在线状态。
- __init__(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')
初始化 OnlineManager。每个 OnlineManager 至少需要一个 OnlineStrategy。
- 参数:
strategies (Union[OnlineStrategy, List[OnlineStrategy]]) – 一个 OnlineStrategy 实例或 OnlineStrategy 的列表
begin_time (Union[str,pd.Timestamp], 可选) – OnlineManager 将从该时间开始运行。默认为 None,表示使用最新日期。
trainer (qlib.model.trainer.Trainer) – 用于训练任务的训练器。若为 None,则使用 TrainerR。
freq (str, 可选) – 数据频率。默认为 “day”。
- first_train(strategies: List[OnlineStrategy] | None = None, model_kwargs: dict = {})
从每个策略的 first_tasks 方法中获取任务并进行训练。如果使用 DelayTrainer,可以在所有策略的 first_tasks 执行完毕后一并完成训练。
- 参数:
strategies (List[OnlineStrategy]) – 策略列表(添加策略时需要此参数)。若为 None,则使用默认策略。
model_kwargs (dict) – prepare_online_models 所需的参数
- routine(cur_time: str | Timestamp | None = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})
每个策略的典型更新流程,并记录在线历史。
例行操作后的典型更新流程,例如按天或按月进行。该流程包括:更新预测 -> 准备任务 -> 准备在线模型 -> 准备信号。
如果使用 DelayTrainer,可以在每个策略执行完 prepare_tasks 后统一完成训练。
- 参数:
cur_time (Union[str,pd.Timestamp], 可选) – 在指定时间运行该例行方法。默认为 None。
task_kwargs (dict) – 传递给 prepare_tasks 方法的参数。
model_kwargs (dict) – prepare_online_models 所需的参数
signal_kwargs (dict) – 传递给 prepare_signals 方法的参数。
- get_collector(**kwargs) 合并收集器
获取 Collector 实例,用于收集每个策略的结果。该收集器可作为信号准备的基础。
- 参数:
**kwargs – get_collector 方法的参数。
- 返回:
用于合并其他收集器的收集器。
- 返回类型:
- add_strategy(strategies: 在线策略 | 列表[在线策略])
向 OnlineManager 添加一些新策略。
- 参数:
strategy (Union[OnlineStrategy, List[OnlineStrategy]]) – OnlineStrategy 的列表
- prepare_signals(prepare_func: ~typing.Callable = <qlib.model.ens.ensemble.AverageEnsemble 对象>, over_write=False)
在完成上一个周期(箱线图中的一个箱子)的数据准备后,即该周期结束时,我们可以为下一个周期准备交易信号。
注意:给定一组预测结果时,所有早于这些预测结束时间的信号都将被妥善准备。
即使最新的信号已存在,最新的计算结果也会将其覆盖。
注意
给定某一时刻的预测,所有早于该时刻的信号都将被妥善准备。
- 参数:
prepare_func (Callable, 可选) – 从收集后的字典中获取信号。默认为 AverageEnsemble(),MergeCollector 收集的结果必须是 {xxx: pred} 格式。
over_write (bool, 可选) – 如果为 True,新信号将覆盖旧信号;如果为 False,新信号将追加到现有信号末尾。默认为 False。
- 返回:
这些信号。
- 返回类型:
pd.DataFrame
- get_signals() 序列 | DataFrame
获取已准备好的在线信号。
- 返回:
若每个时间点仅有一个信号,则返回 pd.Series;若有多个信号(例如买入和卖出操作使用不同的交易信号),则返回 pd.DataFrame。
- 返回类型:
Union[pd.Series, pd.DataFrame]
- simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) Series | DataFrame
从当前时间开始,该方法将模拟 OnlineManager 中的每一个例行任务,直到结束时间。
考虑到并行训练,模型和信号可以在所有例行模拟完成后进行准备。
延迟训练方式可以是
DelayTrainer
,而延迟准备信号的方式可以是delay_prepare
。- 参数:
end_time – 仿真结束的时间
frequency – 日历频率
task_kwargs (dict) – 传递给 prepare_tasks 方法的参数。
model_kwargs (dict) – prepare_online_models 所需的参数
signal_kwargs (dict) – 传递给 prepare_signals 方法的参数。
- 返回:
若每个时间点仅有一个信号,则返回 pd.Series;若有多个信号(例如买入和卖出操作使用不同的交易信号),则返回 pd.DataFrame。
- 返回类型:
Union[pd.Series, pd.DataFrame]
- delay_prepare(model_kwargs={}, signal_kwargs={})
如果存在等待准备的模型或信号,则准备所有模型和信号。
- 参数:
model_kwargs – 传递给 end_train 的参数
signal_kwargs – 传递给 prepare_signals 的参数
在线策略
OnlineStrategy 模块是在线服务的一个组成部分。
- 类 qlib.workflow.online.strategy。在线策略(name_id: str)
OnlineStrategy 与 Online Manager 协同工作,决定任务如何生成、模型如何更新以及信号如何准备。
- __init__(name_id: str)
初始化 OnlineStrategy。该模块必须使用 Trainer 来完成模型训练。
- 参数:
name_id (str) – 唯一的名称或 ID。
trainer (qlib.model.trainer.Trainer, 可选) – Trainer 的一个实例。默认为 None。
- prepare_tasks(cur_time, **kwargs) List[dict]
在一个例行任务结束后,检查是否需要基于当前时间(None 表示最新时间)准备和训练一些新任务。返回等待训练的新任务列表。
你可以通过 OnlineTool.online_models 找到最新的在线模型。
- prepare_online_models(trained_models, cur_time=None) List[object]
从已训练的模型中选择一些模型并将其设置为在线模型。这是一个典型的实现,用于将所有已训练的模型上线,你可以重写该方法以实现更复杂的逻辑。如果你仍需要之前的在线模型,可以通过 OnlineTool.online_models 获取。
注意:将所有在线模型重置为已训练模型。如果没有已训练的模型,则不执行任何操作。
- 注意:
当前的实现非常简单。以下是一个更复杂的情况,更接近实际应用场景:1. 在 test_start 前一天(时间戳 T)训练新模型;2. 在 test_start 时刻(通常为时间戳 T + 1)切换模型。
- 参数:
models (list) – 模型列表。
cur_time (pd.Dataframe) – 来自 OnlineManager 的当前时间。若为 None,则表示使用最新时间。
- 返回:
一个包含在线模型的列表。
- 返回类型:
List[object]
- first_tasks() 列表[dict]
首先生成一系列任务并返回它们。
- 类 qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)
此示例策略始终使用最新的滚动模型 sas 在线模型。
- __init__(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)
初始化 RollingStrategy。
假设:name_id 的字符串、实验名称以及训练器的实验名称相同。
- 参数:
name_id (str) – 唯一的名称或 ID,也将作为实验的名称。
task_template (Union[dict, List[dict]]) – 一个任务模板列表或单个模板,将用于通过 rolling_gen 生成多个任务。
rolling_gen (RollingGen) – RollingGen 的一个实例
- get_collector(process_list=[<qlib.model.ens.group.RollingGroup 对象>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)
获取 Collector 实例以收集结果。返回的收集器必须能够区分不同模型的结果。
假设:模型可以通过模型名称和滚动测试时间段加以区分。如果您不希望依赖此假设,请实现您自己的方法或使用其他 rec_key_func。
- 参数:
rec_key_func (Callable) – 用于获取记录器(recorder)键的函数。如果为 None,则使用 recorder 的 ID。
rec_filter_func (Callable, 可选) – 通过返回 True 或 False 来过滤 recorder。默认为 None。
artifacts_key (List[str], 可选) – 您想要获取的 artifacts 键。如果为 None,则获取所有 artifacts。
- first_tasks() 列表[dict]
使用 rolling_gen 基于 task_template 生成不同的任务。
- 返回:
任务列表
- 返回类型:
List[dict]
- prepare_tasks(cur_time) 列表[dict]
基于 cur_time 准备新任务(None 表示最新时间)。
您可以通过 OnlineToolR.online_models 找到最新的在线模型。
- 返回:
一个包含新任务的列表。
- 返回类型:
List[dict]
在线工具
OnlineTool 是一个用于设置和取消设置一系列 在线 模型的模块。在线 模型是在某些时间点具有决定性的模型,可随时间变化而更新。这使得我们能够随着市场风格的变化使用高效的子模型。
- 类 qlib.workflow.online.utils.在线工具
OnlineTool 将管理包含模型记录器的实验中的 在线 模型。
- __init__()
初始化 OnlineTool。
- set_online_tag(tag, recorder: list | object)
将标签 tag 设置给模型,以标识其是否在线。
- 参数:
tag (str) – 来自 ONLINE_TAG、OFFLINE_TAG 的标签
recorder (Union[list,object]) – 模型的记录器(recorder)
- get_online_tag(recorder: object) str
根据模型记录器返回其在线标签。
- 参数:
recorder (Object) – 模型的记录器
- 返回:
在线标签
- 返回类型:
str
- reset_online_tag(recorder: list | object)
将所有模型设为离线,并将其记录器状态重置为‘在线’。
- 参数:
recorder (Union[list,object]) – 要重置为‘在线’状态的记录器
- online_models() list
获取当前处于 在线 状态的模型
- 返回:
一个包含所有 在线 模型的列表。
- 返回类型:
list
- update_online_pred(to_date=None)
将 在线 模型的预测更新到 to_date。
- 参数:
to_date (pd.Timestamp) – 此日期之前的预测将被更新。若为 None,则更新至最新。
- 类 qlib.workflow.online.utils.在线工具 R(default_exp_name: str | None = None)
基于 (R)ecorder 实现的 OnlineTool。
- __init__(default_exp_name: str | None = None)
初始化 OnlineToolR。
- 参数:
default_exp_name (str) – 默认实验名称。
- set_online_tag(tag, recorder: Recorder | List)
将标签 tag 设置到模型的记录器中,以标识其是否在线。
- 参数:
tag (str) – 可选标签包括 ONLINE_TAG、NEXT_ONLINE_TAG、OFFLINE_TAG
recorder (Union[Recorder, List]) – Recorder 对象的列表或单个 Recorder 实例
- get_online_tag(recorder: 记录器) str
根据模型记录器返回其在线标签。
- 参数:
recorder (Recorder) – 一个 recorder 实例
- 返回:
在线标签
- 返回类型:
str
- reset_online_tag(recorder: Recorder | List, exp_name: str | None = None)
将所有模型设为离线,并将其记录器状态重置为‘在线’。
- 参数:
recorder (Union[Recorder, List]) – 您想要重置为“在线”状态的记录器。
exp_name (str) – 实验名称。如果为 None,则使用默认实验名称。
- online_models(exp_name: str | None = None) list
获取当前处于 在线 状态的模型
- 参数:
exp_name (str) – 实验名称。如果为 None,则使用默认实验名称。
- 返回:
一个包含所有 在线 模型的列表。
- 返回类型:
list
- update_online_pred(to_date=None, from_date=None, exp_name: str | None = None)
将在线模型的预测更新至指定日期。
- 参数:
to_date (pd.Timestamp) – 该日期之前的预测将被更新。若为 None,则更新至日历中的最新时间。
exp_name (str) – 实验名称。如果为 None,则使用默认实验名称。
RecordUpdater
Updater 是一个用于在股票数据更新时,更新预测等产物的模块。
- 类 qlib.workflow.online.update.RMD 加载器(rec: 记录器)
记录器 模型 数据集 加载器
- get_dataset(start_time, end_time, segments=None, unprepared_dataset: DatasetH | None = None) DatasetH
加载、配置并设置数据集。
该数据集用于推理。
- 参数:
start_time – 底层数据的起始时间
end_time – 底层数据的结束时间
segments – 字典类型,表示数据集的分段配置。由于是时间序列数据集(TSDatasetH),测试分段可能与 start_time 和 end_time 不同
unprepared_dataset – 可选[DatasetH],如果用户不希望从记录器加载数据集,请指定用户的数据集
- 返回:
DatasetH 的实例
- 返回类型:
- class qlib.workflow.online.update.RecordUpdater(record: Recorder, *args, **kwargs)
更新特定的记录器
- 抽象 update(*args, **kwargs)
更新特定记录器的信息
- 类 qlib.workflow.online.update.DSBasedUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
基于数据集的更新器
为基于 Qlib 数据集的数据提供更新功能
假设
基于 Qlib 数据集
待更新的数据是一个多级索引的 pd.DataFrame,例如标签、预测值。
LABEL0 datetime instrument 2021-05-10 SH600000 0.006965 SH600004 0.003407 ... ... 2021-05-28 SZ300498 0.015748 SZ300676 -0.001321
- __init__(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
初始化 PredUpdater。
以下情况的预期行为:
如果 to_date 大于日历中的最大日期,则数据将更新到最新日期
如果存在 from_date 之前或 to_date 之后的数据,则仅影响 from_date 和 to_date 之间的数据。
- 参数:
record – 记录器(Recorder)
to_date –
将预测更新到 to_date
如果 to_date 为 None:
数据将更新到最新日期。
from_date –
更新将从 from_date 开始
如果 from_date 为 None:
更新将在历史数据中最新日期的下一个时间点进行
hist_ref –
int 有时,数据集会存在历史依赖。此参数由用户指定历史依赖的长度。如果用户未指定该参数,Updater 将尝试加载数据集以自动确定 hist_ref
注意
起始时间不包含在 hist_ref 内;因此在大多数情况下,hist_ref 等于 step_len - 1
loader_cls – type 用于加载模型和数据集的类
- prepare_data(unprepared_dataset: DatasetH | None = None) DatasetH
加载数据集 - 如果指定了 unprepared_dataset,则直接准备数据集 - 否则,
将此功能分离将使数据集更易于复用
- 返回:
DatasetH 的实例
- 返回类型:
- update(数据集: DatasetH | 无 = 无, 写入: 布尔值 = 真, 返回新数据: 布尔值 = 假) 对象 | 无
- 参数:
数据集 (DatasetH) – DatasetH:DatasetH 的实例。若为 None,则重新准备。
写入 (布尔值) – 是否执行写入操作
返回新数据 (布尔值) – 是否返回更新后的数据
- 返回:
更新后的数据集
- 返回类型:
可选[对象]
- 抽象 get_update_data(dataset: 数据集) DataFrame
根据给定的数据集返回更新后的数据
get_update_data 与 update 的区别 — update_date 仅包含某些特定于数据的特性,而 update 包含一些通用的常规步骤(例如准备数据集、检查等)
- 类 qlib.workflow.online.update.PredUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <类 'qlib.workflow.online.update.RMDLoader'>)
更新记录器(Recorder)中的预测结果
- get_update_data(dataset: 数据集) DataFrame
根据给定的数据集返回更新后的数据
get_update_data 与 update 的区别 — update_date 仅包含某些特定于数据的特性,而 update 包含一些通用的常规步骤(例如准备数据集、检查等)
- 类 qlib.workflow.online.update.LabelUpdater(record: Recorder, to_date=None, **kwargs)
更新记录器中的标签
假设 - 标签由 record_temp.SignalRecord 生成。
- __init__(record: Recorder, to_date=None, **kwargs)
初始化 PredUpdater。
以下情况的预期行为:
如果 to_date 大于日历中的最大日期,则数据将更新到最新日期
如果存在 from_date 之前或 to_date 之后的数据,则仅影响 from_date 和 to_date 之间的数据。
- 参数:
record – 记录器(Recorder)
to_date –
将预测更新到 to_date
如果 to_date 为 None:
数据将更新到最新日期。
from_date –
更新将从 from_date 开始
如果 from_date 为 None:
更新将在历史数据中最新日期的下一个时间点进行
hist_ref –
int 有时,数据集会存在历史依赖。此参数由用户指定历史依赖的长度。如果用户未指定该参数,Updater 将尝试加载数据集以自动确定 hist_ref
注意
起始时间不包含在 hist_ref 内;因此在大多数情况下,hist_ref 等于 step_len - 1
loader_cls – type 用于加载模型和数据集的类
- get_update_data(dataset: 数据集) DataFrame
根据给定的数据集返回更新后的数据
get_update_data 与 update 的区别 — update_date 仅包含某些特定于数据的特性,而 update 包含一些通用的常规步骤(例如准备数据集、检查等)
工具
可序列化
- 类 qlib.utils.serial.可序列化
Serializable 会改变 pickle 的行为。
在转储时判断一个属性是否会被保留或丢弃的规则。优先级更高的规则排在前面:- 在 config 属性列表中 -> 始终被丢弃 - 在 include 属性列表中 -> 始终被保留 - 在 exclude 属性列表中 -> 始终被丢弃 - 名称不以 _ 开头 -> 被保留 - 名称以 _ 开头 -> 如果 dump_all 为 true 则保留,否则丢弃
它提供了一种语法糖,用于区分用户不希望保留的属性。例如,一个可学习的 DataHandler 在转储到磁盘时只希望保存参数而不保存数据。
- __init__()
- 属性 dump_all
对象是否会转储所有属性
- config(recursive=False, **kwargs)
配置可序列化的对象
- 参数:
keys (kwargs 可能包含以下内容) –
- dump_allbool
对象是否会转储所有属性
- excludelist
哪些属性不会被转储
- includelist
哪些属性会被转储
recursive (bool) – 配置是否递归生效
- to_pickle(path: Path | str, **kwargs)
将自身转储为一个 pickle 文件。
path (Union[Path, str]): 要转储的路径
kwargs 可能包含以下键
- dump_allbool
对象是否会转储所有属性
- excludelist
哪些属性不会被转储
- includelist
哪些属性会被转储
- 类方法 load(filepath)
从文件路径加载可序列化的类。
- 参数:
filepath (str) – 文件的路径
- Raises:
TypeError – 被反序列化的文件必须是 type(cls) 类型
- 返回:
type(cls) 的实例
- 返回类型:
type(cls)
- 类方法 get_backend()
返回 Serializable 类的真实后端。pickle_backend 的值可以是 “pickle” 或 “dill”。
- 返回:
基于 pickle_backend 的 pickle 或 dill 模块
- 返回类型:
module
- static general_dump(obj, path: Path | str)
一个通用的对象转储方法
- 参数:
obj (object) – 要被转储的对象
path (Union[路径, 字符串]) – 数据将被转储的目标路径
强化学习
基础组件
- 类 qlib.rl.解释器
解释器(Interpreter)是模拟器产生的状态与强化学习策略所需状态之间的媒介。解释器具有双向功能:
从模拟器状态转换为策略状态(即观测值),参见
StateInterpreter
。从策略动作转换为模拟器可接受的动作,参见
ActionInterpreter
。
建议继承两个子类之一来定义您自己的解释器。该超类仅用于 isinstance 检查。
推荐解释器保持无状态,这意味着在解释器中使用
self.xxx
存储临时信息是一种不良实践。未来我们可能会通过调用self.env.register_state()
来支持注册一些与解释器相关的状态,但这不在第一版的计划中。
- 类 qlib.rl.StateInterpreter(*args, **kwds)
将 Qlib 执行器的执行结果解释为强化学习环境状态的状态解释器
- 验证(观测值: 观测类型) None
验证某个观测值是否属于预定义的观测空间。
- 解释(模拟器状态: 状态类型) 观测类型
解释模拟器的状态。
- 参数:
simulator_state – 通过
simulator.get_state()
获取。- 返回类型:
策略所需的状态。应符合
observation_space
中定义的状态空间。
- 类 qlib.rl.ActionInterpreter(*args, **kwds)
将强化学习智能体的动作解释为 Qlib 订单的动作解释器
- 验证(动作: 策略动作类型) None
验证某个动作是否属于预定义的动作空间。
- interpret(simulator_state: StateType, action: PolicyActType) ActType
将策略动作转换为模拟器动作。
- 参数:
simulator_state – 通过
simulator.get_state()
获取。action – 策略给出的原始动作。
- 返回类型:
模拟器所需的动作。
- class qlib.rl.Reward(*args, **kwds)
奖励计算组件,接受一个参数:模拟器的状态。返回一个实数:奖励值。
子类应实现
reward(simulator_state)
方法,以自定义其奖励计算逻辑。- reward(模拟器状态: SimulatorState) float
请实现此方法以定义你自己的奖励函数。
- 类 qlib.rl.奖励组合(rewards: 字典[str, 元组[奖励, float]])
多个奖励的组合。
- reward(模拟器状态: Any) float
请实现此方法以定义你自己的奖励函数。
- class qlib.rl.Simulator(initial: InitialStateType, **kwargs: Any)
通过
__init__
重置,并通过step(action)
进行状态转移的模拟器。为了使数据流更加清晰,我们对 Simulator 做出以下限制:
修改模拟器内部状态的唯一方式是使用
step(action)
。外部模块可以通过调用
simulator.get_state()
来读取模拟器的状态,并通过调用simulator.done()
来检查模拟器是否处于结束状态。
模拟器被定义为具有三种类型的边界:
InitialStateType,表示用于创建模拟器的数据类型。
StateType,表示模拟器的状态(state)类型。
ActType,表示动作的类型,即每一步接收到的输入。
不同的模拟器可能共享相同的 StateType。例如,当它们处理相同任务但采用不同的模拟实现时,使用相同的类型可以安全地共享 MDP 中的其他组件。
模拟器是短暂存在的。其生命周期从初始状态开始,到轨迹结束为止。换句话说,当轨迹结束后,模拟器将被回收。如果模拟器需要在之间共享上下文(例如为了加速),可以通过访问环境包装器(env-wrapper)的弱引用来实现。
- env
对 env-wrapper 的引用,在某些边缘情况下可能有用。但不建议模拟器使用此引用,因为它容易引发错误。
- 类型:
Optional[EnvWrapper]
- __init__(initial: InitialStateType, **kwargs: Any) None
- step(动作: ActType) None
接收一个 ActType 类型的动作。
模拟器应更新其内部状态,并返回 None。更新后的状态可通过
simulator.get_state()
获取。
- done() 布尔值
检查模拟器是否处于“完成”状态。当模拟器处于“完成”状态时,不应再接收任何
step
请求。由于模拟器是短暂存在的,若要重置模拟器,应销毁旧实例并创建一个新的模拟器。
策略
- 类 qlib.rl.strategy.SingleOrderStrategy(order: Order, trade_range: TradeRange | None = None)
用于生成仅包含一个订单的交易决策的策略。
- __init__(order: Order, trade_range: TradeRange | None = None) None
- 参数:
outer_trade_decision (BaseTradeDecision,可选) –
该策略所依赖的外部策略的交易决策,此决策将在 [start_time, end_time] 时间范围内执行,默认为 None
如果该策略用于拆分交易决策,则会使用此参数
如果该策略用于投资组合管理,则可以忽略此参数
level_infra (LevelInfrastructure,可选) – 回测所用的层级共享基础设施,包括交易日历
common_infra (CommonInfrastructure,可选) – 回测所用的通用基础设施,包括交易账户、交易市场等
trade_exchange (Exchange) –
提供市场信息的交易所,用于处理订单并生成报告
如果 trade_exchange 为 None,则 self.trade_exchange 将被设置为 common_infra
允许在不同的执行过程中使用不同的交易市场。
例如:
在日频执行中,日频和分钟频的交易市场均可使用,但推荐使用日频交易市场,因其运行速度更快。
在分钟频执行中,日频交易所不可用,仅推荐使用分钟频交易所。
- generate_trade_decision(execute_result: list | None = None) TradeDecisionWO
在每个交易周期生成交易决策
- 参数:
execute_result (List[object], 可选) –
交易决策的执行结果,默认为 None
首次调用 generate_trade_decision 时,execute_result 可以为 None
Trainer
训练、测试、推理工具。
- 类 qlib.rl.trainer.Trainer(*, max_iters: int | None = None, val_every_n_iters: int | None = None, loggers: LogWriter | List[LogWriter] | None = None, callbacks: List[Callback] | None = None, finite_env_type: FiniteEnvType = 'subproc', concurrency: int = 2, fast_dev_run: int | None = None)
用于在特定任务上训练策略的工具。
与传统的深度学习训练器不同,该训练器的迭代单位是“收集”(collect),而不是“轮次”(epoch)或“小批量”(mini-batch)。在每一次收集过程中,
Collector
会收集一定数量的策略-环境交互数据,并将其累积到一个回放缓冲区中。该缓冲区被用作训练策略的“数据”。在每次收集结束时,策略会被多次更新。该 API 在形式上与PyTorch Lightning有些相似,但本质上是不同的,因为该训练器是为强化学习(RL)应用而构建的,因此大多数配置都处于 RL 的上下文中。我们仍在探索如何整合现有的训练器库,因为要构建一个像那些库一样强大的训练器似乎需要巨大的工作量,而且这也不是我们的主要目标。
它与tianshou 内置的训练器有本质区别,因为它比后者复杂得多。
- 参数:
max_iters —— 停止前的最大迭代次数。
val_every_n_iters —— 每 n 次迭代(即训练收集)执行一次验证。
logger —— 用于记录回测结果的日志记录器。必须提供日志记录器,否则所有信息都将丢失。
finite_env_type —— 有限环境实现的类型。
concurrency —— 并行工作进程数。
fast_dev_run —— 创建一个子集用于调试。具体实现方式取决于训练容器的实现。对于
TrainingVessel
,如果该值大于零,则将使用一个大小为fast_dev_run
的随机子集,替代train_initial_states
和val_initial_states
。
- should_stop: 布尔值
设置后将停止训练。
- metrics: dict
训练/验证/测试过程中产生的数值型指标。在训练/验证过程中,指标反映的是最新一个 episode 的结果;当每次训练/验证迭代完成后,指标将汇总该次迭代中所有 episode 的结果。
每次新的训练迭代开始时都会清空。
在 fit 过程中,验证指标将以
val/
作为前缀。
- current_iter: int
当前训练的迭代次数(即收集次数)。
- __init__(*, max_iters: int | None = None, val_every_n_iters: int | None = None, loggers: LogWriter | List[LogWriter] | None = None, callbacks: List[Callback] | None = None, finite_env_type: FiniteEnvType = 'subproc', concurrency: int = 2, fast_dev_run: int | None = None)
- 初始化()
初始化整个训练过程。
此处的状态应与 state_dict 保持同步。
- initialize_iter()
初始化一次迭代 / 数据收集。
- state_dict() dict
尽最大努力将当前训练的所有状态放入一个字典中。
它不会尝试处理一次训练收集过程中可能出现的所有类型的状态。在大多数情况下,在每次迭代结束时,状态通常是正确的。
请注意,收集器中的回放缓冲区数据丢失也是预期行为。
- load_state_dict(state_dict: dict) None
将所有状态加载到当前训练器中。
- named_callbacks() 字典[str, 回调函数]
获取一个包含多个回调函数的集合,每个回调函数都有名称。在保存检查点时非常有用。
- fit(vessel: TrainingVesselBase, ckpt_path: Path | None = None) None
基于定义的模拟器训练强化学习策略。
- 参数:
vessel – 训练中使用的所有元素的集合。
ckpt_path – 加载一个预训练或暂停的训练检查点。
- test(vessel: 训练载体基类) None
针对模拟器测试强化学习策略。
模拟器将接收来自
test_seed_iterator
生成的数据。- 参数:
vessel – 所有相关元素的集合。
- venv_from_iterator(iterator: 可迭代对象[InitialStateType]) FiniteVectorEnv
根据迭代器和训练载体创建一个向量化环境。
- 类 qlib.rl.trainer.TrainingVessel(*, simulator_fn: Callable[[InitialStateType], Simulator[InitialStateType, StateType, ActType]], state_interpreter: StateInterpreter[StateType, ObsType], action_interpreter: ActionInterpreter[StateType, PolicyActType, ActType], policy: BasePolicy, reward: Reward, train_initial_states: Sequence[InitialStateType] | None = None, val_initial_states: Sequence[InitialStateType] | None = None, test_initial_states: Sequence[InitialStateType] | None = None, buffer_size: int = 20000, episode_per_iter: int = 1000, update_kwargs: Dict[str, Any] = None)
训练器的默认实现。
__init__
接受一个初始状态序列,以便创建迭代器。train
、validate
和test
各自执行一次数据收集(在训练时还会进行更新)。默认情况下,在训练期间会无限重复使用训练的初始状态,而收集器将控制每次迭代的回合数;在验证和测试时,验证/测试的初始状态将仅使用一次。额外的超参数(仅在训练中使用)包括:
buffer_size
:重放缓冲区的大小。episode_per_iter
:训练时每次收集的回合数。可通过快速开发运行模式覆盖。update_kwargs
:传递给policy.update
的关键字参数。例如,dict(repeat=10, batch_size=64)
。
- __init__(*, simulator_fn: Callable[[InitialStateType], Simulator[InitialStateType, StateType, ActType]], state_interpreter: StateInterpreter[StateType, ObsType], action_interpreter: ActionInterpreter[StateType, PolicyActType, ActType], policy: BasePolicy, reward: Reward, train_initial_states: Sequence[InitialStateType] | None = None, val_initial_states: Sequence[InitialStateType] | None = None, test_initial_states: Sequence[InitialStateType] | None = None, buffer_size: int = 20000, episode_per_iter: int = 1000, update_kwargs: Dict[str, Any] = None)
- train_seed_iterator() ContextManager[可迭代对象[InitialStateType]] | 可迭代对象[InitialStateType]
重写此方法以创建用于训练的种子迭代器。如果该迭代器是上下文管理器,则整个训练过程将在 with 语句块中执行,并且在训练完成后迭代器会自动关闭。
- val_seed_iterator() ContextManager[可迭代对象[InitialStateType]] | 可迭代对象[InitialStateType]
重写此方法以创建用于验证的种子迭代器。
- test_seed_iterator() ContextManager[可迭代对象[InitialStateType]] | 可迭代对象[InitialStateType]
重写此方法以创建用于测试的种子迭代器。
- 训练(vector_env: FiniteVectorEnv) 字典[str, Any]
创建一个采集器并收集
episode_per_iter
个回合的数据。使用采集到的回放缓冲区更新策略。
- 验证(vector_env: FiniteVectorEnv) 字典[str, Any]
实现此方法以对策略进行一次验证。
- test(vector_env: FiniteVectorEnv) 字典[str, Any]
实现此方法以在测试环境中对策略进行一次评估。
- 类 qlib.rl.trainer.TrainingVesselBase(*args, **kwds)
一个包含模拟器、解释器和策略的容器,将被传递给训练器。该类控制训练中的算法相关部分,而训练器负责运行时部分。
该容器还定义了核心训练部分最重要的逻辑,以及(可选)一些回调函数,用于在特定事件发生时插入自定义逻辑。
- train_seed_iterator() ContextManager[可迭代对象[InitialStateType]] | 可迭代对象[InitialStateType]
重写此方法以创建用于训练的种子迭代器。如果该迭代器是上下文管理器,则整个训练过程将在 with 语句块中执行,并且在训练完成后迭代器会自动关闭。
- val_seed_iterator() ContextManager[可迭代对象[InitialStateType]] | 可迭代对象[InitialStateType]
重写此方法以创建用于验证的种子迭代器。
- test_seed_iterator() ContextManager[可迭代对象[InitialStateType]] | 可迭代对象[InitialStateType]
重写此方法以创建用于测试的种子迭代器。
- 训练(vector_env: BaseVectorEnv) 字典[str, Any]
实现此方法以完成一次迭代训练。在强化学习中,一次迭代通常指一次数据采集过程。
- 验证(vector_env: FiniteVectorEnv) 字典[str, Any]
实现此方法以对策略进行一次验证。
- test(vector_env: FiniteVectorEnv) 字典[str, Any]
实现此方法以在测试环境中对策略进行一次评估。
- state_dict() 字典
返回当前容器状态的检查点。
- load_state_dict(state_dict: 字典) None
从先前保存的状态字典中恢复检查点。
- 类 qlib.rl.trainer.Checkpoint(dirpath: 路径, filename: str = '{iter:03d}.pth', save_latest: Literal['link', 'copy'] | None = 'link', every_n_iters: int | None = None, time_interval: int | None = None, save_on_fit_end: bool = True)
定期保存检查点以实现持久化和恢复。
- 参数:
dirpath – 保存检查点文件的目录。
filename –
检查点文件名。可以包含自动填充的命名格式选项。例如:
{iter:03d}-{reward:.2f}.pth
。支持的参数名称包括:iter (int)
trainer.metrics
中的指标时间字符串,格式为
%Y%m%d%H%M%S
save_latest – 将最新的检查点保存为
latest.pth
。如果设置为link
,则latest.pth
将创建为一个软链接。如果设置为copy
,则latest.pth
将作为独立副本存储。设为 none 可禁用此功能。every_n_iters – 每 n 次训练迭代结束时(如果适用,在验证后)保存检查点。
time_interval – 两次检查点保存之间的最大时间间隔(秒)。
save_on_fit_end – 在训练结束时再保存最后一个检查点。如果该位置已保存检查点,则不执行任何操作。
- __init__(dirpath: 路径, filename: 字符串 = '{iter:03d}.pth', save_latest: Literal['link', 'copy'] | 无 = 'link', every_n_iters: 整数 | 无 = 无, time_interval: 整数 | 无 = 无, save_on_fit_end: 布尔值 = 真)
- on_fit_end(trainer: Trainer, vessel: TrainingVesselBase) None
在整个拟合过程结束后被调用。
- on_iter_end(trainer: Trainer, vessel: TrainingVesselBase) None
在每次迭代结束时被调用。该方法在
current_iter
递增 之后 调用,表示上一次迭代已完整结束。
- 类 qlib.rl.trainer.EarlyStopping(monitor: str = 'reward', min_delta: float = 0.0, patience: int = 0, mode: Literal['min', 'max'] = 'max', baseline: float | None = None, restore_best_weights: bool = False)
当被监控的指标停止改善时,停止训练。
每次验证结束后,都会触发早停(earlystopping)回调函数。它会检查验证过程中产生的指标,并获取名为
monitor` (``monitor
默认为reward
)的指标,以判断其是否不再上升/下降。此时会考虑min_delta
和patience
参数(如果已设置)。一旦发现该指标不再增加/减少,trainer.should_stop
将被设为 true,训练随即终止。实现参考:https://github.com/keras-team/keras/blob/v2.9.0/keras/callbacks.py#L1744-L1893
- __init__(monitor: str = 'reward', min_delta: float = 0.0, patience: int = 0, mode: Literal['min', 'max'] = 'max', baseline: float | None = None, restore_best_weights: bool = False)
- state_dict() dict
获取回调函数的状态字典,用于暂停和恢复。
- load_state_dict(state_dict: dict) None
从保存的状态字典中恢复回调函数。
- on_fit_start(trainer: Trainer, vessel: TrainingVesselBase) None
在整个训练流程开始之前调用。
- on_validate_end(trainer: Trainer, vessel: TrainingVesselBase) None
当验证结束时调用。
- 类 qlib.rl.trainer.指标写入器(dirpath: Path)
将训练指标转储到文件中。
- __init__(dirpath: Path) None
- on_train_end(trainer: Trainer, vessel: TrainingVesselBase) None
当训练结束时调用。若要访问训练期间产生的所有输出,请在 trainer 或 vessel 中缓存数据,并在此钩子中进行后续处理。
- on_validate_end(trainer: Trainer, vessel: TrainingVesselBase) None
当验证结束时调用。
- qlib.rl.trainer.train(simulator_fn: Callable[[InitialStateType], Simulator], state_interpreter: StateInterpreter, action_interpreter: ActionInterpreter, initial_states: Sequence[InitialStateType], policy: BasePolicy, reward: Reward, vessel_kwargs: Dict[str, Any], trainer_kwargs: Dict[str, Any]) None
利用强化学习框架提供的并行性来训练策略。
实验性 API,参数可能会很快发生变化。
- 参数:
simulator_fn – 可调用对象,接收初始种子并返回一个模拟器。
state_interpreter – 解释模拟器状态的组件。
action_interpreter – 解释策略动作的组件。
initial_states – 要遍历的初始状态。每个状态将恰好运行一次。
policy – 要训练的策略。
reward – 奖励函数。
vessel_kwargs – 传递给
TrainingVessel
的关键字参数,例如episode_per_iter
。trainer_kwargs – 传递给
Trainer
的关键字参数,例如finite_env_type
和concurrency
。
- qlib.rl.trainer.backtest(simulator_fn: 可调用对象[[初始状态类型], Simulator], state_interpreter: StateInterpreter, action_interpreter: ActionInterpreter, initial_states: Sequence[InitialStateType], policy: BasePolicy, logger: LogWriter | List[LogWriter], reward: Reward | None = None, finite_env_type: FiniteEnvType = 'subproc', concurrency: int = 2) 无返回值
利用强化学习框架提供的并行性进行回测。
实验性 API,参数可能会很快发生变化。
- 参数:
simulator_fn – 可调用对象,接收初始种子并返回一个模拟器。
state_interpreter – 解释模拟器状态的组件。
action_interpreter – 解释策略动作的组件。
initial_states – 要遍历的初始状态。每个状态将恰好运行一次。
policy – 要测试的策略。
logger —— 用于记录回测结果的日志记录器。必须提供日志记录器,否则所有信息都将丢失。
reward – 可选的奖励函数。在回测中,仅用于测试和记录奖励。
finite_env_type —— 有限环境实现的类型。
concurrency —— 并行工作进程数。
订单执行
目前支持单资产订单执行。多资产功能正在开发中。
- 类 qlib.rl.order_execution.FullHistoryStateInterpreter(max_step: int, data_ticks: int, data_dim: int, processed_data_provider: dict | ProcessedDataProvider)
包含今天(截至当前时刻)和昨天在内的全部历史观测数据。
- 参数:
max_step – 总步数(上限估计值)。例如,390 分钟 / 每步 30 分钟 = 13 步。
data_ticks – 等于总记录数。例如,在每分钟执行一次的 SAOE 中,总 tick 数即为一天的分钟长度。
data_dim – 数据的维度数量。
processed_data_provider – 提供预处理数据的组件。
- __init__(max_step: int, data_ticks: int, data_dim: int, processed_data_provider: dict | ProcessedDataProvider) None
- 类 qlib.rl.order_execution.当前步状态解释器(最大步数: int)
当前步骤的观测值。
当策略仅依赖于最新状态而不依赖历史时使用。该键列表不完整,如果你的策略需要更多信息,可以添加更多。
- __init__(最大步数: int) None
- class qlib.rl.order_execution.CategoricalActionInterpreter(values: int | List[float], max_step: int | None = None)
将离散策略动作转换为连续动作,然后乘以
order.amount
。- 参数:
values – 可以是一个长度为 $L$ 的列表:$[a_1, a_2, \ldots, a_L]$。当策略给出决策 $x$ 时,输出为 $a_x$ 倍的下单量。也可以是一个整数 $n$,此时会自动生成一个长度为 $n+1$ 的列表,即 $[0, 1/n, 2/n, \ldots, n/n]$。
max_step – 总步数(上限估计值)。例如,390 分钟 / 每步 30 分钟 = 13 步。
- __init__(values: int | List[float], max_step: int | None = None) None
- class qlib.rl.order_execution.TwapRelativeActionInterpreter(*args, **kwds)
将连续的比例转换为交易数量。
该比例相对于当天剩余时间内的 TWAP 策略而言。例如,还剩 5 个步骤,剩余持仓为 300。按照 TWAP 策略,每步应交易 60。当此解释器接收到动作 $a$ 时,其输出为 $60 \cdot a$。
- 类 qlib.rl.order_execution.Recurrent(obs_space: FullHistoryObs, hidden_dim: int = 64, output_dim: int = 32, rnn_type: Literal['rnn', 'lstm', 'gru'] = 'gru', rnn_num_layers: int = 1)
该网络架构源自 OPD。
在每个时间步,策略网络的输入被分为两部分:公共变量和私有变量,分别由本网络中的
raw_rnn
和pri_rnn
处理。一个细微的差别是,在本实现中,我们不假设方向是固定的。因此,额外添加了一个
dire_fc
来生成一个与方向相关的附加特征。- __init__(obs_space: FullHistoryObs, hidden_dim: int = 64, output_dim: int = 32, rnn_type: Literal['rnn', 'lstm', 'gru'] = 'gru', rnn_num_layers: int = 1) None
初始化内部 Module 状态,该状态由 nn.Module 和 ScriptModule 共享。
- forward(batch: 批次) 张量
输入应为一个字典(至少包含以下内容):
data_processed: [N, T, C]
cur_step: [N](整数)
cur_time: [N](整数)
position_history: [N, S](S 为步数)
target: [N]
num_step: [N](整数)
acquiring: [N](0 或 1)
- 类 qlib.rl.order_execution.AllOne(obs_space: gym.Space, action_space: gym.Space, fill_value: float | int = 1.0)
前向传播返回一个全部为 1 的批次。
在实现某些基线方法时非常有用(例如,TWAP)。
- __init__(obs_space: gym.Space, action_space: gym.Space, fill_value: float | int = 1.0) None
- forward(batch: Batch, state: dict | Batch | np.ndarray = None, **kwargs: Any) Batch
根据给定的批次数据计算动作。
- 返回:
一个
Batch
对象,必须包含以下键:act
:一个 numpy.ndarray 或 torch.Tensor,表示对给定批次数据采取的动作。state
:一个字典、numpy.ndarray 或 torch.Tensor,表示策略的内部状态,默认为None
。
其他键由用户自定义,具体取决于所使用的算法。例如:
# some code return Batch(logits=..., act=..., state=None, dist=...)
关键字
policy
是保留的,对应的数据将被存储到回放缓冲区中。例如:# some code return Batch(..., policy=Batch(log_prob=dist.log_prob(act))) # and in the sampled data batch, you can directly use # batch.policy.log_prob to get your data.
注意
在连续动作空间中,你应该再执行一步“map_action”来获得真实动作:
act = policy(batch).act # doesn't map to the target action range act = policy.map_action(act, batch)
- 类 qlib.rl.order_execution.PPO(network: Module, obs_space: Space, action_space: Space, lr: float, weight_decay: float = 0.0, discount_factor: float = 1.0, max_grad_norm: float = 100.0, reward_normalization: bool = True, eps_clip: float = 0.3, value_clip: bool = True, vf_coef: float = 1.0, gae_lambda: float = 1.0, max_batch_size: int = 256, deterministic_eval: bool = True, weight_file: Path | None = None)
Tianshou PPOPolicy 的一个封装。
差异:
自动创建 actor 和 critic 网络。仅支持离散动作空间。
去除了 actor 网络和 critic 网络之间的公共参数冗余(不确定最新版 tianshou 是否已包含此功能)。
支持一个
weight_file
,可用于加载检查点。部分参数的默认值与原始版本不同。
- __init__(network: 模块, obs_space: 空间, action_space: 空间, lr: 浮点数, weight_decay: 浮点数 = 0.0, discount_factor: 浮点数 = 1.0, max_grad_norm: 浮点数 = 100.0, reward_normalization: 布尔值 = True, eps_clip: 浮点数 = 0.3, vf_coef: 浮点数 = 1.0, gae_lambda: 浮点数 = 1.0, max_batch_size: 整数 = 256, deterministic_eval: 布尔值 = True, weight_file: 路径 | 无 = 无) 无
- 类 qlib.rl.order_execution.PAPenaltyReward(penalty: float = 100.0, scale: float = 1.0)
鼓励更高的 PA(执行率),但惩罚在极短时间内集中交易大量订单的行为。正式地,对于每个时间步,奖励为 \((PA_t * vol_t / target - vol_t^2 * penalty)\)。
- 参数:
penalty – 对短时间内大额交易量的惩罚系数。
scale – 用于放大或缩小奖励的权重。
- __init__(penalty: float = 100.0, scale: float = 1.0) None
- 类 qlib.rl.order_execution.SingleAssetOrderExecutionSimple(order: Order, data_dir: Path, feature_columns_today: List[str] = [], feature_columns_yesterday: List[str] = [], data_granularity: int = 1, ticks_per_step: int = 30, vol_threshold: float | None = None)
单资产订单执行(SAOE)模拟器。
由于简单模拟器中没有“日历”概念,因此使用 tick 进行交易。一个 tick 是 pickle 格式数据文件中的一条记录(一行)。每个 tick 被视为一次独立的交易机会。如果不需要如此细的粒度,可以使用
ticks_per_step
参数来延长每一步所包含的 tick 数量。在每一步中,待交易的金额会“均等”地分配到每个 tick 上,并受最大执行成交量(即
vol_threshold
)的限制;如果当前是最后一步,则会尽量确保完成全部待执行金额。- 参数:
order – 启动 SAOE 模拟器的起点是一个订单。
data_dir – 用于加载回测数据的路径。
feature_columns_today – 当日特征的列名。
feature_columns_yesterday – 昨日特征的列名。
data_granularity – 连续数据条目之间的 tick 数量。
ticks_per_step – 每步包含多少个 tick。
vol_threshold – 最大执行成交量(相对于市场成交量的比例)。
- __init__(order: Order, data_dir: Path, feature_columns_today: List[str] = [], feature_columns_yesterday: List[str] = [], data_granularity: int = 1, ticks_per_step: int = 30, vol_threshold: float | None = None) None
- ticks_index: pd.DatetimeIndex
当天所有可用的时点(不限于订单)。
- ticks_for_order: pd.DatetimeIndex
可用于交易的时点(按订单切片)。
- twap_price: float
该价格用于计算价格优势。定义为从订单开始时间到结束时间期间的平均价格。
- history_exec: pd.DataFrame
在每个可能的时间时点上的全部执行历史。可用列请参见
SAOEMetrics
。索引为datetime
。
- history_steps: pd.DataFrame
每一步对应的位置。第一步之前的位置也会被记录。可用列请参见
SAOEMetrics
。索引为datetime
,表示每一步的起始时间。
- step(amount: float) None
执行一步或 SAOE。
- 参数:
amount – 您希望交易的数量。模拟器不保证所有数量都能成功成交。
- done() 布尔值
检查模拟器是否处于“完成”状态。当模拟器处于“完成”状态时,不应再接收任何
step
请求。由于模拟器是短暂存在的,若要重置模拟器,应销毁旧实例并创建一个新的模拟器。
- 类 qlib.rl.order_execution.SAOEStateAdapter(order: Order, trade_decision: BaseTradeDecision, executor: BaseExecutor, exchange: Exchange, ticks_per_step: int, backtest_data: IntradayBacktestData, data_granularity: int = 1)
维护环境的状态。SAOEStateAdapter 接收执行结果,并根据执行结果以及从执行器和交易所获取的附加信息更新其内部状态。例如,它从执行结果中获取已成交的订单数量,并从交易所获取相应的市场价格/成交量。
使用示例:
adapter = SAOEStateAdapter(...) adapter.update(...) state = adapter.saoe_state
- __init__(order: 订单, trade_decision: 基础交易决策, executor: 基础执行器, exchange: 交易所, ticks_per_step: 整数, backtest_data: 日内回测数据, data_granularity: 整数 = 1) 无
- generate_metrics_after_done() None
在上层执行完成后生成指标
- 类 qlib.rl.order_execution.SAOEMetrics(*args, **kwargs)
用于“时间段”内累积的 SAOE 指标。可以按天累积,也可以按时间段(例如 30 分钟)累积,或每分钟单独计算。
警告
类型提示针对单个元素。很多时候它们可以向量化。例如,
market_volume
可以是浮点数列表(或 ndarray),而不仅仅是一个单独的浮点数。- stock_id: str
此记录的股票 ID。
- datetime: pd.Timestamp | pd.DatetimeIndex
此记录的时间(在数据框中作为索引)。
- direction: int
订单方向。0 表示卖出,1 表示买入。
- market_volume: np.ndarray | float
该时间段内的市场总成交量。
- market_price: np.ndarray | float
成交价格。若为一段时间,则为此期间的平均市场成交价。
- amount: np.ndarray | float
策略计划交易的总数量(成交量)。
- inner_amount: np.ndarray | float
底层策略计划交易的总量(可能大于 amount,例如为了确保完成率 ffr)。
- deal_amount: np.ndarray | float
成功执行的交易量(必须小于 inner_amount)。
- trade_price: np.ndarray | float
该策略的平均成交价格。
- trade_value: np.ndarray | float
交易的总价值。在简单模拟中,trade_value = deal_amount * price。
- position: np.ndarray | float
此“时间段”结束后剩余的持仓。
- ffr: np.ndarray | float
已完成的当日订单百分比。
- pa: np.ndarray | float
相对于基准的价格优势(即使用基准市场价进行交易的情况)。基准为使用 TWAP 策略执行该订单时的成交价。请注意此处可能存在数据泄露。单位为基点(BP,1/10000)。
- 类 qlib.rl.order_execution.SAOEState(order: Order, cur_time: pd.Timestamp, cur_step: int, position: float, history_exec: pd.DataFrame, history_steps: pd.DataFrame, metrics: SAOEMetrics | None, backtest_data: BaseIntradayBacktestData, ticks_per_step: int, ticks_index: pd.DatetimeIndex, ticks_for_order: pd.DatetimeIndex)
用于存储 SAOE 模拟器状态的数据结构。
- 订单: Order
我们正在处理的订单。
- cur_time: pd.Timestamp
- 类型:
当前时间,例如 9
- cur_step: int
当前步骤,例如 0。
- position: float
当前剩余待执行成交量。
- history_exec: pd.DataFrame
参见
SingleAssetOrderExecution.history_exec
。
- history_steps: pd.DataFrame
参见
SingleAssetOrderExecution.history_steps
。
- backtest_data: BaseIntradayBacktestData
回测数据包含在状态中。目前实际上只需要该数据的时间索引。我包含完整数据是为了支持依赖原始数据的算法(例如 VWAP)的实现。解释器可按需使用,但应注意避免泄露未来数据。
- ticks_per_step: int
每一步包含多少个时间点(ticks)。
- ticks_index: pd.DatetimeIndex
31, …, 14:59]。
- 类型:
全天的交易时间点,未按订单切分(在数据中定义)。例如 [9
- 类型:
30, 9
- ticks_for_order: pd.DatetimeIndex
46, …, 14:44]。
- 类型:
按订单切分的交易时间点,例如 [9
- 类型:
45, 9
- 类 qlib.rl.order_execution.SAOEStrategy(policy: BasePolicy, outer_trade_decision: BaseTradeDecision | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, data_granularity: int = 1, **kwargs: Any)
使用 SAOEState 作为状态的基于强化学习(RL)的策略。
- __init__(policy: BasePolicy, outer_trade_decision: BaseTradeDecision | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, data_granularity: int = 1, **kwargs: Any) None
- 参数:
policy – 用于生成动作的强化学习策略
- 重置(outer_trade_decision: BaseTradeDecision | 无 = 无, **kwargs: 任意类型) 无返回值
重置 level_infra,用于重置交易日历等
重置 common_infra,用于重置 trade_account、trade_exchange 等
重置 outer_trade_decision,用于做出拆分决策
注意:将此函数拆分为 reset 和 _reset 可使以下情况更加方便:1. 用户希望通过重写 reset 来初始化其策略,但又不希望影响初始化时调用的 _reset
- post_upper_level_exe_step() None
当上层执行器完成执行后,用于执行某些操作的钩子(例如,完成指标收集)。
- post_exe_step(execute_result: list | None) None
在对应执行器完成执行后,用于执行某些操作的钩子。
- 参数:
execute_result — 执行结果
- generate_trade_decision(execute_result: list | None = None) 基础交易决策 | 生成器[Any, Any, 基础交易决策]
对于 SAOEStrategy,每次生成决策时都需要更新 self._last_step_range。此操作应对开发者透明,因此我们在 generate_trade_decision() 中实现它。生成决策的具体逻辑应在 _generate_trade_decision() 中实现。换句话说,所有 SAOEStrategy 的子类都应重写 _generate_trade_decision(),而不是 generate_trade_decision()。
- 类 qlib.rl.order_execution.ProxySAOEStrategy(outer_trade_decision: BaseTradeDecision | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs: Any)
使用 SAOEState 的代理策略。之所以称为“代理”策略,是因为它自身不进行任何决策。相反,当需要该策略生成决策时,它会输出环境信息,由外部代理进行决策。更多细节请参考 _generate_trade_decision。
- __init__(outer_trade_decision: BaseTradeDecision | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs: Any) None
- 参数:
policy – 用于生成动作的强化学习策略
- 重置(outer_trade_decision: BaseTradeDecision | 无 = 无, **kwargs: 任意类型) 无返回值
重置 level_infra,用于重置交易日历等
重置 common_infra,用于重置 trade_account、trade_exchange 等
重置 outer_trade_decision,用于做出拆分决策
注意:将此函数拆分为 reset 和 _reset 可使以下情况更加方便:1. 用户希望通过重写 reset 来初始化其策略,但又不希望影响初始化时调用的 _reset
- 类 qlib.rl.order_execution.SAOEIntStrategy(policy: dict | BasePolicy, state_interpreter: dict | StateInterpreter, action_interpreter: dict | ActionInterpreter, network: dict | torch.nn.Module | None = None, outer_trade_decision: BaseTradeDecision | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs: Any)
基于状态的策略(SAOE)与解释器(Int)。
- __init__(policy: dict | BasePolicy, state_interpreter: dict | StateInterpreter, action_interpreter: dict | ActionInterpreter, network: dict | torch.nn.Module | None = None, outer_trade_decision: BaseTradeDecision | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs: Any) None
- 参数:
policy – 用于生成动作的强化学习策略
- 重置(outer_trade_decision: BaseTradeDecision | 无 = 无, **kwargs: 任意类型) 无返回值
重置 level_infra,用于重置交易日历等
重置 common_infra,用于重置 trade_account、trade_exchange 等
重置 outer_trade_decision,用于做出拆分决策
注意:将此函数拆分为 reset 和 _reset 可使以下情况更加方便:1. 用户希望通过重写 reset 来初始化其策略,但又不希望影响初始化时调用的 _reset
工具
- 类 qlib.rl.utils.日志级别(值)
强化学习训练的日志级别。每个日志级别的处理行为取决于
LogWriter
的实现。- DEBUG = 10
如果你只希望在调试模式下查看该指标。
- PERIODIC = 20
如果你希望周期性地查看该指标。
- INFO = 30
重要的日志消息。
- CRITICAL = 40
LogWriter 应始终处理 CRITICAL 级别的消息。
- 类 qlib.rl.utils.DataQueue(dataset: Sequence[T], repeat: int = 1, shuffle: bool = True, producer_num_workers: int = 0, queue_maxsize: int = 0)
主进程(生产者)生成数据并将其存储在队列中。子进程(消费者)可以从队列中获取数据点。数据点通过从
dataset
中读取项目来生成。DataQueue
是临时的。当repeat
耗尽后,必须创建一个新的 DataQueue。更多背景信息,请参见
qlib.rl.utils.FiniteVectorEnv
的文档。- 参数:
dataset – 读取数据的数据集。必须实现
__len__
和__getitem__
。repeat – 遍历数据点的次数。使用
-1
表示无限次遍历。shuffle – 如果
shuffle
为真,则项目将以随机顺序读取。producer_num_workers – 用于数据加载的并发工作进程数。
queue_maxsize – 队列满载前可容纳的最大项目数。
示例
>>> data_queue = DataQueue(my_dataset) >>> with data_queue: ... ...
在工作进程中:
>>> for data in data_queue: ... print(data)
- __init__(dataset: Sequence[T], repeat: int = 1, shuffle: bool = True, producer_num_workers: int = 0, queue_maxsize: int = 0) None
- 类 qlib.rl.utils.EnvWrapper(simulator_fn: Callable[..., Simulator[InitialStateType, StateType, ActType]], state_interpreter: StateInterpreter[StateType, ObsType], action_interpreter: ActionInterpreter[StateType, PolicyActType, ActType], seed_iterator: Iterable[InitialStateType] | None, Reward | None = None, aux_info_collector: AuxiliaryInfoCollector[StateType, Any] | None = None, logger: LogCollector | None = None)
基于 Qlib 的强化学习环境,继承自
gym.Env
。该类封装了模拟器、状态解释器、动作解释器和奖励函数等组件。这是在强化学习训练中模拟器 - 解释器 - 策略框架的结构示意图。除策略外的所有组件都需要被组合成一个称为“环境”的单一对象。该“环境”会被复制到多个工作进程中,在(至少 tianshou 的实现中),一个单一的策略(智能体)会同时与一批环境进行交互。
- 参数:
simulator_fn – 一个可调用对象,作为模拟器工厂。当存在
seed_iterator
时,该工厂应接受一个参数,即种子(也就是初始状态);否则应不接受任何参数。state_interpreter – 状态到观测值的转换器。
action_interpreter – 策略与模拟器之间的动作转换器。
seed_iterator (str | Iterator[InitialStateType] | None) – 一个可迭代的种子序列。借助
qlib.rl.utils.DataQueue
,不同进程中的环境工作器可以共享同一个seed_iterator
。reward_fn – 一个可调用对象,接收 StateType 类型输入并返回一个浮点数(至少在单智能体情况下如此)。
aux_info_collector – 收集辅助信息,可能在多智能体强化学习(MARL)中有用。
logger – 日志收集器,用于收集日志信息。收集到的日志通过
env.step()
的返回值传回主进程。
- __init__(simulator_fn: 可调用对象[..., 模拟器[初始状态类型, 状态类型, 动作类型]], 状态解释器: StateInterpreter[状态类型, 观测类型], 动作解释器: ActionInterpreter[状态类型, 策略动作类型, 动作类型], 种子迭代器: 可迭代对象[初始状态类型] | 无, reward_fn: 奖励函数 | 无 = 无, 辅助信息收集器: AuxiliaryInfoCollector[状态类型, 任意类型] | 无 = 无, 日志记录器: LogCollector | 无 = 无) 无
- 重置(**kwargs: Any) 观测类型
尝试从状态队列中获取一个状态,并用该状态初始化模拟器。如果队列已耗尽,则生成一个无效的(nan)观测值。
- step(policy_action: PolicyActType, **kwargs: Any) Tuple[ObsType, float, bool, InfoDict]
环境的步进操作。
请结合代码和注释查看此处发生的事件序列。
- render(mode: str = 'human') None
根据环境初始化时指定的 render_mode 属性来计算渲染帧。
每个环境支持的模式集合各不相同。(某些第三方环境可能完全不支持渲染。)按照惯例,如果 render_mode 为:
None(默认):不进行任何渲染。
human:render 返回 None。环境会在当前显示器或终端中持续渲染,通常供人类查看。
rgb_array:返回一个表示环境当前状态的单帧图像。每一帧是一个形状为 (x, y, 3) 的 numpy.ndarray,表示 x×y 像素图像的 RGB 值。
rgb_array_list:返回自上次重置以来环境状态的一系列帧。每一帧都是一个形状为 (x, y, 3) 的 numpy.ndarray,与 rgb_array 相同。
ansi:返回一个字符串(str)或 StringIO.StringIO,其中包含每个时间步的终端风格文本表示。该文本可包含换行符和 ANSI 转义序列(例如用于颜色显示)。
注意
请确保你类的元数据中 'render_modes' 键包含所支持模式的列表。建议在实现中调用 super() 以利用此方法的功能。
- 类 qlib.rl.utils.日志收集器(min_loglevel: int | 日志级别 = LogLevel.PERIODIC)
日志首先在每个环境工作进程中收集,然后在向量环境中聚合到中央线程进行流式传输。
在
LogCollector
中,每个指标都会被添加到一个字典中,该字典需要在每一步通过reset()
方法重置。该字典通过env.step()
中的info
传递,并由向量环境中的LogWriter
解码。min_loglevel
用于优化目的:避免网络或管道中产生过多流量。- 重置() None
清除所有已收集的内容。
- add_string(name: str, string: str, loglevel: int | LogLevel = LogLevel.PERIODIC) None
将一个带名称的字符串添加到日志内容中。
- add_scalar(name: str, scalar: Any, loglevel: int | LogLevel = LogLevel.PERIODIC) None
将一个标量以指定名称添加到日志内容中。该标量将被转换为浮点数。
- 类 qlib.rl.utils.日志写入器(日志级别: int | 日志级别 = LogLevel.PERIODIC)
日志写入器的基类,由有限环境(finite env)在每次重置和步骤时触发。
如何处理特定日志取决于子类对
LogWriter
的实现。一般原则是:它应处理等于或高于其日志级别的日志,并丢弃无法处理的日志。例如,控制台日志记录器显然无法处理图像。- episode_count: int
回合计数器。
- step_count: int
步数计数器。
- active_env_ids: 集合[int]
向量环境中处于激活状态的环境 ID。
- global_step: int
步数计数器。在
clear
方法中不会被清除。
- global_episode: int
回合计数器。在
clear
方法中不会被清除。
- episode_lengths: 字典[int, int]
从环境 ID 到回合长度的映射。
- episode_rewards: 字典[int, 列表[float]]
从环境 ID 到回合总奖励的映射。
- episode_logs: 字典[int, list]
从环境 ID 到回合日志的映射。
- clear()
清除所有指标以重新开始,使日志记录器实例可重复使用。
- state_dict() dict
将日志记录器的状态保存为字典。
- load_state_dict(state_dict: dict) None
从字典中加载当前日志记录器的状态。
- 静态 聚合(数组: Sequence[Any], 名称: str | None = None) Any
从逐步到每幕的聚合函数。
如果是一个浮点数序列,则取均值;否则取第一个元素。
如果指定了名称,且
该名称为
reward
,则聚合方式为求和。
- log_episode(长度: int, 奖励: List[float], 内容: List[Dict[str, Any]]) None
在每条轨迹结束时触发。
- 参数:
length – 本轨迹的长度。
rewards – 本回合中每一步的奖励列表。
contents – 每一步记录的内容。
- log_step(reward: float, contents: Dict[str, Any]) None
在每一步都会触发此操作。
- 参数:
reward – 该步的奖励。
contents – 该步记录的内容。
- on_env_step(env_id: int, obs: ObsType, rew: float, done: bool, info: InfoDict) None
针对有限环境的每一步回调。
- on_env_reset(env_id: int, _: ObsType) None
有限环境的回调函数。
重置回合统计信息。此处不记录任何任务特定的日志,因为天授(tianshou)存在限制。
- on_env_all_ready() None
当所有环境都准备就绪时调用。通常在此处重置日志记录器。
- on_env_all_done() None
所有任务已完成,进行清理工作。
- qlib.rl.utils.vectorize_env(env_factory: Callable[..., gym.Env], env_type: FiniteEnvType, concurrency: int, logger: LogWriter | List[LogWriter]) FiniteVectorEnv
用于创建向量环境的辅助函数,可用来替代常规的 VectorEnv。
例如,以前你这样写:
DummyVectorEnv([lambda: gym.make(task) for _ in range(env_num)])
现在你可以将其替换为:
finite_env_factory(lambda: gym.make(task), "dummy", env_num, my_logger)
通过这种替换,相比普通 VectorEnv,你将获得两个额外功能:
向量环境会检查是否存在 NaN 观测值,一旦发现就会终止对应的工作进程。有关为何需要此功能,请参见
FiniteVectorEnv
。用于显式收集环境工作进程日志的日志记录器。
- 参数:
env_factory – 用于实例化单个
gym.Env
的可调用对象。所有并发工作进程将共享相同的env_factory
。env_type – dummy、subproc 或 shmem。对应于 tianshou 中的并行机制。
concurrency – 并发的环境工作进程数量。
logger – 日志写入器。
警告
请勿在
env_factory
中使用 lambda 表达式,因为它可能导致创建错误共享的实例。不要这样做:
vectorize_env(lambda: EnvWrapper(...), ...)
请这样做:
def env_factory(): ... vectorize_env(env_factory, ...)
- 类 qlib.rl.utils.ConsoleWriter(log_every_n_episode: int = 20, total_episodes: int | None = None, float_format: str = ':.4f', counter_format: str = ':4d', loglevel: int | LogLevel = LogLevel.PERIODIC)
定期将日志信息输出到控制台。
它会为每个指标维护一个平均值计量器,该值表示从上一次
clear()
调用至今的平均值。每个指标的显示格式为<名称> <最新值> (<平均值>)
。非单个数值的指标将自动被跳过。
- __init__(log_every_n_episode: int = 20, total_episodes: int | None = None, float_format: str = ':.4f', counter_format: str = ':4d', loglevel: int | LogLevel = LogLevel.PERIODIC) None
- 前缀: str
可以通过
writer.prefix
设置前缀。
- clear() None
清除所有指标以重新开始,使日志记录器实例可重复使用。
- log_episode(长度: int, 奖励: List[float], 内容: List[Dict[str, Any]]) None
在每条轨迹结束时触发。
- 参数:
length – 本轨迹的长度。
rewards – 本回合中每一步的奖励列表。
contents – 每一步记录的内容。
- 类 qlib.rl.utils.CsvWriter(output_dir: 路径, loglevel: 整数 | 日志级别 = LogLevel.PERIODIC)
将所有回合的指标转储到
result.csv
文件中。这不是正确的实现方式,仅用于第一轮迭代。
- clear() None
清除所有指标以重新开始,使日志记录器实例可重复使用。
- log_episode(长度: int, 奖励: List[float], 内容: List[Dict[str, Any]]) None
在每条轨迹结束时触发。
- 参数:
length – 本轨迹的长度。
rewards – 本回合中每一步的奖励列表。
contents – 每一步记录的内容。
- on_env_all_done() None
所有任务已完成,进行清理工作。
- 类 qlib.rl.utils.EnvWrapperStatus(*args, **kwargs)
这是 EnvWrapper 中使用的状态数据结构。这里的字段遵循强化学习(RL)的语义。例如,
obs
表示输入策略的观测值,action
表示策略返回的原始动作。
- 类 qlib.rl.utils.LogBuffer(callback: Callable[[bool, bool, LogBuffer], None], loglevel: int | LogLevel = LogLevel.PERIODIC)
将所有数值保存在内存中。
无法聚合的对象(如字符串、张量、图像)不能存储在缓冲区中。若需持久化这些对象,请使用
PickleWriter
。每次日志缓冲区接收到新指标时,都会触发回调函数,这在训练器内部跟踪指标时非常有用。
- 参数:
callback –
一个接收三个参数的回调函数:
on_episode:是否在回合结束时调用
on_collect:是否在收集结束时调用
log_buffer:
LogBuffer
对象
不期望有返回值。
- state_dict() dict
将日志记录器的状态保存为字典。
- load_state_dict(state_dict: dict) None
从字典中加载当前日志记录器的状态。
- clear()
清除所有指标以重新开始,使日志记录器实例可重复使用。
- log_episode(length: 整数, rewards: 列表[浮点数], contents: 列表[字典[字符串, 任意类型]]) 无
在每条轨迹结束时触发。
- 参数:
length – 本轨迹的长度。
rewards – 本回合中每一步的奖励列表。
contents – 每一步记录的内容。
- on_env_all_done() None
所有任务已完成,进行清理工作。
- episode_metrics() dict[str, float]
获取最新一集的数值指标。
- collect_metrics() dict[str, float]
检索最新一次收集的聚合指标。
- episode_count: int
回合计数器。
- step_count: int
步数计数器。
- global_step: int
步数计数器。在
clear
方法中不会被清除。
- global_episode: int
回合计数器。在
clear
方法中不会被清除。
- active_env_ids: 集合[int]
向量环境中处于激活状态的环境 ID。
- episode_lengths: 字典[int, int]
从环境 ID 到回合长度的映射。
- episode_rewards: 字典[int, 列表[float]]
从环境 ID 到回合总奖励的映射。
- episode_logs: 字典[int, list]
从环境 ID 到回合日志的映射。