任务管理
简介
工作流部分介绍了如何以松耦合的方式运行研究工作流。但当你使用qrun
时,它只能执行一个任务
。任务管理
模块则提供了一个完整的流程,包括任务生成、任务存储、任务训练和任务收集,可自动地生成并执行不同的任务。通过该模块,用户可以在不同时间段、不同损失函数下,甚至使用不同模型自动运行他们的任务
。任务生成、模型训练以及数据合并与收集的整个流程如下图所示。
整个流程可用于在线服务。
整个流程的示例如此处所示。
任务生成
一个任务
包含模型、数据集、记录,或用户添加的任何内容。具体任务模板可在任务章节中查看。尽管任务模板是固定的,用户仍可自定义他们的TaskGen
,以基于任务模板生成不同的任务
。
以下是TaskGen
的基类定义:
- 类 qlib.workflow.task.gen.任务生成器
用于生成不同任务的基类
示例 1:
输入:特定任务模板和滚动步数
输出:任务的滚动版本
示例 2:
输入:特定任务模板和损失函数列表
输出:一组具有不同损失函数的任务
- 抽象 generate(task: dict) 列表[dict]
基于任务模板生成不同的任务
- 参数:
task (dict) – 一个任务模板
- 返回:
任务列表
- 返回类型:
List[dict]
Qlib
提供了一个名为RollingGen的类,用于在不同日期区间生成数据集的一系列任务
。该类允许用户在一次实验中验证不同时段数据对模型效果的影响。更多信息请见此处。
任务存储
为了实现更高的效率和集群操作的可能性,任务管理器
会将所有任务存储在MongoDB中。TaskManager
能够自动获取未完成的任务,并管理一组任务的生命周期,包括错误处理。用户在使用此模块时必须完成MongoDB的配置。
用户需要在初始化时为TaskManager
提供 MongoDB 的 URL 和数据库名称,或做出如下声明。
from qlib.config import C C["mongo"] = { "task_url" : "mongodb://localhost:27017/", # your MongoDB url "task_db_name" : "rolling_db" # database name }
- 类 qlib.workflow.task.manage.任务管理器(task_pool: str)
以下是 TaskManager 创建任务时的任务格式示例
{ 'def': pickle serialized task definition. using pickle will make it easier 'filter': json-like data. This is for filtering the tasks. 'status': 'waiting' | 'running' | 'done' 'res': pickle serialized task result, }
任务管理器假设你只会更新已获取的任务。通过 MongoDB 的获取与更新操作,可保证数据更新的安全性。
该类可用作命令行工具。以下是一些示例。你可以通过以下命令查看 manage 模块的帮助信息:python -m qlib.workflow.task.manage -h # 显示 manage 模块 CLI 的使用手册;python -m qlib.workflow.task.manage wait -h # 显示 manage 模块中 wait 命令的使用手册
python -m qlib.workflow.task.manage -t <pool_name> wait python -m qlib.workflow.task.manage -t <pool_name> task_stat
注意
假设:MongoDB 中的数据是经过编码的,而从 MongoDB 输出的数据是已解码的
共有四种状态,分别是:
STATUS_WAITING:等待训练
STATUS_RUNNING:正在训练
STATUS_PART_DONE:已完成部分步骤,等待下一步
STATUS_DONE:所有工作已完成
- __init__(task_pool: str)
初始化 Task Manager 时,请务必首先配置 MongoDB 的 URL 和数据库名称。一个 TaskManager 实例服务于特定的任务池。该模块的静态方法则作用于整个 MongoDB。
- 参数:
task_pool (str) – MongoDB 中的集合(Collection)名称
- 静态 list() list
列出数据库中所有的集合(task_pool)。
- 返回:
list
- replace_task(任务, 新任务)
使用一个新任务替换旧任务
- 参数:
task – 旧任务
new_task – 新任务
- insert_task(task)
插入一个任务。
- 参数:
task – 待插入的任务
- 返回:
pymongo.results.InsertOneResult
- insert_task_def(task_def)
向 task_pool 中插入一个任务
- 参数:
task_def (dict) – 任务定义
- 返回类型:
pymongo.results.InsertOneResult
- create_task(task_def_l, dry_run=False, print_nt=False) List[str]
如果 task_def_l 中的任务是新的,则将新任务插入 task_pool,并记录 inserted_id;如果任务已存在,则仅查询其 _id。
- 参数:
task_def_l (list) – 任务列表
dry_run (bool) – 是否将这些新任务插入到任务池中
print_nt (bool) – 是否打印新任务
- 返回:
task_def_l 中各个任务的 _id 列表
- 返回类型:
List[str]
- fetch_task(query={}, status='waiting') dict
使用 query 来获取任务。
- 参数:
query (dict,可选) – 查询字典,默认为 {}。
status (str,可选) – 状态描述,默认为 STATUS_WAITING。
- 返回:
解码后的任务(集合中的文档)
- 返回类型:
dict
- safe_fetch_task(query={}, status='waiting')
使用带有 contextmanager 的 query 从 task_pool 中获取任务
- 参数:
query (dict) – 查询条件字典
- 返回:
dict
- 返回类型:
解码后的任务(集合中的文档)
- query(query={}, decode=True)
查询集合中的任务。如果遍历生成器耗时过长,此函数可能会抛出异常 pymongo.errors.CursorNotFound: cursor id not found
python -m qlib.workflow.task.manage -t <你的任务池> query ‘{“_id”: “615498be837d0053acbc5d58”}’
- 参数:
query (dict) – 查询条件字典
decode (bool) –
- 返回:
dict
- 返回类型:
解码后的任务(集合中的文档)
- re_query(_id) dict
使用 _id 查询任务。
- 参数:
_id (str) – 文档的 _id
- 返回:
解码后的任务(集合中的文档)
- 返回类型:
dict
- commit_task_res(task, res, status='done')
将结果提交到 task['res']。
- 参数:
task ([类型]) – [描述]
res (object) – 你想要保存的结果
status (str, 可选) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_DONE。
- return_task(task, status='waiting')
将任务重置为指定状态。通常用于错误处理。
- 参数:
task ([类型]) – [描述]
status (str, 可选) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_WAITING。
- remove(query={})
使用查询条件删除任务
- 参数:
query (dict) – 查询条件字典
- task_stat(query={}) dict
统计每种状态下的任务数量。
- 参数:
query (dict, 可选) – 查询字典。默认为 {}。
- 返回:
dict
- reset_waiting(query={})
将所有正在运行的任务重置为等待状态。可用于某些运行中的任务意外退出后的恢复。
- 参数:
query (dict, 可选) – 查询字典。默认为 {}。
- prioritize(task, priority: int)
为任务设置优先级
- 参数:
任务 (字典) – 来自数据库的任务查询
优先级 (整数) – 目标优先级
- 等待(query={})
在多进程环境下,主进程可能无法从 TaskManager 获取任何任务,因为仍有一些任务正在运行。因此,主进程应等待,直到其他进程或机器完成所有任务的训练。
- 参数:
query (dict, 可选) – 查询字典。默认为 {}。
有关任务管理器 Task Manager
的更多详细信息,请参见此处。
任务训练
在生成并存储这些task
之后,现在可以运行处于WAITING状态的任务了。Qlib
提供了一个名为run_task
的方法,用于运行任务池中的task
,但用户也可以自定义任务的执行方式。获取task_func
的一个简便方法是直接使用qlib.model.trainer.task_train
,它将运行由task
定义的完整工作流,包括模型(Model)、数据集(Dataset)和记录(Record)。
- qlib.workflow.task.manage.run_task(task_func: 可调用对象, task_pool: 字符串, query: 字典 = {}, force_release: 布尔值 = False, before_status: 字符串 = 'waiting', after_status: 字符串 = 'done', **kwargs)
当任务池非空(存在 WAITING 状态的任务)时,使用 task_func 来获取并运行 task_pool 中的任务
调用此方法后,会出现以下四种情况(before_status -> after_status):
STATUS_WAITING -> STATUS_DONE:将 task[“def”] 作为 task_func 的参数,表示该任务尚未开始
STATUS_WAITING -> STATUS_PART_DONE:将 task[“def”] 作为 task_func 的参数
STATUS_PART_DONE -> STATUS_PART_DONE:将 task[“res”] 作为 task_func 的参数,表示该任务已开始但未完成
STATUS_PART_DONE -> STATUS_DONE:将 task[“res”] 作为 task_func 的参数
- 参数:
task_func (Callable) –
def (task_def, **kwargs) -> <将被提交的结果>
用于执行任务的函数
task_pool (str) – 任务池的名称(MongoDB 中的 Collection)
query (dict) – 获取任务时将使用该字典查询 task_pool
force_release (bool) – 程序是否会强制释放资源
before_status (str:) – 处于 before_status 的任务将被获取并进行训练。可选值为 STATUS_WAITING、STATUS_PART_DONE。
after_status (str:) – 训练完成后的任务将变为 after_status。可选值为 STATUS_WAITING、STATUS_PART_DONE。
kwargs – task_func 的参数
同时,Qlib
提供了一个名为 Trainer
的模块。
- 类 qlib.model.trainer.训练器
训练器可以训练一组模型。Trainer 和 DelayTrainer 的区别在于实际训练何时完成。
- __init__()
- train(tasks: list, *args, **kwargs) list
给定任务定义的列表,开始训练并返回模型。
对于 Trainer,实际的训练在此方法中完成;对于 DelayTrainer,此方法仅进行一些准备工作。
- 参数:
tasks – 任务列表
- 返回:
模型列表
- 返回类型:
list
- end_train(models: list, *args, **kwargs) list
给定一个模型列表,在训练结束时完成一些必要的收尾工作。这些模型可能是 Recorder、文本文件、数据库等。
对于 Trainer,此方法中执行一些收尾操作;对于 DelayTrainer,实际的训练在此方法中完成。
- 参数:
models – 模型列表
- 返回:
模型列表
- 返回类型:
list
- is_delay() 布尔值
指示 Trainer 是否会延迟执行 end_train。
- 返回:
如果是 DelayTrainer
- 返回类型:
布尔值
- has_worker() 布尔值
某些训练器具有后端工作进程以支持并行训练。此方法可用于判断工作进程是否已启用。
- 返回:
如果工作进程已启用
- 返回类型:
布尔值
- worker()
启动工作进程
- Raises:
NotImplementedError: – 如果不支持工作进程
Trainer
将训练一系列任务并返回一个模型记录器列表。Qlib
提供了两种 Trainer:TrainerR 是最简单的方式,TrainerRM 则基于 TaskManager 自动管理任务的生命周期。如果你不想使用 任务管理器
来管理任务,那么使用 TrainerR 训练由 TaskGen
生成的任务列表就足够了。此处提供了关于不同 Trainer
的详细信息。
任务收集
在收集模型训练结果之前,你需要使用 qlib.init
指定 mlruns 的路径。
为了在训练后收集 任务
的结果,Qlib
提供了 Collector、Group 和 Ensemble,以一种可读、可扩展且松耦合的方式收集结果。
Collector 可以从任意位置收集对象,并对其进行处理,例如合并、分组、平均等。它包含两个步骤:collect
(将任意内容收集到字典中)和 process_collect
(处理已收集的字典)。
Group 也包含两个步骤,分别是 group
(可根据 group_func 将一组对象分组并转换为字典)和 reduce
(可根据一定规则将字典合并为一个整体)。例如:{(A,B,C1): object, (A,B,C2): object} —group
—> {(A,B): {C1: object, C2: object}} —reduce
—> {(A,B): object}
Ensemble 可以将集合中的对象进行合并。例如:{C1: object, C2: object} —Ensemble
—> object。你可以在 Collector
的 process_list 中设置所需的 ensemble。常见的 ensemble 包括 AverageEnsemble
和 RollingEnsemble
。Average ensemble 用于集成同一时间段内不同模型的结果。RollingEnsemble 用于集成同一时间段内不同模型的结果
因此,层级关系是:Collector
的第二步对应 Group
,而 Group
的第二步对应 Ensemble
。