在线服务

简介

../_images/online_serving.png

除了回测之外,测试模型有效性的一种方法是在真实市场条件下进行预测,甚至根据这些预测执行实盘交易。在线 服务 是一组利用最新数据进行在线模型推理的模块,包括 在线管理器在线策略在线工具更新器

这里提供了一些示例供参考,展示了 在线服务 的不同功能。如果你有许多模型或需要管理 任务,请考虑使用 任务管理示例 基于 任务管理 中的一些组件,例如 TrainerRMCollector

注意:用户应确保其数据源保持更新,以支持在线服务。例如,Qlib 提供了 一批脚本 来帮助用户更新 Yahoo 日频数据。

当前已知限制——目前仅支持每日更新下一个交易日的预测,但由于 公开数据的限制 <https://github.com/microsoft/qlib/issues/215#issuecomment-766293563>_,尚不支持生成下一个交易日的交易订单。

在线管理器

OnlineManager 可以管理一组 在线策略 并动态运行它们。

随着时间的推移,决策模型也可能发生变化。在本模块中,我们将这些持续贡献的模型称为 在线 模型。在每个周期(例如每天或每分钟)中,在线 模型可能会更新,其预测结果也需要相应刷新。因此,本模块提供了一系列方法来控制这一过程。

该模块还提供了一种在历史数据中模拟在线策略的方法,这意味着你可以验证自己的策略或寻找更优的策略。

在不同场景下使用不同训练器共有四种情况:

场景

描述

在线 + 训练器(Trainer)

当你希望执行一个真实的运行流程时,训练器将帮助你训练模型。它会按任务和策略逐一进行模型训练。

在线 + 延迟训练器(DelayTrainer)

延迟训练器会跳过具体的训练步骤,直到所有策略都准备好了各自的任务。这使得用户可以在 routinefirst_train 的末尾集中并行训练所有任务。否则,每当各个策略准备任务时,这些函数可能会被阻塞。

模拟 + 训练器(Trainer)

其行为方式与 在线 + 训练器 相同,唯一的区别是它用于模拟/回测,而非在线交易。

模拟 + 延迟训练器(DelayTrainer)

当你的模型没有时间依赖性时,可以使用延迟训练器以实现多任务并行处理。这意味着在模拟结束时,所有流程中的任务都可以真正地完成训练。信号将在不同的时间段内被妥善准备(取决于是否有新模型上线)。

以下是一些伪代码,用于展示每种情况的工作流程

为简化说明
  • 策略中仅使用了一个策略

  • update_online_pred 仅在在线模式下调用,在其他情况下被忽略

  1. 在线 + 训练器(Trainer)

tasks = first_train()
models = trainer.train(tasks)
trainer.end_train(models)
for day in online_trading_days:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # for each strategy
    strategy.prepare_online_models(models)  # for each strategy

    trainer.end_train(models)
    prepare_signals()  # prepare trading signals daily

在线 + 延迟训练器:工作流程与 在线 + 训练器 相同。

  1. 模拟 + 延迟训练器(DelayTrainer)

# simulate
tasks = first_train()
models = trainer.train(tasks)
for day in historical_calendars:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # for each strategy
    strategy.prepare_online_models(models)  # for each strategy
# delay_prepare()
# FIXME: Currently the delay_prepare is not implemented in a proper way.
trainer.end_train(<for all previous models>)
prepare_signals()

我们能否简化当前的工作流程?

  • 能否减少任务的状态数量?

    • 对于每个任务,我们有三个阶段(即:任务、部分训练后的任务、最终训练完成的任务)

qlib.workflow.online.manager.OnlineManager(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')

OnlineManager 可以通过在线策略管理在线模型。它还提供了历史记录功能,用于记录哪些模型在何时处于在线状态。

__init__(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')

初始化 OnlineManager。每个 OnlineManager 至少需要一个 OnlineStrategy。

参数:
  • strategies (Union[OnlineStrategy, List[OnlineStrategy]]) – 一个 OnlineStrategy 实例或 OnlineStrategy 的列表

  • begin_time (Union[str,pd.Timestamp], 可选) – OnlineManager 将从该时间开始运行。默认为 None,表示使用最新日期。

  • trainer (qlib.model.trainer.Trainer) – 用于训练任务的训练器。若为 None,则使用 TrainerR。

  • freq (str, 可选) – 数据频率。默认为 “day”。

first_train(strategies: List[OnlineStrategy] | None = None, model_kwargs: dict = {})

从每个策略的 first_tasks 方法中获取任务并进行训练。如果使用 DelayTrainer,可以在所有策略的 first_tasks 执行完毕后一并完成训练。

参数:
  • strategies (List[OnlineStrategy]) – 策略列表(添加策略时需要此参数)。若为 None,则使用默认策略。

  • model_kwargs (dict) – prepare_online_models 所需的参数

routine(cur_time: str | Timestamp | None = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})

每个策略的典型更新流程,并记录在线历史。

例行操作后的典型更新流程,例如按天或按月进行。该流程包括:更新预测 -> 准备任务 -> 准备在线模型 -> 准备信号。

如果使用 DelayTrainer,可以在每个策略执行完 prepare_tasks 后统一完成训练。

参数:
  • cur_time (Union[str,pd.Timestamp], 可选) – 在指定时间运行该例行方法。默认为 None。

  • task_kwargs (dict) – 传递给 prepare_tasks 方法的参数。

  • model_kwargs (dict) – prepare_online_models 所需的参数

  • signal_kwargs (dict) – 传递给 prepare_signals 方法的参数。

get_collector(**kwargs) 合并收集器

获取 Collector 实例,用于收集每个策略的结果。该收集器可作为信号准备的基础。

参数:

**kwargs – get_collector 方法的参数。

返回:

用于合并其他收集器的收集器。

返回类型:

合并收集器

add_strategy(strategies: 在线策略 | 列表[在线策略])

向 OnlineManager 添加一些新策略。

参数:

strategy (Union[OnlineStrategy, List[OnlineStrategy]]) – OnlineStrategy 的列表

prepare_signals(prepare_func: ~typing.Callable = <qlib.model.ens.ensemble.AverageEnsemble object>, over_write=False)

在完成上一个周期(箱线图中的一个箱子)的数据准备后,即该周期结束时,我们可以为下一个周期准备交易信号。

注意:给定一组预测结果时,所有早于这些预测结束时间的信号都将被妥善准备。

即使最新的信号已存在,最新的计算结果也会将其覆盖。

注意

给定某一时刻的预测,所有早于该时刻的信号都将被妥善准备。

参数:
  • prepare_func (Callable, 可选) – 从收集后的字典中获取信号。默认为 AverageEnsemble(),MergeCollector 收集的结果必须是 {xxx: pred} 格式。

  • over_write (bool, 可选) – 如果为 True,新信号将覆盖旧信号;如果为 False,新信号将追加到现有信号末尾。默认为 False。

返回:

这些信号。

返回类型:

pd.DataFrame

get_signals() 序列 | DataFrame

获取已准备好的在线信号。

返回:

若每个时间点仅有一个信号,则返回 pd.Series;若有多个信号(例如买入和卖出操作使用不同的交易信号),则返回 pd.DataFrame。

返回类型:

Union[pd.Series, pd.DataFrame]

simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) Series | DataFrame

从当前时间开始,该方法将模拟 OnlineManager 中的每一个例行任务,直到结束时间。

考虑到并行训练,模型和信号可以在所有例行模拟完成后进行准备。

延迟训练方式可以是 DelayTrainer,而延迟准备信号的方式可以是 delay_prepare

参数:
  • end_time – 仿真结束的时间

  • frequency – 日历频率

  • task_kwargs (dict) – 传递给 prepare_tasks 方法的参数。

  • model_kwargs (dict) – prepare_online_models 所需的参数

  • signal_kwargs (dict) – 传递给 prepare_signals 方法的参数。

返回:

若每个时间点仅有一个信号,则返回 pd.Series;若有多个信号(例如买入和卖出操作使用不同的交易信号),则返回 pd.DataFrame。

返回类型:

Union[pd.Series, pd.DataFrame]

delay_prepare(model_kwargs={}, signal_kwargs={})

如果存在等待准备的模型或信号,则准备所有模型和信号。

参数:
  • model_kwargs – 传递给 end_train 的参数

  • signal_kwargs – 传递给 prepare_signals 的参数

在线策略

OnlineStrategy 模块是在线服务的一个组成部分。

qlib.workflow.online.strategy。在线策略(name_id: str)

OnlineStrategy 与 Online Manager 协同工作,决定任务如何生成、模型如何更新以及信号如何准备。

__init__(name_id: str)

初始化 OnlineStrategy。该模块必须使用 Trainer 来完成模型训练。

参数:
  • name_id (str) – 唯一的名称或 ID。

  • trainer (qlib.model.trainer.Trainer, 可选) – Trainer 的一个实例。默认为 None。

prepare_tasks(cur_time, **kwargs) List[dict]

在一个例行任务结束后,检查是否需要基于当前时间(None 表示最新时间)准备和训练一些新任务。返回等待训练的新任务列表。

你可以通过 OnlineTool.online_models 找到最新的在线模型。

prepare_online_models(trained_models, cur_time=None) List[object]

从已训练的模型中选择一些模型并将其设置为在线模型。这是一个典型的实现,用于将所有已训练的模型上线,你可以重写该方法以实现更复杂的逻辑。如果你仍需要之前的在线模型,可以通过 OnlineTool.online_models 获取。

注意:将所有在线模型重置为已训练模型。如果没有已训练的模型,则不执行任何操作。

注意

当前的实现非常简单。以下是一个更复杂的情况,更接近实际应用场景:1. 在 test_start 前一天(时间戳 T)训练新模型;2. 在 test_start 时刻(通常为时间戳 T + 1)切换模型。

参数:
  • models (list) – 模型列表。

  • cur_time (pd.Dataframe) – 来自 OnlineManager 的当前时间。若为 None,则表示使用最新时间。

返回:

一个包含在线模型的列表。

返回类型:

List[object]

first_tasks() 列表[dict]

首先生成一系列任务并返回它们。

get_collector() 收集器

获取 Collector 的实例,用于收集该策略的不同结果。

例如:
  1. 在 Recorder 中收集预测结果

  2. 在文本文件中收集信号

返回:

收集器

class qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)

此示例策略始终使用最新的滚动模型 sas 在线模型。

__init__(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)

初始化 RollingStrategy。

假设:name_id 的字符串、实验名称以及训练器的实验名称相同。

参数:
  • name_id (str) – 唯一的名称或 ID,也将作为实验的名称。

  • task_template (Union[dict, List[dict]]) – 一个任务模板列表或单个模板,将用于通过 rolling_gen 生成多个任务。

  • rolling_gen (RollingGen) – RollingGen 的一个实例

get_collector(process_list=[<qlib.model.ens.group.RollingGroup 对象>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)

获取 Collector 实例以收集结果。返回的收集器必须能够区分不同模型的结果。

假设:模型可以通过模型名称和滚动测试时间段加以区分。如果您不希望依赖此假设,请实现您自己的方法或使用其他 rec_key_func。

参数:
  • rec_key_func (Callable) – 用于获取记录器(recorder)键的函数。如果为 None,则使用 recorder 的 ID。

  • rec_filter_func (Callable, 可选) – 通过返回 True 或 False 来过滤 recorder。默认为 None。

  • artifacts_key (List[str], 可选) – 您想要获取的 artifacts 键。如果为 None,则获取所有 artifacts。

first_tasks() 列表[dict]

使用 rolling_gen 基于 task_template 生成不同的任务。

返回:

任务列表

返回类型:

List[dict]

prepare_tasks(cur_time) 列表[dict]

基于 cur_time 准备新任务(None 表示最新时间)。

您可以通过 OnlineToolR.online_models 找到最新的在线模型。

返回:

一个包含新任务的列表。

返回类型:

List[dict]

在线工具

OnlineTool 是一个用于设置和取消设置一系列 在线 模型的模块。在线 模型是在某些时间点具有决定性的模型,可随时间变化而更新。这使得我们能够随着市场风格的变化使用高效的子模型。

qlib.workflow.online.utils.在线工具

OnlineTool 将管理包含模型记录器的实验中的 在线 模型。

__init__()

初始化 OnlineTool。

set_online_tag(tag, recorder: list | object)

将标签 tag 设置给模型,以标识其是否在线。

参数:
  • tag (str) – 来自 ONLINE_TAGOFFLINE_TAG 的标签

  • recorder (Union[list,object]) – 模型的记录器(recorder)

get_online_tag(recorder: object) str

根据模型记录器返回其在线标签。

参数:

recorder (Object) – 模型的记录器

返回:

在线标签

返回类型:

str

reset_online_tag(recorder: list | object)

将所有模型设为离线,并将其记录器状态重置为‘在线’。

参数:

recorder (Union[list,object]) – 要重置为‘在线’状态的记录器

online_models() list

获取当前处于 在线 状态的模型

返回:

一个包含所有 在线 模型的列表。

返回类型:

list

update_online_pred(to_date=None)

在线 模型的预测更新到 to_date。

参数:

to_date (pd.Timestamp) – 此日期之前的预测将被更新。若为 None,则更新至最新。

qlib.workflow.online.utils.在线工具 R(default_exp_name: str | None = None)

基于 (R)ecorder 实现的 OnlineTool。

__init__(default_exp_name: str | None = None)

初始化 OnlineToolR。

参数:

default_exp_name (str) – 默认实验名称。

set_online_tag(tag, recorder: Recorder | List)

将标签 tag 设置到模型的记录器中,以标识其是否在线。

参数:
  • tag (str) – 可选标签包括 ONLINE_TAGNEXT_ONLINE_TAGOFFLINE_TAG

  • recorder (Union[Recorder, List]) – Recorder 对象的列表或单个 Recorder 实例

get_online_tag(recorder: 记录器) str

根据模型记录器返回其在线标签。

参数:

recorder (Recorder) – 一个 recorder 实例

返回:

在线标签

返回类型:

str

reset_online_tag(recorder: Recorder | List, exp_name: str | None = None)

将所有模型设为离线,并将其记录器状态重置为‘在线’。

参数:
  • recorder (Union[Recorder, List]) – 您想要重置为“在线”状态的记录器。

  • exp_name (str) – 实验名称。如果为 None,则使用默认实验名称。

online_models(exp_name: str | None = None) list

获取当前处于 在线 状态的模型

参数:

exp_name (str) – 实验名称。如果为 None,则使用默认实验名称。

返回:

一个包含所有 在线 模型的列表。

返回类型:

list

update_online_pred(to_date=None, from_date=None, exp_name: str | None = None)

将在线模型的预测更新至指定日期。

参数:
  • to_date (pd.Timestamp) – 该日期之前的预测将被更新。若为 None,则更新至日历中的最新时间。

  • exp_name (str) – 实验名称。如果为 None,则使用默认实验名称。

更新器

Updater 是一个用于在股票数据更新时,更新预测等产物的模块。

qlib.workflow.online.update.RMD 加载器(rec: 记录器)

记录器 模型 数据集 加载器

__init__(rec: 记录器)
get_dataset(start_time, end_time, segments=None, unprepared_dataset: DatasetH | None = None) DatasetH

加载、配置并设置数据集。

该数据集用于推理。

参数:
  • start_time – 底层数据的起始时间

  • end_time – 底层数据的结束时间

  • segments – 字典类型,表示数据集的分段配置。由于是时间序列数据集(TSDatasetH),测试分段可能与 start_time 和 end_time 不同

  • unprepared_dataset – 可选[DatasetH],如果用户不希望从记录器加载数据集,请指定用户的数据集

返回:

DatasetH 的实例

返回类型:

DatasetH

class qlib.workflow.online.update.RecordUpdater(record: Recorder, *args, **kwargs)

更新特定的记录器

__init__(record: Recorder, *args, **kwargs)
抽象 update(*args, **kwargs)

更新特定记录器的信息

qlib.workflow.online.update.DSBasedUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

基于数据集的更新器

  • 为基于 Qlib 数据集的数据提供更新功能

假设

  • 基于 Qlib 数据集

  • 待更新的数据是一个多级索引的 pd.DataFrame,例如标签、预测值。

                             LABEL0
    datetime   instrument
    2021-05-10 SH600000    0.006965
               SH600004    0.003407
    ...                         ...
    2021-05-28 SZ300498    0.015748
               SZ300676   -0.001321
    
__init__(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

初始化 PredUpdater。

以下情况的预期行为:

  • 如果 to_date 大于日历中的最大日期,则数据将更新到最新日期

  • 如果存在 from_date 之前或 to_date 之后的数据,则仅影响 from_dateto_date 之间的数据。

参数:
  • record – 记录器(Recorder)

  • to_date

    将预测更新到 to_date

    如果 to_date 为 None:

    数据将更新到最新日期。

  • from_date

    更新将从 from_date 开始

    如果 from_date 为 None:

    更新将在历史数据中最新日期的下一个时间点进行

  • hist_ref

    int 有时,数据集会存在历史依赖。此参数由用户指定历史依赖的长度。如果用户未指定该参数,Updater 将尝试加载数据集以自动确定 hist_ref

    注意

    起始时间不包含在 hist_ref 内;因此在大多数情况下,hist_ref 等于 step_len - 1

  • loader_cls – type 用于加载模型和数据集的类

prepare_data(unprepared_dataset: DatasetH | None = None) DatasetH

加载数据集 - 如果指定了 unprepared_dataset,则直接准备数据集 - 否则,

将此功能分离将使数据集更易于复用

返回:

DatasetH 的实例

返回类型:

DatasetH

update(dataset: DatasetH | None = None, write: bool = True, ret_new: bool = False) object | None
参数:
  • 数据集 (DatasetH) – DatasetH:DatasetH 的实例。若为 None,则重新准备。

  • 写入 (布尔值) – 是否执行写入操作

  • 返回新数据 (布尔值) – 是否返回更新后的数据

返回:

更新后的数据集

返回类型:

可选[对象]

抽象 get_update_data(dataset: 数据集) DataFrame

根据给定的数据集返回更新后的数据

get_update_dataupdate 的区别 — update_date 仅包含某些特定于数据的特性,而 update 包含一些通用的常规步骤(例如准备数据集、检查等)

qlib.workflow.online.update.PredUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <类 'qlib.workflow.online.update.RMDLoader'>)

更新记录器(Recorder)中的预测结果

get_update_data(dataset: 数据集) DataFrame

根据给定的数据集返回更新后的数据

get_update_dataupdate 的区别 — update_date 仅包含某些特定于数据的特性,而 update 包含一些通用的常规步骤(例如准备数据集、检查等)

qlib.workflow.online.update.LabelUpdater(record: Recorder, to_date=None, **kwargs)

更新记录器中的标签

假设 - 标签由 record_temp.SignalRecord 生成。

__init__(record: Recorder, to_date=None, **kwargs)

初始化 PredUpdater。

以下情况的预期行为:

  • 如果 to_date 大于日历中的最大日期,则数据将更新到最新日期

  • 如果存在 from_date 之前或 to_date 之后的数据,则仅影响 from_dateto_date 之间的数据。

参数:
  • record – 记录器(Recorder)

  • to_date

    将预测更新到 to_date

    如果 to_date 为 None:

    数据将更新到最新日期。

  • from_date

    更新将从 from_date 开始

    如果 from_date 为 None:

    更新将在历史数据中最新日期的下一个时间点进行

  • hist_ref

    int 有时,数据集会存在历史依赖。此参数由用户指定历史依赖的长度。如果用户未指定该参数,Updater 将尝试加载数据集以自动确定 hist_ref

    注意

    起始时间不包含在 hist_ref 内;因此在大多数情况下,hist_ref 等于 step_len - 1

  • loader_cls – type 用于加载模型和数据集的类

get_update_data(dataset: 数据集) DataFrame

根据给定的数据集返回更新后的数据

get_update_dataupdate 的区别 — update_date 仅包含某些特定于数据的特性,而 update 包含一些通用的常规步骤(例如准备数据集、检查等)