任务管理

简介

工作流部分介绍了如何以松耦合的方式运行研究工作流。但当你使用qrun时,它只能执行一个任务任务管理模块则提供了一个完整的流程,包括任务生成任务存储任务训练任务收集,可自动地生成并执行不同的任务。通过该模块,用户可以在不同时间段、不同损失函数下,甚至使用不同模型自动运行他们的任务。任务生成、模型训练以及数据合并与收集的整个流程如下图所示。

../_images/Task-Gen-Recorder-Collector.svg

整个流程可用于在线服务

整个流程的示例如此处所示。

任务生成

一个任务包含模型数据集记录,或用户添加的任何内容。具体任务模板可在任务章节中查看。尽管任务模板是固定的,用户仍可自定义他们的TaskGen,以基于任务模板生成不同的任务

以下是TaskGen的基类定义:

qlib.workflow.task.gen.任务生成器

用于生成不同任务的基类

示例 1:

输入:特定任务模板和滚动步数

输出:任务的滚动版本

示例 2:

输入:特定任务模板和损失函数列表

输出:一组具有不同损失函数的任务

抽象 generate(task: dict) 列表[dict]

基于任务模板生成不同的任务

参数:

task (dict) – 一个任务模板

返回:

任务列表

返回类型:

List[dict]

Qlib提供了一个名为RollingGen的类,用于在不同日期区间生成数据集的一系列任务。该类允许用户在一次实验中验证不同时段数据对模型效果的影响。更多信息请见此处

任务存储

为了实现更高的效率和集群操作的可能性,任务管理器会将所有任务存储在MongoDB中。TaskManager能够自动获取未完成的任务,并管理一组任务的生命周期,包括错误处理。用户在使用此模块时必须完成MongoDB的配置。

用户需要在初始化时为TaskManager提供 MongoDB 的 URL 和数据库名称,或做出如下声明。

from qlib.config import C
C["mongo"] = {
    "task_url" : "mongodb://localhost:27017/", # your MongoDB url
    "task_db_name" : "rolling_db" # database name
}
qlib.workflow.task.manage.任务管理器(task_pool: str)

以下是 TaskManager 创建任务时的任务格式示例

{
    'def': pickle serialized task definition.  using pickle will make it easier
    'filter': json-like data. This is for filtering the tasks.
    'status': 'waiting' | 'running' | 'done'
    'res': pickle serialized task result,
}

任务管理器假设你只会更新已获取的任务。通过 MongoDB 的获取与更新操作,可保证数据更新的安全性。

该类可用作命令行工具。以下是一些示例。你可以通过以下命令查看 manage 模块的帮助信息:python -m qlib.workflow.task.manage -h # 显示 manage 模块 CLI 的使用手册;python -m qlib.workflow.task.manage wait -h # 显示 manage 模块中 wait 命令的使用手册

python -m qlib.workflow.task.manage -t <pool_name> wait
python -m qlib.workflow.task.manage -t <pool_name> task_stat

注意

假设:MongoDB 中的数据是经过编码的,而从 MongoDB 输出的数据是已解码的

共有四种状态,分别是:

STATUS_WAITING:等待训练

STATUS_RUNNING:正在训练

STATUS_PART_DONE:已完成部分步骤,等待下一步

STATUS_DONE:所有工作已完成

__init__(task_pool: str)

初始化 Task Manager 时,请务必首先配置 MongoDB 的 URL 和数据库名称。一个 TaskManager 实例服务于特定的任务池。该模块的静态方法则作用于整个 MongoDB。

参数:

task_pool (str) – MongoDB 中的集合(Collection)名称

静态 list() list

列出数据库中所有的集合(task_pool)。

返回:

list

replace_task(任务, 新任务)

使用一个新任务替换旧任务

参数:
  • task – 旧任务

  • new_task – 新任务

insert_task(task)

插入一个任务。

参数:

task – 待插入的任务

返回:

pymongo.results.InsertOneResult

insert_task_def(task_def)

向 task_pool 中插入一个任务

参数:

task_def (dict) – 任务定义

返回类型:

pymongo.results.InsertOneResult

create_task(task_def_l, dry_run=False, print_nt=False) List[str]

如果 task_def_l 中的任务是新的,则将新任务插入 task_pool,并记录 inserted_id;如果任务已存在,则仅查询其 _id。

参数:
  • task_def_l (list) – 任务列表

  • dry_run (bool) – 是否将这些新任务插入到任务池中

  • print_nt (bool) – 是否打印新任务

返回:

task_def_l 中各个任务的 _id 列表

返回类型:

List[str]

fetch_task(query={}, status='waiting') dict

使用 query 来获取任务。

参数:
  • query (dict,可选) – 查询字典,默认为 {}。

  • status (str,可选) – 状态描述,默认为 STATUS_WAITING。

返回:

解码后的任务(集合中的文档)

返回类型:

dict

safe_fetch_task(query={}, status='waiting')

使用带有 contextmanager 的 query 从 task_pool 中获取任务

参数:

query (dict) – 查询条件字典

返回:

dict

返回类型:

解码后的任务(集合中的文档)

query(query={}, decode=True)

查询集合中的任务。如果遍历生成器耗时过长,此函数可能会抛出异常 pymongo.errors.CursorNotFound: cursor id not found

python -m qlib.workflow.task.manage -t <你的任务池> query ‘{“_id”: “615498be837d0053acbc5d58”}’

参数:
  • query (dict) – 查询条件字典

  • decode (bool) –

返回:

dict

返回类型:

解码后的任务(集合中的文档)

re_query(_id) dict

使用 _id 查询任务。

参数:

_id (str) – 文档的 _id

返回:

解码后的任务(集合中的文档)

返回类型:

dict

commit_task_res(task, res, status='done')

将结果提交到 task['res']。

参数:
  • task ([类型]) – [描述]

  • res (object) – 你想要保存的结果

  • status (str, 可选) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_DONE。

return_task(task, status='waiting')

将任务重置为指定状态。通常用于错误处理。

参数:
  • task ([类型]) – [描述]

  • status (str, 可选) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_WAITING。

remove(query={})

使用查询条件删除任务

参数:

query (dict) – 查询条件字典

task_stat(query={}) dict

统计每种状态下的任务数量。

参数:

query (dict, 可选) – 查询字典。默认为 {}。

返回:

dict

reset_waiting(query={})

将所有正在运行的任务重置为等待状态。可用于某些运行中的任务意外退出后的恢复。

参数:

query (dict, 可选) – 查询字典。默认为 {}。

prioritize(task, priority: int)

为任务设置优先级

参数:
  • 任务 (字典) – 来自数据库的任务查询

  • 优先级 (整数) – 目标优先级

等待(query={})

在多进程环境下,主进程可能无法从 TaskManager 获取任何任务,因为仍有一些任务正在运行。因此,主进程应等待,直到其他进程或机器完成所有任务的训练。

参数:

query (dict, 可选) – 查询字典。默认为 {}。

有关任务管理器 Task Manager的更多详细信息,请参见此处

任务训练

在生成并存储这些task之后,现在可以运行处于WAITING状态的任务了。Qlib提供了一个名为run_task的方法,用于运行任务池中的task,但用户也可以自定义任务的执行方式。获取task_func的一个简便方法是直接使用qlib.model.trainer.task_train,它将运行由task定义的完整工作流,包括模型(Model)数据集(Dataset)记录(Record)

qlib.workflow.task.manage.run_task(task_func: 可调用对象, task_pool: 字符串, query: 字典 = {}, force_release: 布尔值 = False, before_status: 字符串 = 'waiting', after_status: 字符串 = 'done', **kwargs)

当任务池非空(存在 WAITING 状态的任务)时,使用 task_func 来获取并运行 task_pool 中的任务

调用此方法后,会出现以下四种情况(before_status -> after_status):

STATUS_WAITING -> STATUS_DONE:将 task[“def”] 作为 task_func 的参数,表示该任务尚未开始

STATUS_WAITING -> STATUS_PART_DONE:将 task[“def”] 作为 task_func 的参数

STATUS_PART_DONE -> STATUS_PART_DONE:将 task[“res”] 作为 task_func 的参数,表示该任务已开始但未完成

STATUS_PART_DONE -> STATUS_DONE:将 task[“res”] 作为 task_func 的参数

参数:
  • task_func (Callable) –

    def (task_def, **kwargs) -> <将被提交的结果>

    用于执行任务的函数

  • task_pool (str) – 任务池的名称(MongoDB 中的 Collection)

  • query (dict) – 获取任务时将使用该字典查询 task_pool

  • force_release (bool) – 程序是否会强制释放资源

  • before_status (str:) – 处于 before_status 的任务将被获取并进行训练。可选值为 STATUS_WAITING、STATUS_PART_DONE。

  • after_status (str:) – 训练完成后的任务将变为 after_status。可选值为 STATUS_WAITING、STATUS_PART_DONE。

  • kwargstask_func 的参数

同时,Qlib 提供了一个名为 Trainer 的模块。

qlib.model.trainer.训练器

训练器可以训练一组模型。Trainer 和 DelayTrainer 的区别在于实际训练何时完成。

__init__()
train(tasks: list, *args, **kwargs) list

给定任务定义的列表,开始训练并返回模型。

对于 Trainer,实际的训练在此方法中完成;对于 DelayTrainer,此方法仅进行一些准备工作。

参数:

tasks – 任务列表

返回:

模型列表

返回类型:

list

end_train(models: list, *args, **kwargs) list

给定一个模型列表,在训练结束时完成一些必要的收尾工作。这些模型可能是 Recorder、文本文件、数据库等。

对于 Trainer,此方法中执行一些收尾操作;对于 DelayTrainer,实际的训练在此方法中完成。

参数:

models – 模型列表

返回:

模型列表

返回类型:

list

is_delay() 布尔值

指示 Trainer 是否会延迟执行 end_train

返回:

如果是 DelayTrainer

返回类型:

布尔值

has_worker() 布尔值

某些训练器具有后端工作进程以支持并行训练。此方法可用于判断工作进程是否已启用。

返回:

如果工作进程已启用

返回类型:

布尔值

worker()

启动工作进程

Raises:

NotImplementedError: – 如果不支持工作进程

Trainer 将训练一系列任务并返回一个模型记录器列表。Qlib 提供了两种 Trainer:TrainerR 是最简单的方式,TrainerRM 则基于 TaskManager 自动管理任务的生命周期。如果你不想使用 任务管理器 来管理任务,那么使用 TrainerR 训练由 TaskGen 生成的任务列表就足够了。此处提供了关于不同 Trainer 的详细信息。

任务收集

在收集模型训练结果之前,你需要使用 qlib.init 指定 mlruns 的路径。

为了在训练后收集 任务 的结果,Qlib 提供了 CollectorGroupEnsemble,以一种可读、可扩展且松耦合的方式收集结果。

Collector 可以从任意位置收集对象,并对其进行处理,例如合并、分组、平均等。它包含两个步骤:collect(将任意内容收集到字典中)和 process_collect(处理已收集的字典)。

Group 也包含两个步骤,分别是 group(可根据 group_func 将一组对象分组并转换为字典)和 reduce(可根据一定规则将字典合并为一个整体)。例如:{(A,B,C1): object, (A,B,C2): object} —group—> {(A,B): {C1: object, C2: object}} —reduce—> {(A,B): object}

Ensemble 可以将集合中的对象进行合并。例如:{C1: object, C2: object} —Ensemble—> object。你可以在 Collector 的 process_list 中设置所需的 ensemble。常见的 ensemble 包括 AverageEnsembleRollingEnsemble。Average ensemble 用于集成同一时间段内不同模型的结果。RollingEnsemble 用于集成同一时间段内不同模型的结果

因此,层级关系是:Collector 的第二步对应 Group,而 Group 的第二步对应 Ensemble

欲了解更多信息,请参阅 CollectorGroupEnsemble,或查看 示例