将并行运行步骤升级到 SDK v2
在 SDK v2 中,“并行运行步骤”已作为 parallel job
合并到作业概念中。 并行作业保持相同的目标,它通过将重复性任务分配到强大的多节点计算群集,使用户能够加速其作业执行。 v2 并行作业在并行运行步骤的基础上提供额外的优势:
- 灵活的接口,允许用户为并行作业定义多个自定义输入和输出。 可将这些输入和输出连接到其他步骤,以在入口脚本中使用或管理其内容
- 简化输入架构,使用 v2 中
data asset
的概念来替换用作输入的Dataset
。 可以轻松使用本地文件或 Blob 目录 URI 作为并行作业的输入。 - 还有更多仅适用于 v2 并行作业的强大功能正在开发中。 例如,恢复失败/取消的并行作业,以通过重用成功结果来继续处理失败或未处理的微批,从而免除重复性的工作。
若要将当前的 SDK v1 并行运行步骤升级到 v2,需要
- 使用
parallel_run_function
通过替换 v1 中的ParallelRunConfig
和ParallelRunStep
来创建并行作业。 - 将你的 v1 管道升级到 v2。 然后调用 v2 并行作业作为 v2 管道中的步骤。 有关管道升级的详细信息,请参阅如何将管道从 v1 升级到 v2。
注意:用户入口脚本在 v1 并行运行步骤和 v2 并行作业之间兼容。 因此,你可以在升级并行运行作业时继续使用相同的 entry_script.py。
本文比较了 SDK v1 和 SDK v2 中的方案。 在以下示例中,我们将生成一个并行作业来预测管道作业中的输入数据。 你将了解如何生成一个并行作业,以及如何在 SDK v1 和 SDK v2 的管道作业中使用该作业。
先决条件
- 准备 SDK v2 环境:安装适用于 Python 的 Azure 机器学习 SDK v2
- 了解 SDK v2 管道的基础知识:如何使用 Python SDK v2 创建 Azure 机器学习管道
创建并行步骤
SDK v1
# Create the configuration to wrap the inference script from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig parallel_run_config = ParallelRunConfig( source_directory=scripts_folder, entry_script=script_file, mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"), error_threshold=10, output_action="append_row", append_row_file_name="mnist_outputs.txt", environment=batch_env, compute_target=compute_target, process_count_per_node=PipelineParameter(name="process_count_param", default_value=2), node_count=2 ) # Create the Parallel run step parallelrun_step = ParallelRunStep( name="predict-digits-mnist", parallel_run_config=parallel_run_config, inputs=[ input_mnist_ds_consumption ], output=output_dir, allow_reuse=False )
SDK v2
# parallel job to process file data file_batch_inference = parallel_run_function( name="file_batch_score", display_name="Batch Score with File Dataset", description="parallel component for batch score", inputs=dict( job_data_path=Input( type=AssetTypes.MLTABLE, description="The data to be split and scored in parallel", ) ), outputs=dict(job_output_path=Output(type=AssetTypes.MLTABLE)), input_data="${{inputs.job_data_path}}", instance_count=2, mini_batch_size="1", mini_batch_error_threshold=1, max_concurrency_per_instance=1, task=RunFunction( code="./src", entry_script="file_batch_inference.py", program_arguments="--job_output_path ${{outputs.job_output_path}}", environment="azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu:1", ), )
在管道中使用并行步骤
SDK v1
# Run pipeline with parallel run step from azureml.core import Experiment pipeline = Pipeline(workspace=ws, steps=[parallelrun_step]) experiment = Experiment(ws, 'digit_identification') pipeline_run = experiment.submit(pipeline) pipeline_run.wait_for_completion(show_output=True)
SDK v2
@pipeline() def parallel_in_pipeline(pipeline_job_data_path, pipeline_score_model): prepare_file_tabular_data = prepare_data(input_data=pipeline_job_data_path) # output of file & tabular data should be type MLTable prepare_file_tabular_data.outputs.file_output_data.type = AssetTypes.MLTABLE prepare_file_tabular_data.outputs.tabular_output_data.type = AssetTypes.MLTABLE batch_inference_with_file_data = file_batch_inference( job_data_path=prepare_file_tabular_data.outputs.file_output_data ) # use eval_mount mode to handle file data batch_inference_with_file_data.inputs.job_data_path.mode = ( InputOutputModes.EVAL_MOUNT ) batch_inference_with_file_data.outputs.job_output_path.type = AssetTypes.MLTABLE batch_inference_with_tabular_data = tabular_batch_inference( job_data_path=prepare_file_tabular_data.outputs.tabular_output_data, score_model=pipeline_score_model, ) # use direct mode to handle tabular data batch_inference_with_tabular_data.inputs.job_data_path.mode = ( InputOutputModes.DIRECT ) return { "pipeline_job_out_file": batch_inference_with_file_data.outputs.job_output_path, "pipeline_job_out_tabular": batch_inference_with_tabular_data.outputs.job_output_path, } pipeline_job_data_path = Input( path="./dataset/", type=AssetTypes.MLTABLE, mode=InputOutputModes.RO_MOUNT ) pipeline_score_model = Input( path="./model/", type=AssetTypes.URI_FOLDER, mode=InputOutputModes.DOWNLOAD ) # create a pipeline pipeline_job = parallel_in_pipeline( pipeline_job_data_path=pipeline_job_data_path, pipeline_score_model=pipeline_score_model, ) pipeline_job.outputs.pipeline_job_out_tabular.type = AssetTypes.URI_FILE # set pipeline level compute pipeline_job.settings.default_compute = "cpu-cluster" # run pipeline job pipeline_job = ml_client.jobs.create_or_update( pipeline_job, experiment_name="pipeline_samples" )
SDK v1 和 SDK v2 中关键功能的映射
SDK v1 中的功能 | SDK v2 中的粗略映射 |
---|---|
azureml.pipeline.steps.parallelrunconfig azureml.pipeline.steps.parallelrunstep |
azure.ai.ml.parallel |
OutputDatasetConfig | 输出 |
dataset as_mount | 输入 |
并行作业配置和设置映射
SDK v1 | SDK v2 | 描述 |
---|---|---|
ParallelRunConfig.environment | parallel_run_function.task.environment | 运行训练作业的环境。 |
ParallelRunConfig.entry_script | parallel_run_function.task.entry_script | 将在多个节点上并行运行的用户脚本。 |
ParallelRunConfig.error_threshold | parallel_run_function.error_threshold | 可在此并行作业中忽略的失败微批数。 如果失败的微批计数高于此阈值,则并行作业将标记为失败。 “-1”是默认数字,表示在执行并行作业期间忽略所有失败的微批。 |
ParallelRunConfig.output_action | parallel_run_function.append_row_to | 聚合每次运行微批后的所有返回值,并将其输出到此文件中。 可以使用表达式 ${{outputs.<output_name>}} 引用并行作业的输出之一 |
ParallelRunConfig.node_count | parallel_run_function.instance_count | 计算目标使用的可选实例或节点数。 默认值为 1。 |
ParallelRunConfig.process_count_per_node | parallel_run_function.max_concurrency_per_instance | 每个计算实例具有的最大并行度。 |
ParallelRunConfig.mini_batch_size | parallel_run_function.mini_batch_size | 定义每个微批的大小以拆分输入。 如果 input_data 是一个文件夹或一组文件,则此数字定义每个小批次的“文件计数”。 例如 10、100。 如果 input_data 是来自 mltable 的表格数据,则此数字定义每个微批的近似物理大小。 默认单位为字节,值可以接受 100 kb、100 mb 之类的字符串。 |
ParallelRunConfig.source_directory | parallel_run_function.task.code | 指向源代码的本地或远程路径。 |
ParallelRunConfig.description | parallel_run_function.description | 并行作业的友好说明 |
ParallelRunConfig.logging_level | parallel_run_function.logging_level | 在“日志记录”中定义的日志记录级别名称的字符串。 可能的值为“WARNING”、“INFO”和“DEBUG”。 (可选,默认值为“INFO”。)可以通过 PipelineParameter 设置此值。 |
ParallelRunConfig.run_invocation_timeout | parallel_run_function.retry_settings.timeout | 执行自定义 run() 函数的超时(以秒为单位)。 如果执行时间高于此阈值,则微批将会中止,并标记为微批失败以触发重试。 |
ParallelRunConfig.run_max_try | parallel_run_function.retry_settings.max_retries | 微批失败或超时时的重试次数。 如果所有重试都失败,则微批将标记为无法通过 mini_batch_error_threshold 计算来统计。 |
ParallelRunConfig.append_row_file_name | parallel_run_function.append_row_to | 与 append_row_to 设置结合使用。 |
ParallelRunConfig.allowed_failed_count | parallel_run_function.mini_batch_error_threshold | 可在此并行作业中忽略的失败微批数。 如果失败的微批计数高于此阈值,则并行作业将标记为失败。 “-1”是默认数字,表示在执行并行作业期间忽略所有失败的微批。 |
ParallelRunConfig.allowed_failed_percent | parallel_run_function.task.program_arguments set --allowed_failed_percent |
与“allowed_failed_count”类似,但此设置使用失败的微批数百分比而不是微批失败计数。 此设置的范围为 [0, 100]。 “100”是默认数字,表示在执行并行作业期间忽略所有失败的微批。 |
ParallelRunConfig.partition_keys | 正在开发。 | |
ParallelRunConfig.environment_variables | parallel_run_function.environment_variables | 环境变量名称和值的字典。 这些环境变量是在执行用户脚本的进程上设置的。 |
ParallelRunStep.name | parallel_run_function.name | 创建的并行作业或组件的名称。 |
ParallelRunStep.inputs | parallel_run_function.inputs | 此并行作业使用的输入字典。 |
-- | parallel_run_function.input_data | 声明要使用并行作业拆分和处理的数据 |
ParallelRunStep.output | parallel_run_function.outputs | 此并行作业的输出。 |
ParallelRunStep.side_inputs | parallel_run_function.inputs | 与 inputs 一起定义。 |
ParallelRunStep.arguments | parallel_run_function.task.program_arguments | 并行任务的参数。 |
ParallelRunStep.allow_reuse | parallel_run_function.is_deterministic | 指定在输入相同的情况下并行作业是否返回相同的输出。 |
后续步骤
有关详细信息,请参阅以下文档: