Online Serving
Introduction

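Besides backtesting, an effective way to test a model is to make predictions under real market conditions, or even to trade for real based on those predictions.
Online Serving is a set of modules for running online models on the latest data. It includes Online Manager, Online Strategy, Online Tool and Updater.
Several examples are provided for reference; they demonstrate different features of Online Serving.
If you have many models or tasks to manage, consider Task Management.
The examples build on some components of Task Management, such as TrainerRM or Collector.
Note: users should keep their data source up to date to support Online Serving. For example, Qlib provides a batch of scripts to help users update Yahoo daily data.
Known limitations: currently, daily updates of the predictions for the next trading day are supported. Generating orders for the next trading day is not supported because of the limitations of public data.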
Online Manager
OnlineManager manages a set of Online Strategy instances and runs them dynamically.
As time passes, the decisive models change. In this module, these contributing models are called online models. In every routine (such as every day or every minute), the online models may change and their predictions need to be updated, so this module provides a series of methods to control this process.
This module also provides a method to simulate Online Strategy over history, which means you can verify your strategy or find a better one.
There are four situations, depending on which trainer is used in which mode:
Situation | Description |
---|---|
Online + Trainer | When you want to run a REAL routine, the Trainer will help you train the models. It trains models task by task and strategy by strategy. |
Online + DelayTrainer | DelayTrainer skips concrete training until all tasks have been prepared by the different strategies. This lets users train all tasks in parallel at the end of routine or first_train. Otherwise, these functions would get stuck whenever a strategy prepares tasks. |
Simulation + Trainer | Behaves the same way as Online + Trainer. The only difference is that it is used for simulation/backtesting instead of online trading. |
Simulation + DelayTrainer | When your models have no temporal dependence, you can use DelayTrainer for its multitasking ability. All tasks in all routines can then be REAL trained at the end of the simulation. The signals will be prepared for the different time segments (based on whether or not any new model is online). |
Here is some pseudocode that demonstrates the workflow of each situation.
For simplicity:
- Only one strategy is used.
- update_online_pred is only called in online mode and is omitted here.
Online + Trainer
tasks = first_train()
models = trainer.train(tasks)
trainer.end_train(models)
for day in online_trading_days:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # for each strategy
    strategy.prepare_online_models(models)  # for each strategy
    trainer.end_train(models)
    prepare_signals()  # prepare trading signals daily
Online + DelayTrainer: the workflow is the same as Online + Trainer.
Simulation + DelayTrainer
# simulate
tasks = first_train()
models = trainer.train(tasks)
for day in historical_calendars:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # for each strategy
    strategy.prepare_online_models(models)  # for each strategy
# delay_prepare()
# FIXME: Currently the delay_prepare is not implemented in a proper way.
trainer.end_train(<for all previous models>)
prepare_signals()
# Can we simplify the current workflow?
Can we reduce the number of task states?
Each task currently goes through three phases: task, partly trained task and final trained task.
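The pseudocode workflows above differ mainly in when the real training happens. The following is a minimal, self-contained sketch of that difference. ToyTrainer, ToyDelayTrainer and run_simulation are hypothetical stand-ins written for illustration; they are not Qlib classes and only mimic the train / end_train split described above.

```python
class ToyTrainer:
    """Hypothetical trainer: every task is fully trained inside train()."""

    def train(self, tasks):
        return [{"task": t, "trained": True} for t in tasks]

    def end_train(self, models):
        return models  # nothing is left to finish


class ToyDelayTrainer(ToyTrainer):
    """Hypothetical delayed trainer: train() only records tasks."""

    def train(self, tasks):
        return [{"task": t, "trained": False} for t in tasks]

    def end_train(self, models):
        # The real training would happen here, possibly in parallel.
        for m in models:
            m["trained"] = True
        return models


def run_simulation(trainer, first_tasks, routine_tasks):
    """Run first_train plus one routine per entry, then finish training."""
    pending = trainer.train(first_tasks)
    for tasks in routine_tasks:
        pending += trainer.train(tasks)
    trained_before_end = sum(m["trained"] for m in pending)
    trainer.end_train(pending)
    trained_after_end = sum(m["trained"] for m in pending)
    return trained_before_end, trained_after_end


print(run_simulation(ToyTrainer(), ["t0"], [["t1"], ["t2"]]))       # (3, 3)
print(run_simulation(ToyDelayTrainer(), ["t0"], [["t1"], ["t2"]]))  # (0, 3)
```

With the delayed variant, no model is trained while the routines run, so all three tasks are available at once when end_train is called. That is the property that makes parallel training possible.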
- class qlib.workflow.online.manager.OnlineManager(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')
OnlineManager can manage online models with Online Strategy. It also provides a history recording of which models are online at what time.
- __init__(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')
Init OnlineManager. One OnlineManager must have at least one OnlineStrategy.
- Parameters:
strategies (Union[OnlineStrategy, List[OnlineStrategy]]) – an instance of OnlineStrategy or a list of OnlineStrategy
begin_time (Union[str,pd.Timestamp], optional) – the OnlineManager will begin at this time. Defaults to None, which means the latest date is used.
trainer (qlib.model.trainer.Trainer) – the trainer used to train tasks. None means TrainerR is used.
freq (str, optional) – data frequency. Defaults to “day”.
- first_train(strategies: List[OnlineStrategy] | None = None, model_kwargs: dict = {})
Get tasks from every strategy’s first_tasks method and train them. If using DelayTrainer, it can finish training all together after every strategy’s first_tasks.
- Parameters:
strategies (List[OnlineStrategy]) – the list of strategies (needed when adding strategies). None means the default strategies are used.
model_kwargs (dict) – the params for prepare_online_models
- routine(cur_time: str | Timestamp | None = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})
Run the typical update process for every strategy and record the online history.
The process runs once per routine, such as day by day or month by month, in this order: Update predictions -> Prepare tasks -> Prepare online models -> Prepare signals.
If using DelayTrainer, it can finish training all together after every strategy’s prepare_tasks.
- Parameters:
cur_time (Union[str,pd.Timestamp], optional) – run the routine method at this time. Defaults to None.
task_kwargs (dict) – the params for prepare_tasks
model_kwargs (dict) – the params for prepare_online_models
signal_kwargs (dict) – the params for prepare_signals
- get_collector(**kwargs) → MergeCollector
Get the instance of Collector to collect results from every strategy. This collector can serve as the basis for signal preparation.
- Parameters:
**kwargs – the params for get_collector.
- Returns:
the collector to merge other collectors.
- Return type:
MergeCollector
- add_strategy(strategies: OnlineStrategy | List[OnlineStrategy])
Add some new strategies to OnlineManager.
- Parameters:
strategies (Union[OnlineStrategy, List[OnlineStrategy]]) – an instance of OnlineStrategy or a list of OnlineStrategy
- prepare_signals(prepare_func: typing.Callable = <qlib.model.ens.ensemble.AverageEnsemble object>, over_write=False)
After the data of the last routine has been prepared (a box in the box-plot), which marks the end of the routine, the trading signals for the next routine can be prepared.
Even if the latest signal already exists, it will be overwritten by the latest calculation result.
Note
Given a set of predictions, all signals before the end times of these predictions will be prepared.
- Parameters:
prepare_func (Callable, optional) – gets signals from a dict after collecting. Defaults to AverageEnsemble(); the results collected by MergeCollector must be {xxx:pred}.
over_write (bool, optional) – If True, the new signals will overwrite the existing ones. If False, the new signals will be appended to the end of the existing signals. Defaults to False.
- Returns:
the signals.
- Return type:
pd.DataFrame
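To make the over_write behavior concrete, here is a minimal sketch using plain dicts instead of pandas objects. It is an illustration of the description above, not Qlib's implementation: average_ensemble is a hypothetical plain mean across models (the real AverageEnsemble may do more), and merge_signals is a hypothetical helper that assumes signals are keyed by (datetime, instrument).

```python
def average_ensemble(preds_by_model):
    """Average scores across models; input is {model_key: {(date, inst): score}}."""
    totals, counts = {}, {}
    for pred in preds_by_model.values():
        for key, score in pred.items():
            totals[key] = totals.get(key, 0.0) + score
            counts[key] = counts.get(key, 0) + 1
    return {key: totals[key] / counts[key] for key in totals}


def merge_signals(old, new, over_write=False):
    """over_write=True replaces all old signals; False keeps them and adds the new ones."""
    if over_write or not old:
        return dict(new)
    merged = dict(old)
    merged.update(new)  # a recomputed signal replaces an existing one for the same key
    return merged


preds = {
    "model_a": {("2021-05-28", "SH600000"): 0.25},
    "model_b": {("2021-05-28", "SH600000"): 0.75},
}
new_signals = average_ensemble(preds)
old_signals = {("2021-05-27", "SH600000"): 0.1}

print(merge_signals(old_signals, new_signals))                   # both dates kept
print(merge_signals(old_signals, new_signals, over_write=True))  # only 2021-05-28
```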
- get_signals() → Series | DataFrame
Get prepared online signals.
- Returns:
a pd.Series when there is only one signal per datetime; a pd.DataFrame when there are multiple signals, for example when buy and sell operations use different trading signals.
- Return type:
Union[pd.Series, pd.DataFrame]
- simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) → Series | DataFrame
Starting from the current time, this method simulates every routine in OnlineManager until the end time.
To allow parallel training, the models and signals can be prepared after all routines have been simulated.
Delayed training is done with DelayTrainer, and delayed signal preparation is done with delay_prepare.
- Parameters:
end_time – the time the simulation will end
frequency – the calendar frequency
task_kwargs (dict) – the params for prepare_tasks
model_kwargs (dict) – the params for prepare_online_models
signal_kwargs (dict) – the params for prepare_signals
- Returns:
a pd.Series when there is only one signal per datetime; a pd.DataFrame when there are multiple signals, for example when buy and sell operations use different trading signals.
- Return type:
Union[pd.Series, pd.DataFrame]
- delay_prepare(model_kwargs={}, signal_kwargs={})
Prepare all models and signals if something is waiting for preparation.
- Parameters:
model_kwargs – the params for end_train
signal_kwargs – the params for prepare_signals
Online Strategy
OnlineStrategy module is an element of online serving.
- class qlib.workflow.online.strategy.OnlineStrategy(name_id: str)
OnlineStrategy works with Online Manager and is responsible for how tasks are generated, how models are updated and how signals are prepared.
- __init__(name_id: str)
Init OnlineStrategy. This module MUST use Trainer to finish model training.
- Parameters:
name_id (str) – a unique name or id.
trainer (qlib.model.trainer.Trainer, optional) – an instance of Trainer. Defaults to None.
- prepare_tasks(cur_time, **kwargs) → List[dict]
After the end of a routine, check whether some new tasks need to be prepared and trained based on cur_time (None for the latest). Return the new tasks waiting for training.
You can find the last online models by OnlineTool.online_models.
- prepare_online_models(trained_models, cur_time=None) → List[object]
Select some models from the trained models and set them as online models. This typical implementation puts all trained models online; you can override it to implement a more complex method. You can find the last online models by OnlineTool.online_models if you still need them.
NOTE: Reset all online models to trained models. If there are no trained models, then do nothing.
- NOTE:
The current implementation is very naive. A more complex situation that is closer to practical scenarios:
1. Train new models on the day before test_start (at timestamp T)
2. Switch models at test_start (typically at timestamp T + 1)
- Parameters:
trained_models (list) – a list of models.
cur_time (Union[str, pd.Timestamp]) – current time from OnlineManager. None for the latest.
- Returns:
a list of online models.
- Return type:
List[object]
- first_tasks() → List[dict]
Generate the initial series of tasks and return them.
- class qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)
This example strategy always uses the latest rolling models as online models.
- __init__(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)
Init RollingStrategy.
Assumption: the str of name_id, the experiment name, and the trainer’s experiment name are the same.
- Parameters:
name_id (str) – a unique name or id. It is also used as the name of the Experiment.
task_template (Union[dict, List[dict]]) – a list of task_template or a single template, which will be used to generate many tasks using rolling_gen.
rolling_gen (RollingGen) – an instance of RollingGen
- get_collector(process_list=[<qlib.model.ens.group.RollingGroup object>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)
Get the instance of Collector to collect results. The returned collector must distinguish results in different models.
Assumption: the models can be distinguished based on the model name and rolling test segments. If you do not want this assumption, please implement your method or use another rec_key_func.
- Parameters:
rec_key_func (Callable) – a function to get the key of a recorder. If None, use recorder id.
rec_filter_func (Callable, optional) – filter the recorder by return True or False. Defaults to None.
artifacts_key (List[str], optional) – the artifacts key you want to get. If None, get all artifacts.
- first_tasks() → List[dict]
Use rolling_gen to generate different tasks based on task_template.
- Returns:
a list of tasks
- Return type:
List[dict]
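The idea of generating many tasks from one template by rolling can be sketched as follows. This is a simplified sliding-window illustration, not the RollingGen implementation: rolling_tasks is a hypothetical helper, segments are given as integer positions in a trading calendar, and the template keys "train" and "test" are assumptions made for the example.

```python
def rolling_tasks(template, step, last_index):
    """Shift every segment forward by `step` until the test segment starts past last_index."""
    tasks = []
    offset = 0
    while template["test"][0] + offset <= last_index:
        task = {}
        for name, (start, end) in template.items():
            # Clip the segment end so it never runs past the available calendar.
            task[name] = (start + offset, min(end + offset, last_index))
        tasks.append(task)
        offset += step
    return tasks


template = {"train": (0, 9), "test": (10, 14)}
for task in rolling_tasks(template, step=5, last_index=22):
    print(task)
# {'train': (0, 9), 'test': (10, 14)}
# {'train': (5, 14), 'test': (15, 19)}
# {'train': (10, 19), 'test': (20, 22)}
```

Each generated task would then be trained separately, and the model from the latest window would be the natural candidate for the online model.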
- prepare_tasks(cur_time) → List[dict]
Prepare new tasks based on cur_time (None for the latest).
You can find the last online models by OnlineToolR.online_models.
- Returns:
a list of new tasks.
- Return type:
List[dict]
Online Tool
OnlineTool is a module to set and unset a series of online models. The online models are the decisive models at certain points in time, and they can change as time passes. This allows us to use efficient submodels as the market style changes.
- class qlib.workflow.online.utils.OnlineTool
OnlineTool will manage online models in an experiment that includes the model recorders.
- __init__()
Init OnlineTool.
- set_online_tag(tag, recorder: list | object)
Set a tag on the model to mark whether it is online.
- Parameters:
tag (str) – one of the tags ONLINE_TAG, OFFLINE_TAG
recorder (Union[list,object]) – the model’s recorder
- get_online_tag(recorder: object) → str
Return the online tag of the given model recorder.
- Parameters:
recorder (Object) – the model’s recorder
- Returns:
the online tag
- Return type:
str
- reset_online_tag(recorder: list | object)
Set all models offline, then set the given recorders to ‘online’.
- Parameters:
recorder (Union[list,object]) – the recorder you want to reset to ‘online’.
- online_models() → list
Get the current online models.
- Returns:
a list of online models.
- Return type:
list
- update_online_pred(to_date=None)
Update the predictions of online models to to_date.
- Parameters:
to_date (pd.Timestamp) – predictions before this date will be updated. None means updating to the latest.
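The tagging interface above can be illustrated with a small in-memory sketch. InMemoryOnlineTool is a hypothetical class written for this example and is not part of Qlib: recorders are plain dicts, the tag is stored under an assumed "tag" key, and the tag string "offline" is an assumption (only ‘online’ appears in the text above).

```python
ONLINE_TAG = "online"
OFFLINE_TAG = "offline"  # assumed value, for illustration only


class InMemoryOnlineTool:
    """Hypothetical stand-in where each recorder is a dict holding its tag."""

    def __init__(self, recorders):
        self.recorders = list(recorders)

    def set_online_tag(self, tag, recorder):
        recs = recorder if isinstance(recorder, list) else [recorder]
        for rec in recs:
            rec["tag"] = tag

    def get_online_tag(self, recorder):
        # Recorders that were never tagged are treated as offline.
        return recorder.get("tag", OFFLINE_TAG)

    def reset_online_tag(self, recorder):
        # Set every known recorder offline, then put the given ones online.
        self.set_online_tag(OFFLINE_TAG, self.recorders)
        self.set_online_tag(ONLINE_TAG, recorder)

    def online_models(self):
        return [r for r in self.recorders if self.get_online_tag(r) == ONLINE_TAG]


recs = [{"id": "a"}, {"id": "b"}, {"id": "c"}]
tool = InMemoryOnlineTool(recs)
tool.reset_online_tag(recs[0])
print([r["id"] for r in tool.online_models()])  # ['a']
tool.reset_online_tag([recs[1], recs[2]])
print([r["id"] for r in tool.online_models()])  # ['b', 'c']
```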
- class qlib.workflow.online.utils.OnlineToolR(default_exp_name: str | None = None)
The implementation of OnlineTool based on (R)ecorder.
- __init__(default_exp_name: str | None = None)
Init OnlineToolR.
- Parameters:
default_exp_name (str) – the default experiment name.
- set_online_tag(tag, recorder: Recorder | List)
Set a tag on the model’s recorder to mark whether it is online.
- Parameters:
tag (str) – one of the tags ONLINE_TAG, NEXT_ONLINE_TAG, OFFLINE_TAG
recorder (Union[Recorder, List]) – a list of Recorder or an instance of Recorder
- get_online_tag(recorder: Recorder) → str
Return the online tag of the given model recorder.
- Parameters:
recorder (Recorder) – an instance of recorder
- Returns:
the online tag
- Return type:
str
- reset_online_tag(recorder: Recorder | List, exp_name: str | None = None)
Set all models offline, then set the given recorders to ‘online’.
- Parameters:
recorder (Union[Recorder, List]) – the recorder you want to reset to ‘online’.
exp_name (str) – the experiment name. If None, then use default_exp_name.
- online_models(exp_name: str | None = None) → list
Get the current online models.
- Parameters:
exp_name (str) – the experiment name. If None, then use default_exp_name.
- Returns:
a list of online models.
- Return type:
list
- update_online_pred(to_date=None, from_date=None, exp_name: str | None = None)
Update the predictions of online models to to_date.
- Parameters:
to_date (pd.Timestamp) – predictions before this date will be updated. None means updating to the latest time in Calendar.
exp_name (str) – the experiment name. If None, then use default_exp_name.
Updater
Updater is a module to update artifacts such as predictions when the stock data is updated.
- class qlib.workflow.online.update.RMDLoader(rec: Recorder)
Recorder Model Dataset Loader
- __init__(rec: Recorder)
- get_dataset(start_time, end_time, segments=None, unprepared_dataset: DatasetH | None = None) → DatasetH
Load, configure and set up the dataset.
This dataset is for inference.
- Parameters:
start_time – the start_time of the underlying data
end_time – the end_time of the underlying data
segments (dict) – the segments config for the dataset. With a time-series dataset (TSDatasetH), the test segments may differ from start_time and end_time
unprepared_dataset (Optional[DatasetH]) – if the user does not want to load the dataset from the recorder, specify the user’s own dataset here
- Returns:
the instance of DatasetH
- Return type:
DatasetH
- class qlib.workflow.online.update.RecordUpdater(record: Recorder, *args, **kwargs)
Update a specific recorder.
- __init__(record: Recorder, *args, **kwargs)
- abstract update(*args, **kwargs)
Update the info of a specific recorder.
- class qlib.workflow.online.update.DSBasedUpdater(record: Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
Dataset-Based Updater
Provides the feature of updating data based on a Qlib Dataset.
Assumptions:
- It is based on a Qlib dataset
- The data to be updated is a multi-level index pd.DataFrame, for example a label or a prediction:
                        LABEL0
datetime   instrument
2021-05-10 SH600000    0.006965
           SH600004    0.003407
...                         ...
2021-05-28 SZ300498    0.015748
           SZ300676   -0.001321
- __init__(record: Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
Init DSBasedUpdater.
Expected behavior in the following cases:
- if to_date is greater than the max date in the calendar, the data will be updated to the latest date
- if there is data before from_date or after to_date, only the data between from_date and to_date is affected.
- Parameters:
record – Recorder
to_date – update the prediction up to to_date. If to_date is None, the data will be updated to the latest date.
from_date – the update will start from from_date. If from_date is None, the update will start on the next tick after the latest data in the historical data.
hist_ref (int) – sometimes the dataset has historical dependencies. It is left to users to set the length of the historical dependency. If the user does not specify this parameter, the Updater will try to load the dataset to determine hist_ref automatically.
Note
the start_time is not included in the hist_ref, so hist_ref will be step_len - 1 in most cases
loader_cls (type) – the class to load the model and dataset
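The rules for to_date, from_date and hist_ref can be sketched as a small date-window calculation. This is an illustration of the behavior described above, not the Updater's actual code: update_window is a hypothetical helper, the calendar is a sorted list of ISO date strings, and from_date and last_data_date are assumed to be trading days present in that calendar.

```python
def update_window(calendar, last_data_date, to_date=None, from_date=None, hist_ref=0):
    """Return (load_start, update_start, update_end), or None if nothing needs updating."""
    latest = calendar[-1]
    # A to_date beyond the calendar (or None) falls back to the latest date.
    end = latest if to_date is None or to_date > latest else to_date
    if from_date is None:
        # Start on the next tick after the latest date already stored.
        start_idx = calendar.index(last_data_date) + 1
    else:
        start_idx = calendar.index(from_date)
    if start_idx >= len(calendar) or calendar[start_idx] > end:
        return None
    # Load hist_ref extra ticks before the update start for historical dependencies.
    load_idx = max(start_idx - hist_ref, 0)
    return calendar[load_idx], calendar[start_idx], end


calendar = ["2021-05-24", "2021-05-25", "2021-05-26", "2021-05-27", "2021-05-28"]
print(update_window(calendar, "2021-05-26", to_date="2021-06-30", hist_ref=2))
# ('2021-05-25', '2021-05-27', '2021-05-28')
print(update_window(calendar, "2021-05-28"))
# None
```

In the first call, a model with step_len = 3 would use hist_ref = 2, so data is loaded from two ticks before the first date that needs a new prediction.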
- prepare_data(unprepared_dataset: DatasetH | None = None) → DatasetH
Load the dataset:
- if unprepared_dataset is specified, prepare that dataset directly
- otherwise, load the dataset from the recorder
Keeping this function separate makes it easier to reuse the dataset.
- Returns:
the instance of DatasetH
- Return type:
DatasetH
- update(dataset: DatasetH | None = None, write: bool = True, ret_new: bool = False) → object | None
- Parameters:
dataset (DatasetH) – the instance of DatasetH. None means it will be prepared again.
write (bool) – whether the write action will be executed
ret_new (bool) – whether the updated data will be returned
- Returns:
the updated dataset
- Return type:
Optional[object]
- abstract get_update_data(dataset: Dataset) → DataFrame
Return the updated data based on the given dataset.
The difference between get_update_data and update:
- get_update_data only includes the data-specific part
- update includes the general routine steps (e.g. preparing the dataset, checking)
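The split between update and get_update_data follows a template-method pattern, which the following self-contained sketch illustrates. ToyUpdater and ToyPredUpdater are hypothetical classes, not Qlib's; stored data is a plain dict keyed by date, and the dataset returned by prepare_data is hard-coded for the example.

```python
from abc import ABC, abstractmethod


class ToyUpdater(ABC):
    """Hypothetical base class: update() holds the general routine steps."""

    def __init__(self, stored):
        self.stored = dict(stored)

    def prepare_data(self):
        raise NotImplementedError

    def update(self, dataset=None, write=True, ret_new=False):
        if dataset is None:
            dataset = self.prepare_data()  # general step: prepare the dataset
        new = self.get_update_data(dataset)  # data-specific step
        merged = {**self.stored, **new}
        if write:
            self.stored = merged
        return merged if ret_new else None

    @abstractmethod
    def get_update_data(self, dataset):
        """Return only the newly computed data."""


class ToyPredUpdater(ToyUpdater):
    """Hypothetical subclass: the data-specific part is running a model."""

    def __init__(self, stored, model):
        super().__init__(stored)
        self.model = model

    def prepare_data(self):
        return {"2021-05-28": 3.0}  # hard-coded features for the new date

    def get_update_data(self, dataset):
        return {date: self.model(x) for date, x in dataset.items()}


updater = ToyPredUpdater({"2021-05-27": 0.1}, model=lambda x: x * 2)
print(updater.update(ret_new=True))
# {'2021-05-27': 0.1, '2021-05-28': 6.0}
```

A label updater in this style would reuse update unchanged and override only get_update_data.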
- class qlib.workflow.online.update.PredUpdater(record: Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
Update the prediction in the Recorder
- get_update_data(dataset: Dataset) → DataFrame
Return the updated data based on the given dataset.
The difference between get_update_data and update:
- get_update_data only includes the data-specific part
- update includes the general routine steps (e.g. preparing the dataset, checking)
- class qlib.workflow.online.update.LabelUpdater(record: Recorder, to_date=None, **kwargs)
Update the label in the recorder
Assumption - The label is generated from record_temp.SignalRecord.
- __init__(record: Recorder, to_date=None, **kwargs)
Init LabelUpdater.
Expected behavior in the following cases:
- if to_date is greater than the max date in the calendar, the data will be updated to the latest date
- if there is data before from_date or after to_date, only the data between from_date and to_date is affected.
- Parameters:
record – Recorder
to_date – update the prediction up to to_date. If to_date is None, the data will be updated to the latest date.
from_date – the update will start from from_date. If from_date is None, the update will start on the next tick after the latest data in the historical data.
hist_ref (int) – sometimes the dataset has historical dependencies. It is left to users to set the length of the historical dependency. If the user does not specify this parameter, the Updater will try to load the dataset to determine hist_ref automatically.
Note
the start_time is not included in the hist_ref, so hist_ref will be step_len - 1 in most cases
loader_cls (type) – the class to load the model and dataset
- get_update_data(dataset: Dataset) → DataFrame
Return the updated data based on the given dataset.
The difference between get_update_data and update:
- get_update_data only includes the data-specific part
- update includes the general routine steps (e.g. preparing the dataset, checking)