Schedule machine learning pipelines with Azure Machine Learning SDK for Python

In this article, you'll learn how to programmatically schedule a pipeline to run on Azure. You can choose to create a schedule based on elapsed time or on file-system changes. Time-based schedules can be used to take care of routine tasks, such as monitoring for data drift. Change-based schedules can be used to react to irregular or unpredictable changes, such as new data being uploaded or old data being edited. After learning how to create schedules, you'll learn how to retrieve and deactivate them.


Initialize the workspace & get data

To schedule a pipeline, you'll need a reference to your workspace, the identifier of your published pipeline, and the name of the experiment in which you wish to create the schedule. You can get these values with the following code:

import azureml.core
from azureml.core import Workspace
from azureml.core.experiment import Experiment
from azureml.pipeline.core import Pipeline, PublishedPipeline

ws = Workspace.from_config()

# List the experiments in the workspace to find the experiment name you want
experiments = Experiment.list(ws)
for experiment in experiments:
    print(experiment.name)

# List the published pipelines to find the pipeline identifier you want
published_pipelines = PublishedPipeline.list(ws)
for published_pipeline in published_pipelines:
    print(f"{published_pipeline.name}, '{published_pipeline.id}'")

experiment_name = "MyExperiment"
pipeline_id = "aaaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"

Create a schedule

To run a pipeline on a recurring basis, you'll create a schedule. A Schedule associates a pipeline, an experiment, and a trigger. The trigger can either be a ScheduleRecurrence that describes the wait between runs or a Datastore path that specifies a directory to watch for changes. In either case, you'll need the pipeline identifier and the name of the experiment in which to create the schedule.

At the top of your Python file, import the Schedule and ScheduleRecurrence classes:

from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule

Create a time-based schedule

The ScheduleRecurrence constructor has a required frequency argument that must be one of the following strings: "Minute", "Hour", "Day", "Week", or "Month". It also requires an integer interval argument specifying how many of the frequency units should elapse between schedule starts. Optional arguments allow you to be more specific about starting times, as detailed in the ScheduleRecurrence SDK docs.
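Before touching the SDK, the meaning of frequency and interval can be sketched in plain Python. This helper is illustrative only and is not part of the SDK; "Month" is omitted because months vary in length:

```python
from datetime import timedelta

# Illustrative only: how a frequency/interval pair maps to the wait
# between schedule starts. The unit names mirror the strings that
# ScheduleRecurrence accepts ("Month" omitted: months vary in length).
_UNITS = {
    "Minute": timedelta(minutes=1),
    "Hour": timedelta(hours=1),
    "Day": timedelta(days=1),
    "Week": timedelta(weeks=1),
}

def wait_between_runs(frequency, interval):
    """Return the elapsed time between schedule starts."""
    return _UNITS[frequency] * interval

print(wait_between_runs("Minute", 15))  # 0:15:00
```

In other words, frequency="Minute" with interval=15 means a run starts every 15 minutes, not 15 runs per minute.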

Create a Schedule that begins a run every 15 minutes:

recurrence = ScheduleRecurrence(frequency="Minute", interval=15)
recurring_schedule = Schedule.create(ws, name="MyRecurringSchedule",
                                     description="Based on time",
                                     pipeline_id=pipeline_id,
                                     experiment_name=experiment_name,
                                     recurrence=recurrence)

Create a change-based schedule

Pipelines that are triggered by file changes may be more efficient than time-based schedules. For instance, you may want to perform a preprocessing step when a file is changed, or when a new file is added to a data directory. You can monitor any changes to a datastore or changes within a specific directory within the datastore. If you monitor a specific directory, changes within subdirectories of that directory will not trigger a run.

To create a file-reactive Schedule, you must set the datastore parameter in the call to Schedule.create. To monitor a folder, set the path_on_datastore argument.

The polling_interval argument allows you to specify, in minutes, the frequency at which the datastore is checked for changes.
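The polling behavior can be pictured as a periodic scan for files whose modification time is newer than the last check. The sketch below is a plain-Python illustration of that idea, not how the service is implemented:

```python
import os
import tempfile
import time

def changed_files(directory, since):
    """Illustration only: list files in the top-level directory whose
    modification time is later than the given timestamp. As with a
    change-based schedule watching a specific directory, files in
    subdirectories are not considered."""
    return [
        name for name in sorted(os.listdir(directory))
        if os.path.isfile(os.path.join(directory, name))
        and os.path.getmtime(os.path.join(directory, name)) > since
    ]

with tempfile.TemporaryDirectory() as d:
    last_poll = time.time() - 60  # pretend the last check was a minute ago
    with open(os.path.join(d, "new_data.csv"), "w") as f:
        f.write("col\n1\n")
    print(changed_files(d, last_poll))  # ['new_data.csv']
```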

If the pipeline was constructed with a DataPath PipelineParameter, you can set that variable to the name of the changed file by setting the data_path_parameter_name argument.

from azureml.core import Datastore

datastore = Datastore(workspace=ws, name="workspaceblobstore")

reactive_schedule = Schedule.create(ws, name="MyReactiveSchedule",
                                    description="Based on input file change.",
                                    pipeline_id=pipeline_id,
                                    experiment_name=experiment_name,
                                    datastore=datastore,
                                    data_path_parameter_name="input_data")

Optional arguments when creating a schedule

In addition to the arguments discussed previously, you may set the status argument to "Disabled" to create an inactive schedule. Finally, the continue_on_step_failure argument allows you to pass a Boolean that will override the pipeline's default failure behavior.

Use Azure Logic Apps for more complex workflows

Azure Logic Apps supports more complex workflows and is far more broadly integrated than Azure Machine Learning pipelines. See Trigger a run of a Machine Learning pipeline from a Logic App for more information.

View your scheduled pipelines

In your Web browser, navigate to Azure Machine Learning. From the Endpoints section of the navigation panel, choose Pipeline endpoints. This takes you to a list of the pipelines published in the Workspace.

The Pipelines page of Azure Machine Learning

In this page, you can see summary information about all the pipelines in the Workspace: names, descriptions, status, and so forth. Drill in by clicking on your pipeline. On the resulting page, there are more details about your pipeline and you may drill down into individual runs.

Deactivate the pipeline

If you have a Pipeline that is published, but not scheduled, you can disable it with:

pipeline = PublishedPipeline.get(ws, id=pipeline_id)
pipeline.disable()

If the pipeline is scheduled, you must cancel the schedule first. Retrieve the schedule's identifier from the portal or by running:

ss = Schedule.list(ws)
for s in ss:
    print(s)

Once you have the schedule_id of the schedule you wish to disable, run:

def stop_by_schedule_id(ws, schedule_id):
    s = next(s for s in Schedule.list(ws) if s.id == schedule_id)
    s.disable()
    return s

stop_by_schedule_id(ws, schedule_id)
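Note that next(...) raises StopIteration if no schedule has the given id. The lookup-and-disable pattern can be exercised without the SDK using a stand-in class (FakeSchedule and its status values are hypothetical, for illustration only):

```python
class FakeSchedule:
    """Hypothetical stand-in for the SDK's Schedule, for illustration."""
    def __init__(self, schedule_id):
        self.id = schedule_id
        self.status = "Active"

    def disable(self):
        self.status = "Disabled"

def stop_by_id(schedules, schedule_id):
    """Disable the schedule with the given id; fail clearly if absent."""
    match = next((s for s in schedules if s.id == schedule_id), None)
    if match is None:
        raise ValueError(f"No schedule with id {schedule_id!r}")
    match.disable()
    return match

schedules = [FakeSchedule("s-1"), FakeSchedule("s-2")]
stop_by_id(schedules, "s-1")
print([s.status for s in schedules])  # ['Disabled', 'Active']
```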

If you then run Schedule.list(ws) again, you should get an empty list.

Next steps

In this article, you used the Azure Machine Learning SDK for Python to schedule a pipeline in two different ways. One schedule recurs based on elapsed clock time. The other schedule runs if a file is modified on a specified Datastore or within a directory on that store. You saw how to use the portal to examine the pipeline and individual runs. Finally, you learned how to disable a schedule so that the pipeline stops running.

For more information, see: