将并行运行步骤升级到 SDK v2

在 SDK v2 中,“并行运行步骤”已作为 parallel job 合并到作业概念中。 并行作业保持相同的目标,它通过将重复性任务分配到强大的多节点计算群集,使用户能够加速其作业执行。 v2 并行作业在并行运行步骤的基础上提供额外的优势:

  • 灵活的接口,允许用户为并行作业定义多个自定义输入和输出。 可将这些输入和输出连接到其他步骤,以在入口脚本中使用或管理其内容
  • 简化输入架构,使用 v2 中 data asset 的概念来替换用作输入的 Dataset。 可以轻松使用本地文件或 Blob 目录 URI 作为并行作业的输入。
  • 还有更多仅适用于 v2 并行作业的强大功能正在开发中。 例如,恢复失败/取消的并行作业,以通过重用成功结果来继续处理失败或未处理的微批,从而免除重复性的工作。

若要将当前的 SDK v1 并行运行步骤升级到 v2,需要

  • 使用 parallel_run_function 通过替换 v1 中的 ParallelRunConfigParallelRunStep 来创建并行作业。
  • 将你的 v1 管道升级到 v2。 然后调用 v2 并行作业作为 v2 管道中的步骤。 有关管道升级的详细信息,请参阅如何将管道从 v1 升级到 v2

注意:用户入口脚本在 v1 并行运行步骤和 v2 并行作业之间兼容。 因此,你可以在升级并行运行作业时继续使用相同的 entry_script.py。

本文比较了 SDK v1 和 SDK v2 中的方案。 在以下示例中,我们将生成一个并行作业来预测管道作业中的输入数据。 你将了解如何生成一个并行作业,以及如何在 SDK v1 和 SDK v2 的管道作业中使用该作业。

先决条件

创建并行步骤

  • SDK v1

    # Create the configuration to wrap the inference script
    from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
    
    parallel_run_config = ParallelRunConfig(
        source_directory=scripts_folder,
        entry_script=script_file,
        mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
        error_threshold=10,
        output_action="append_row",
        append_row_file_name="mnist_outputs.txt",
        environment=batch_env,
        compute_target=compute_target,
        process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
        node_count=2
    )
    
    # Create the Parallel run step
    parallelrun_step = ParallelRunStep(
        name="predict-digits-mnist",
        parallel_run_config=parallel_run_config,
        inputs=[ input_mnist_ds_consumption ],
        output=output_dir,
        allow_reuse=False
    )
    
  • SDK v2

    # parallel job to process file data
    file_batch_inference = parallel_run_function(
        name="file_batch_score",
        display_name="Batch Score with File Dataset",
        description="parallel component for batch score",
        inputs=dict(
            job_data_path=Input(
                type=AssetTypes.MLTABLE,
                description="The data to be split and scored in parallel",
            )
        ),
        outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)),
        input_data="${{inputs.job_data_path}}",
        instance_count=2,
        mini_batch_size="1",
        mini_batch_error_threshold=1,
        max_concurrency_per_instance=1,
        task=RunFunction(
            code="./src",
            entry_script="file_batch_inference.py",
            program_arguments="--job_output_path ${{outputs.job_output_path}}",
            environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1",
        ),
    )
    

在管道中使用并行步骤

  • SDK v1

    # Run pipeline with parallel run step
    from azureml.core import Experiment
    
    pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
    experiment = Experiment(ws, 'digit_identification')
    pipeline_run = experiment.submit(pipeline)
    pipeline_run.wait_for_completion(show_output=True)
    
  • SDK v2

    @pipeline()
    def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model):
    
        prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path)
        # output of file & tabular data should be type MLTable
        prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE
        prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE
    
        batch_inference_with_file_data = file_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.file_output_data
        )
        # use eval_mount mode to handle file data
        batch_inference_with_file_data.inputs.job_data_path.mode = (
            InputOutputModes.EVAL_MOUNT
        )
        batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE
    
        batch_inference_with_tabular_data = tabular_batch_inference(
            job_data_path=prepare_file_tabular_data.outputs.tabular_output_data,
            score_model=pipeline_score_model,
        )
        # use direct mode to handle tabular data
        batch_inference_with_tabular_data.inputs.job_data_path.mode = (
            InputOutputModes.DIRECT
        )
    
        return {
            "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path,
            "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path,
        }
    
    pipeline_job_data_path = Input(
        path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT
    )
    pipeline_score_model = Input(
        path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD
    )
    # create a pipeline
    pipeline_job = parallel_in_pipeline(
        pipeline_job_data_path=pipeline_job_data_path,
        pipeline_score_model=pipeline_score_model,
    )
    pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE
    
    # set pipeline level compute
    pipeline_job.settings.default_compute = "cpu-cluster"
    
    # run pipeline job
    pipeline_job = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name="pipeline_samples"
    )
    

SDK v1 和 SDK v2 中关键功能的映射

SDK v1 中的功能 SDK v2 中的粗略映射
azureml.pipeline.steps.parallelrunconfig
azureml.pipeline.steps.parallelrunstep
azure.ai.ml.parallel
OutputDatasetConfig 输出
dataset as_mount 输入

并行作业配置和设置映射

SDK v1 SDK v2 描述
ParallelRunConfig.environment parallel_run_function.task.environment 运行训练作业的环境。
ParallelRunConfig.entry_script parallel_run_function.task.entry_script 将在多个节点上并行运行的用户脚本。
ParallelRunConfig.error_threshold parallel_run_function.error_threshold 可在此并行作业中忽略的失败微批数。 如果失败的微批计数高于此阈值,则并行作业将标记为失败。

“-1”是默认数字,表示在执行并行作业期间忽略所有失败的微批。
ParallelRunConfig.output_action parallel_run_function.append_row_to 聚合每次运行微批后的所有返回值,并将其输出到此文件中。 可以使用表达式 ${{outputs.<output_name>}} 引用并行作业的输出之一
ParallelRunConfig.node_count parallel_run_function.instance_count 计算目标使用的可选实例或节点数。 默认值为 1。
ParallelRunConfig.process_count_per_node parallel_run_function.max_concurrency_per_instance 每个计算实例具有的最大并行度。
ParallelRunConfig.mini_batch_size parallel_run_function.mini_batch_size 定义每个微批的大小以拆分输入。

如果 input_data 是一个文件夹或一组文件,则此数字定义每个小批次的“文件计数”。 例如 10、100。

如果 input_data 是来自 mltable 的表格数据,则此数字定义每个微批的近似物理大小。 默认单位为字节,值可以接受 100 kb、100 mb 之类的字符串。
ParallelRunConfig.source_directory parallel_run_function.task.code 指向源代码的本地或远程路径。
ParallelRunConfig.description parallel_run_function.description 并行作业的友好说明
ParallelRunConfig.logging_level parallel_run_function.logging_level 在“日志记录”中定义的日志记录级别名称的字符串。 可能的值为“WARNING”、“INFO”和“DEBUG”。 (可选,默认值为“INFO”。)可以通过 PipelineParameter 设置此值。
ParallelRunConfig.run_invocation_timeout parallel_run_function.retry_settings.timeout 执行自定义 run() 函数的超时(以秒为单位)。 如果执行时间高于此阈值,则微批将会中止,并标记为微批失败以触发重试。
ParallelRunConfig.run_max_try parallel_run_function.retry_settings.max_retries 微批失败或超时时的重试次数。 如果所有重试都失败,则微批将标记为无法通过 mini_batch_error_threshold 计算来统计。
ParallelRunConfig.append_row_file_name parallel_run_function.append_row_to append_row_to 设置结合使用。
ParallelRunConfig.allowed_failed_count parallel_run_function.mini_batch_error_threshold 可在此并行作业中忽略的失败微批数。 如果失败的微批计数高于此阈值,则并行作业将标记为失败。

“-1”是默认数字,表示在执行并行作业期间忽略所有失败的微批。
ParallelRunConfig.allowed_failed_percent parallel_run_function.task.program_arguments set
--allowed_failed_percent
与“allowed_failed_count”类似,但此设置使用失败的微批数百分比而不是微批失败计数。

此设置的范围为 [0, 100]。 “100”是默认数字,表示在执行并行作业期间忽略所有失败的微批。
ParallelRunConfig.partition_keys 正在开发。
ParallelRunConfig.environment_variables parallel_run_function.environment_variables 环境变量名称和值的字典。 这些环境变量是在执行用户脚本的进程上设置的。
ParallelRunStep.name parallel_run_function.name 创建的并行作业或组件的名称。
ParallelRunStep.inputs parallel_run_function.inputs 此并行作业使用的输入字典。
-- parallel_run_function.input_data 声明要使用并行作业拆分和处理的数据
ParallelRunStep.output parallel_run_function.outputs 此并行作业的输出。
ParallelRunStep.side_inputs parallel_run_function.inputs inputs 一起定义。
ParallelRunStep.arguments parallel_run_function.task.program_arguments 并行任务的参数。
ParallelRunStep.allow_reuse parallel_run_function.is_deterministic 指定在输入相同的情况下并行作业是否返回相同的输出。

后续步骤

有关详细信息,请参阅以下文档: