在线服务
简介

除了回测之外,测试模型有效性的一种方法是在真实市场条件下进行预测,甚至根据这些预测执行实盘交易。在线 服务
是一组利用最新数据进行在线模型推理的模块,包括 在线管理器、在线策略、在线工具 和 更新器。
这里提供了一些示例供参考,展示了 在线服务
的不同功能。如果你有许多模型或需要管理 任务,请考虑使用 任务管理。示例 基于 任务管理 中的一些组件,例如 TrainerRM
或 Collector
。
注意:用户应确保其数据源保持更新,以支持在线服务。例如,Qlib 提供了 一批脚本 来帮助用户更新 Yahoo 日频数据。
当前已知限制——目前仅支持每日更新下一个交易日的预测,但由于 公开数据的限制 <https://github.com/microsoft/qlib/issues/215#issuecomment-766293563>_,尚不支持生成下一个交易日的交易订单。
在线管理器
OnlineManager 可以管理一组 在线策略 并动态运行它们。
随着时间的推移,决策模型也可能发生变化。在本模块中,我们将这些持续贡献的模型称为 在线 模型。在每个周期(例如每天或每分钟)中,在线 模型可能会更新,其预测结果也需要相应刷新。因此,本模块提供了一系列方法来控制这一过程。
该模块还提供了一种在历史数据中模拟在线策略的方法,这意味着你可以验证自己的策略或寻找更优的策略。
在不同场景下使用不同训练器共有四种情况:
场景 |
描述 |
---|---|
在线 + 训练器(Trainer) |
当你希望执行一个真实的运行流程时,训练器将帮助你训练模型。它会按任务和策略逐一进行模型训练。 |
在线 + 延迟训练器(DelayTrainer) |
延迟训练器会跳过具体的训练步骤,直到所有策略都准备好了各自的任务。这使得用户可以在 routine 或 first_train 的末尾集中并行训练所有任务。否则,每当各个策略准备任务时,这些函数可能会被阻塞。 |
模拟 + 训练器(Trainer) |
其行为方式与 在线 + 训练器 相同,唯一的区别是它用于模拟/回测,而非在线交易。 |
模拟 + 延迟训练器(DelayTrainer) |
当你的模型没有时间依赖性时,可以使用延迟训练器以实现多任务并行处理。这意味着在模拟结束时,所有流程中的任务都可以真正地完成训练。信号将在不同的时间段内被妥善准备(取决于是否有新模型上线)。 |
以下是一些伪代码,用于展示每种情况的工作流程
- 为简化说明
策略中仅使用了一个策略
update_online_pred 仅在在线模式下调用,在其他情况下被忽略
在线 + 训练器(Trainer)
tasks = first_train()
models = trainer.train(tasks)
trainer.end_train(models)
for day in online_trading_days:
# OnlineManager.routine
models = trainer.train(strategy.prepare_tasks()) # for each strategy
strategy.prepare_online_models(models) # for each strategy
trainer.end_train(models)
prepare_signals() # prepare trading signals daily
在线 + 延迟训练器:工作流程与 在线 + 训练器 相同。
模拟 + 延迟训练器(DelayTrainer)
# simulate
tasks = first_train()
models = trainer.train(tasks)
for day in historical_calendars:
# OnlineManager.routine
models = trainer.train(strategy.prepare_tasks()) # for each strategy
strategy.prepare_online_models(models) # for each strategy
# delay_prepare()
# FIXME: Currently the delay_prepare is not implemented in a proper way.
trainer.end_train(<for all previous models>)
prepare_signals()
我们能否简化当前的工作流程?
能否减少任务的状态数量?
对于每个任务,我们有三个阶段(即:任务、部分训练后的任务、最终训练完成的任务)
- 类 qlib.workflow.online.manager.OnlineManager(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')
OnlineManager 可以通过在线策略管理在线模型。它还提供了历史记录功能,用于记录哪些模型在何时处于在线状态。
- __init__(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')
初始化 OnlineManager。每个 OnlineManager 至少需要一个 OnlineStrategy。
- 参数:
strategies (Union[OnlineStrategy, List[OnlineStrategy]]) – 一个 OnlineStrategy 实例或 OnlineStrategy 的列表
begin_time (Union[str,pd.Timestamp], 可选) – OnlineManager 将从该时间开始运行。默认为 None,表示使用最新日期。
trainer (qlib.model.trainer.Trainer) – 用于训练任务的训练器。若为 None,则使用 TrainerR。
freq (str, 可选) – 数据频率。默认为 “day”。
- first_train(strategies: List[OnlineStrategy] | None = None, model_kwargs: dict = {})
从每个策略的 first_tasks 方法中获取任务并进行训练。如果使用 DelayTrainer,可以在所有策略的 first_tasks 执行完毕后一并完成训练。
- 参数:
strategies (List[OnlineStrategy]) – 策略列表(添加策略时需要此参数)。若为 None,则使用默认策略。
model_kwargs (dict) – prepare_online_models 所需的参数
- routine(cur_time: str | Timestamp | None = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})
每个策略的典型更新流程,并记录在线历史。
例行操作后的典型更新流程,例如按天或按月进行。该流程包括:更新预测 -> 准备任务 -> 准备在线模型 -> 准备信号。
如果使用 DelayTrainer,可以在每个策略执行完 prepare_tasks 后统一完成训练。
- 参数:
cur_time (Union[str,pd.Timestamp], 可选) – 在指定时间运行该例行方法。默认为 None。
task_kwargs (dict) – 传递给 prepare_tasks 方法的参数。
model_kwargs (dict) – prepare_online_models 所需的参数
signal_kwargs (dict) – 传递给 prepare_signals 方法的参数。
- get_collector(**kwargs) 合并收集器
获取 Collector 实例,用于收集每个策略的结果。该收集器可作为信号准备的基础。
- 参数:
**kwargs – get_collector 方法的参数。
- 返回:
用于合并其他收集器的收集器。
- 返回类型:
- add_strategy(strategies: 在线策略 | 列表[在线策略])
向 OnlineManager 添加一些新策略。
- 参数:
strategy (Union[OnlineStrategy, List[OnlineStrategy]]) – OnlineStrategy 的列表
- prepare_signals(prepare_func: ~typing.Callable = <qlib.model.ens.ensemble.AverageEnsemble object>, over_write=False)
在完成上一个周期(箱线图中的一个箱子)的数据准备后,即该周期结束时,我们可以为下一个周期准备交易信号。
注意:给定一组预测结果时,所有早于这些预测结束时间的信号都将被妥善准备。
即使最新的信号已存在,最新的计算结果也会将其覆盖。
注意
给定某一时刻的预测,所有早于该时刻的信号都将被妥善准备。
- 参数:
prepare_func (Callable, 可选) – 从收集后的字典中获取信号。默认为 AverageEnsemble(),MergeCollector 收集的结果必须是 {xxx: pred} 格式。
over_write (bool, 可选) – 如果为 True,新信号将覆盖旧信号;如果为 False,新信号将追加到现有信号末尾。默认为 False。
- 返回:
这些信号。
- 返回类型:
pd.DataFrame
- get_signals() 序列 | DataFrame
获取已准备好的在线信号。
- 返回:
若每个时间点仅有一个信号,则返回 pd.Series;若有多个信号(例如买入和卖出操作使用不同的交易信号),则返回 pd.DataFrame。
- 返回类型:
Union[pd.Series, pd.DataFrame]
- simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) Series | DataFrame
从当前时间开始,该方法将模拟 OnlineManager 中的每一个例行任务,直到结束时间。
考虑到并行训练,模型和信号可以在所有例行模拟完成后进行准备。
延迟训练方式可以是
DelayTrainer
,而延迟准备信号的方式可以是delay_prepare
。- 参数:
end_time – 仿真结束的时间
frequency – 日历频率
task_kwargs (dict) – 传递给 prepare_tasks 方法的参数。
model_kwargs (dict) – prepare_online_models 所需的参数
signal_kwargs (dict) – 传递给 prepare_signals 方法的参数。
- 返回:
若每个时间点仅有一个信号,则返回 pd.Series;若有多个信号(例如买入和卖出操作使用不同的交易信号),则返回 pd.DataFrame。
- 返回类型:
Union[pd.Series, pd.DataFrame]
- delay_prepare(model_kwargs={}, signal_kwargs={})
如果存在等待准备的模型或信号,则准备所有模型和信号。
- 参数:
model_kwargs – 传递给 end_train 的参数
signal_kwargs – 传递给 prepare_signals 的参数
在线策略
OnlineStrategy 模块是在线服务的一个组成部分。
- 类 qlib.workflow.online.strategy。在线策略(name_id: str)
OnlineStrategy 与 Online Manager 协同工作,决定任务如何生成、模型如何更新以及信号如何准备。
- __init__(name_id: str)
初始化 OnlineStrategy。该模块必须使用 Trainer 来完成模型训练。
- 参数:
name_id (str) – 唯一的名称或 ID。
trainer (qlib.model.trainer.Trainer, 可选) – Trainer 的一个实例。默认为 None。
- prepare_tasks(cur_time, **kwargs) List[dict]
在一个例行任务结束后,检查是否需要基于当前时间(None 表示最新时间)准备和训练一些新任务。返回等待训练的新任务列表。
你可以通过 OnlineTool.online_models 找到最新的在线模型。
- prepare_online_models(trained_models, cur_time=None) List[object]
从已训练的模型中选择一些模型并将其设置为在线模型。这是一个典型的实现,用于将所有已训练的模型上线,你可以重写该方法以实现更复杂的逻辑。如果你仍需要之前的在线模型,可以通过 OnlineTool.online_models 获取。
注意:将所有在线模型重置为已训练模型。如果没有已训练的模型,则不执行任何操作。
- 注意:
当前的实现非常简单。以下是一个更复杂的情况,更接近实际应用场景:1. 在 test_start 前一天(时间戳 T)训练新模型;2. 在 test_start 时刻(通常为时间戳 T + 1)切换模型。
- 参数:
models (list) – 模型列表。
cur_time (pd.Dataframe) – 来自 OnlineManager 的当前时间。若为 None,则表示使用最新时间。
- 返回:
一个包含在线模型的列表。
- 返回类型:
List[object]
- first_tasks() 列表[dict]
首先生成一系列任务并返回它们。
- class qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)
此示例策略始终使用最新的滚动模型 sas 在线模型。
- __init__(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)
初始化 RollingStrategy。
假设:name_id 的字符串、实验名称以及训练器的实验名称相同。
- 参数:
name_id (str) – 唯一的名称或 ID,也将作为实验的名称。
task_template (Union[dict, List[dict]]) – 一个任务模板列表或单个模板,将用于通过 rolling_gen 生成多个任务。
rolling_gen (RollingGen) – RollingGen 的一个实例
- get_collector(process_list=[<qlib.model.ens.group.RollingGroup 对象>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)
获取 Collector 实例以收集结果。返回的收集器必须能够区分不同模型的结果。
假设:模型可以通过模型名称和滚动测试时间段加以区分。如果您不希望依赖此假设,请实现您自己的方法或使用其他 rec_key_func。
- 参数:
rec_key_func (Callable) – 用于获取记录器(recorder)键的函数。如果为 None,则使用 recorder 的 ID。
rec_filter_func (Callable, 可选) – 通过返回 True 或 False 来过滤 recorder。默认为 None。
artifacts_key (List[str], 可选) – 您想要获取的 artifacts 键。如果为 None,则获取所有 artifacts。
- first_tasks() 列表[dict]
使用 rolling_gen 基于 task_template 生成不同的任务。
- 返回:
任务列表
- 返回类型:
List[dict]
- prepare_tasks(cur_time) 列表[dict]
基于 cur_time 准备新任务(None 表示最新时间)。
您可以通过 OnlineToolR.online_models 找到最新的在线模型。
- 返回:
一个包含新任务的列表。
- 返回类型:
List[dict]
在线工具
OnlineTool 是一个用于设置和取消设置一系列 在线 模型的模块。在线 模型是在某些时间点具有决定性的模型,可随时间变化而更新。这使得我们能够随着市场风格的变化使用高效的子模型。
- 类 qlib.workflow.online.utils.在线工具
OnlineTool 将管理包含模型记录器的实验中的 在线 模型。
- __init__()
初始化 OnlineTool。
- set_online_tag(tag, recorder: list | object)
将标签 tag 设置给模型,以标识其是否在线。
- 参数:
tag (str) – 来自 ONLINE_TAG、OFFLINE_TAG 的标签
recorder (Union[list,object]) – 模型的记录器(recorder)
- get_online_tag(recorder: object) str
根据模型记录器返回其在线标签。
- 参数:
recorder (Object) – 模型的记录器
- 返回:
在线标签
- 返回类型:
str
- reset_online_tag(recorder: list | object)
将所有模型设为离线,并将其记录器状态重置为‘在线’。
- 参数:
recorder (Union[list,object]) – 要重置为‘在线’状态的记录器
- online_models() list
获取当前处于 在线 状态的模型
- 返回:
一个包含所有 在线 模型的列表。
- 返回类型:
list
- update_online_pred(to_date=None)
将 在线 模型的预测更新到 to_date。
- 参数:
to_date (pd.Timestamp) – 此日期之前的预测将被更新。若为 None,则更新至最新。
- 类 qlib.workflow.online.utils.在线工具 R(default_exp_name: str | None = None)
基于 (R)ecorder 实现的 OnlineTool。
- __init__(default_exp_name: str | None = None)
初始化 OnlineToolR。
- 参数:
default_exp_name (str) – 默认实验名称。
- set_online_tag(tag, recorder: Recorder | List)
将标签 tag 设置到模型的记录器中,以标识其是否在线。
- 参数:
tag (str) – 可选标签包括 ONLINE_TAG、NEXT_ONLINE_TAG、OFFLINE_TAG
recorder (Union[Recorder, List]) – Recorder 对象的列表或单个 Recorder 实例
- get_online_tag(recorder: 记录器) str
根据模型记录器返回其在线标签。
- 参数:
recorder (Recorder) – 一个 recorder 实例
- 返回:
在线标签
- 返回类型:
str
- reset_online_tag(recorder: Recorder | List, exp_name: str | None = None)
将所有模型设为离线,并将其记录器状态重置为‘在线’。
- 参数:
recorder (Union[Recorder, List]) – 您想要重置为“在线”状态的记录器。
exp_name (str) – 实验名称。如果为 None,则使用默认实验名称。
- online_models(exp_name: str | None = None) list
获取当前处于 在线 状态的模型
- 参数:
exp_name (str) – 实验名称。如果为 None,则使用默认实验名称。
- 返回:
一个包含所有 在线 模型的列表。
- 返回类型:
list
- update_online_pred(to_date=None, from_date=None, exp_name: str | None = None)
将在线模型的预测更新至指定日期。
- 参数:
to_date (pd.Timestamp) – 该日期之前的预测将被更新。若为 None,则更新至日历中的最新时间。
exp_name (str) – 实验名称。如果为 None,则使用默认实验名称。
更新器
Updater 是一个用于在股票数据更新时,更新预测等产物的模块。
- 类 qlib.workflow.online.update.RMD 加载器(rec: 记录器)
记录器 模型 数据集 加载器
- __init__(rec: 记录器)
- get_dataset(start_time, end_time, segments=None, unprepared_dataset: DatasetH | None = None) DatasetH
加载、配置并设置数据集。
该数据集用于推理。
- 参数:
start_time – 底层数据的起始时间
end_time – 底层数据的结束时间
segments – 字典类型,表示数据集的分段配置。由于是时间序列数据集(TSDatasetH),测试分段可能与 start_time 和 end_time 不同
unprepared_dataset – 可选[DatasetH],如果用户不希望从记录器加载数据集,请指定用户的数据集
- 返回:
DatasetH 的实例
- 返回类型:
- class qlib.workflow.online.update.RecordUpdater(record: Recorder, *args, **kwargs)
更新特定的记录器
- __init__(record: Recorder, *args, **kwargs)
- 抽象 update(*args, **kwargs)
更新特定记录器的信息
- 类 qlib.workflow.online.update.DSBasedUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
基于数据集的更新器
为基于 Qlib 数据集的数据提供更新功能
假设
基于 Qlib 数据集
待更新的数据是一个多级索引的 pd.DataFrame,例如标签、预测值。
LABEL0 datetime instrument 2021-05-10 SH600000 0.006965 SH600004 0.003407 ... ... 2021-05-28 SZ300498 0.015748 SZ300676 -0.001321
- __init__(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
初始化 PredUpdater。
以下情况的预期行为:
如果 to_date 大于日历中的最大日期,则数据将更新到最新日期
如果存在 from_date 之前或 to_date 之后的数据,则仅影响 from_date 和 to_date 之间的数据。
- 参数:
record – 记录器(Recorder)
to_date –
将预测更新到 to_date
如果 to_date 为 None:
数据将更新到最新日期。
from_date –
更新将从 from_date 开始
如果 from_date 为 None:
更新将在历史数据中最新日期的下一个时间点进行
hist_ref –
int 有时,数据集会存在历史依赖。此参数由用户指定历史依赖的长度。如果用户未指定该参数,Updater 将尝试加载数据集以自动确定 hist_ref
注意
起始时间不包含在 hist_ref 内;因此在大多数情况下,hist_ref 等于 step_len - 1
loader_cls – type 用于加载模型和数据集的类
- prepare_data(unprepared_dataset: DatasetH | None = None) DatasetH
加载数据集 - 如果指定了 unprepared_dataset,则直接准备数据集 - 否则,
将此功能分离将使数据集更易于复用
- 返回:
DatasetH 的实例
- 返回类型:
- update(dataset: DatasetH | None = None, write: bool = True, ret_new: bool = False) object | None
- 参数:
数据集 (DatasetH) – DatasetH:DatasetH 的实例。若为 None,则重新准备。
写入 (布尔值) – 是否执行写入操作
返回新数据 (布尔值) – 是否返回更新后的数据
- 返回:
更新后的数据集
- 返回类型:
可选[对象]
- 抽象 get_update_data(dataset: 数据集) DataFrame
根据给定的数据集返回更新后的数据
get_update_data 与 update 的区别 — update_date 仅包含某些特定于数据的特性,而 update 包含一些通用的常规步骤(例如准备数据集、检查等)
- 类 qlib.workflow.online.update.PredUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <类 'qlib.workflow.online.update.RMDLoader'>)
更新记录器(Recorder)中的预测结果
- get_update_data(dataset: 数据集) DataFrame
根据给定的数据集返回更新后的数据
get_update_data 与 update 的区别 — update_date 仅包含某些特定于数据的特性,而 update 包含一些通用的常规步骤(例如准备数据集、检查等)
- 类 qlib.workflow.online.update.LabelUpdater(record: Recorder, to_date=None, **kwargs)
更新记录器中的标签
假设 - 标签由 record_temp.SignalRecord 生成。
- __init__(record: Recorder, to_date=None, **kwargs)
初始化 PredUpdater。
以下情况的预期行为:
如果 to_date 大于日历中的最大日期,则数据将更新到最新日期
如果存在 from_date 之前或 to_date 之后的数据,则仅影响 from_date 和 to_date 之间的数据。
- 参数:
record – 记录器(Recorder)
to_date –
将预测更新到 to_date
如果 to_date 为 None:
数据将更新到最新日期。
from_date –
更新将从 from_date 开始
如果 from_date 为 None:
更新将在历史数据中最新日期的下一个时间点进行
hist_ref –
int 有时,数据集会存在历史依赖。此参数由用户指定历史依赖的长度。如果用户未指定该参数,Updater 将尝试加载数据集以自动确定 hist_ref
注意
起始时间不包含在 hist_ref 内;因此在大多数情况下,hist_ref 等于 step_len - 1
loader_cls – type 用于加载模型和数据集的类
- get_update_data(dataset: 数据集) DataFrame
根据给定的数据集返回更新后的数据
get_update_data 与 update 的区别 — update_date 仅包含某些特定于数据的特性,而 update 包含一些通用的常规步骤(例如准备数据集、检查等)