Troubleshooting the ParallelRunStep

In this article, you learn how to troubleshoot errors you may encounter when using the ParallelRunStep class from the Azure Machine Learning SDK.

For general tips on troubleshooting a pipeline, see Troubleshooting machine learning pipelines.

Testing scripts locally

Your ParallelRunStep runs as a step in ML pipelines. You may want to test your scripts locally as a first step.

Script requirements

The script for a ParallelRunStep must contain two functions:

  • init(): Use this function for any costly or common preparation for later inference. For example, use it to load the model into a global object. This function is called only once, at the beginning of the process.
  • run(mini_batch): This function runs for each mini_batch instance.
    • mini_batch: ParallelRunStep invokes the run method and passes either a list or a pandas DataFrame as an argument. Each entry in mini_batch is a file path if the input is a FileDataset, or a pandas DataFrame if the input is a TabularDataset.
    • response: The run() method should return a pandas DataFrame or an array. For the append_row output_action, these returned elements are appended to the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful run of an input element in the input mini-batch. Make sure the run result includes enough data to map each input to its output: run output is written to the output file and isn't guaranteed to be in order, so include a key in the output that maps each element back to its input.
%%writefile digit_identification.py
# Snippets from a sample script.
# Refer to the accompanying digit_identification.py
# (https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/machine-learning-pipelines/parallel-run)
# for the implementation script.

import os
import numpy as np
import tensorflow as tf
from PIL import Image
from azureml.core import Model

def init():
    global g_tf_sess

    # Pull down the model from the workspace
    model_path = Model.get_model_path("mnist")

    # Construct a graph to execute
    saver = tf.train.import_meta_graph(os.path.join(model_path, 'mnist-tf.model.meta'))
    g_tf_sess = tf.Session()
    saver.restore(g_tf_sess, os.path.join(model_path, 'mnist-tf.model'))

def run(mini_batch):
    print(f'run method start: {__file__}, run({mini_batch})')
    resultList = []
    in_tensor = g_tf_sess.graph.get_tensor_by_name("network/X:0")
    output = g_tf_sess.graph.get_tensor_by_name("network/output/MatMul:0")

    for image in mini_batch:
        # Prepare each image
        data = Image.open(image)
        np_im = np.array(data).reshape((1, 784))
        # Perform inference
        inference_result = output.eval(feed_dict={in_tensor: np_im}, session=g_tf_sess)
        # Find the best probability, and add it to the result list
        best_result = np.argmax(inference_result)
        resultList.append("{}: {}".format(os.path.basename(image), best_result))

    return resultList

If you have another file or folder in the same directory as your inference script, you can reference it by finding the current working directory.

script_dir = os.path.realpath(os.path.join(__file__, '..',))
file_path = os.path.join(script_dir, "<file_name>")

Parameters for ParallelRunConfig

ParallelRunConfigParallelRunStep 实例在 Azure 机器学习管道中的主要配置。ParallelRunConfig is the major configuration for ParallelRunStep instance within the Azure Machine Learning pipeline. 使用它来包装脚本并配置所需的参数,包括所有以下条目:You use it to wrap your script and configure necessary parameters, including all of the following entries:

  • entry_script: A user script, as a local file path, that will be run in parallel on multiple nodes. If source_directory is present, use a relative path. Otherwise, use any path that's accessible on the machine.
  • mini_batch_size: The size of the mini-batch passed to a single run() call. (optional; the default value is 10 files for FileDataset and 1MB for TabularDataset.)
    • For FileDataset, it's the number of files, with a minimum value of 1. You can combine multiple files into one mini-batch.
    • For TabularDataset, it's the size of the data. Example values are 1024, 1024KB, 10MB, and 1GB. The recommended value is 1MB. A mini-batch from a TabularDataset never crosses file boundaries. For example, suppose you have .csv files of various sizes, the smallest 100 KB and the largest 10 MB. If you set mini_batch_size = 1MB, each file smaller than 1 MB is treated as one mini-batch, and files larger than 1 MB are split into multiple mini-batches.
  • error_threshold: The number of record failures for TabularDataset and file failures for FileDataset that should be ignored during processing. If the error count for the entire input goes above this value, the job is aborted. The error threshold applies to the entire input, not to the individual mini-batches sent to the run() method. The range is [-1, int.max]. A value of -1 indicates that all failures should be ignored during processing.
  • output_action: One of the following values, indicating how the output will be organized:
    • summary_only: The user script stores the output itself. ParallelRunStep uses the output only for the error threshold calculation.
    • append_row: For all inputs, only one file is created in the output folder, with all outputs appended and separated by line.
  • append_row_file_name: Customizes the output file name for the append_row output_action (optional; the default value is parallel_run_step.txt).
  • source_directory: Path to the folder that contains all files to execute on the compute target (optional).
  • compute_target: Only AmlCompute is supported.
  • node_count: The number of compute nodes used to run the user script.
  • process_count_per_node: The number of processes per node. Best practice is to set it to the number of GPUs or CPUs one node has (optional; the default value is 1).
  • environment: The Python environment definition. You can configure it to use an existing Python environment or to set up a temporary environment. The definition is also responsible for setting the required application dependencies (optional).
  • logging_level: Log verbosity. Values in increasing verbosity are WARNING, INFO, and DEBUG. (optional; the default value is INFO)
  • run_invocation_timeout: The run() method invocation timeout, in seconds. (optional; the default value is 60)
  • run_max_try: The maximum number of tries of run() for a mini-batch. A run() fails if an exception is thrown, or if nothing is returned when run_invocation_timeout is reached (optional; the default value is 3).
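As a rough illustration of that boundary rule, here is a sketch (ours, not SDK code; actual splitting works on records, not raw bytes) of how file sizes map to mini-batch counts when mini_batch_size is 1 MB:

```python
import math

ONE_MB = 1024 * 1024

def mini_batch_count(file_size_bytes, mini_batch_size=ONE_MB):
    """A TabularDataset mini-batch never crosses file boundaries, so each file
    contributes at least one mini-batch; larger files are split into several."""
    return max(1, math.ceil(file_size_bytes / mini_batch_size))

print(mini_batch_count(100 * 1024))   # a 100 KB file -> 1 mini-batch
print(mini_batch_count(10 * ONE_MB))  # a 10 MB file  -> 10 mini-batches
```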
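The threshold semantics can be pictured with a small sketch (our illustration of the documented behavior, not SDK internals):

```python
def should_abort(failed_items, error_threshold):
    """Abort when the cumulative failure count across the whole input
    goes above the threshold; -1 means ignore all failures."""
    if error_threshold == -1:
        return False
    return failed_items > error_threshold

print(should_abort(5, 10))    # False: still within the threshold
print(should_abort(11, 10))   # True: the job would be aborted
print(should_abort(999, -1))  # False: all failures are ignored
```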

You can specify mini_batch_size, node_count, process_count_per_node, logging_level, run_invocation_timeout, and run_max_try as PipelineParameter so that, when you resubmit a pipeline run, you can fine-tune the parameter values. In this example, you use PipelineParameter for mini_batch_size and process_count_per_node, and you change these values when you resubmit a run later.
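For example, the two values can be declared as PipelineParameter objects and wired into ParallelRunConfig. This is a configuration sketch: scripts_folder, batch_env, and compute_target are placeholders for objects you created earlier.

```python
from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.steps import ParallelRunConfig

batch_size_param = PipelineParameter(name="mini_batch_size", default_value="5")
process_count_param = PipelineParameter(name="process_count_per_node", default_value=2)

parallel_run_config = ParallelRunConfig(
    source_directory=scripts_folder,          # placeholder
    entry_script="digit_identification.py",
    mini_batch_size=batch_size_param,         # tunable at resubmission
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,                    # placeholder
    compute_target=compute_target,            # placeholder
    process_count_per_node=process_count_param,
    node_count=2)
```

When you resubmit, you can override the values through the pipeline_parameters argument of Experiment.submit, for example pipeline_parameters={"mini_batch_size": "2", "process_count_per_node": 3}.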

Parameters for creating the ParallelRunStep

Create the ParallelRunStep by using the script, environment configuration, and parameters. Specify the compute target that you already attached to your workspace as the target of execution for your inference script. Use ParallelRunStep to create the batch inference pipeline step, which takes all of the following parameters:

  • name: The name of the step, with the following naming restrictions: unique, 3-32 characters, and regex ^[a-z]([-a-z0-9]*[a-z0-9])?$.
  • parallel_run_config: A ParallelRunConfig object, as defined earlier.
  • inputs: One or more single-typed Azure Machine Learning datasets to be partitioned for parallel processing.
  • side_inputs: One or more reference data or datasets used as side inputs, without needing to be partitioned.
  • output: A PipelineData object that corresponds to the output directory.
  • arguments: A list of arguments passed to the user script. Use unknown_args to retrieve them in your entry script (optional).
  • allow_reuse: Whether the step should reuse previous results when run with the same settings/inputs. If this parameter is False, a new run is always generated for this step during pipeline execution. (optional; the default value is True.)
from azureml.pipeline.steps import ParallelRunStep

# The dataset, output, and config names below are placeholders;
# substitute the objects you created earlier.
parallelrun_step = ParallelRunStep(
    name="predict-digits-mnist",
    parallel_run_config=parallel_run_config,
    inputs=[input_dataset],
    output=output_dir,
    allow_reuse=True
)
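The naming restrictions on name can be checked locally before submission. A minimal sketch (the helper function is ours, not part of the SDK):

```python
import re

# Regex from the documented naming restriction for step names.
_NAME_RE = re.compile(r"^[a-z]([-a-z0-9]*[a-z0-9])?$")

def is_valid_step_name(name):
    """Check ParallelRunStep name rules: 3-32 characters matching the regex."""
    return 3 <= len(name) <= 32 and _NAME_RE.fullmatch(name) is not None

print(is_valid_step_name("predict-digits"))  # True
print(is_valid_step_name("Predict"))         # False: uppercase not allowed
print(is_valid_step_name("ab"))              # False: too short
```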

Debugging scripts from remote context

The transition from debugging a scoring script locally to debugging a scoring script in an actual pipeline can be a difficult leap. For information on finding your logs in the portal, see the machine learning pipelines section on debugging scripts from a remote context. The information in that section also applies to a ParallelRunStep.

For example, the log file 70_driver_log.txt contains information from the controller that launches the ParallelRunStep code.

Because of the distributed nature of ParallelRunStep jobs, there are logs from several different sources. However, two consolidated files are created that provide high-level information:

  • ~/logs/job_progress_overview.txt: This file provides high-level information about the number of mini-batches (also known as tasks) created so far and the number of mini-batches processed so far. At the end, it shows the result of the job. If the job failed, it shows the error message and where to start troubleshooting.

  • ~/logs/sys/master_role.txt: This file provides the principal node (also known as the orchestrator) view of the running job. It includes task creation, progress monitoring, and the run result.

Logs generated from the entry script, using the EntryScript helper and print statements, will be found in the following files:

  • ~/logs/user/entry_script_log/<ip_address>/<process_name>.log.txt: These files are the logs written from entry_script using the EntryScript helper.

  • ~/logs/user/stdout/<ip_address>/<process_name>.stdout.txt: These files are the logs from stdout (for example, print statements) of entry_script.

  • ~/logs/user/stderr/<ip_address>/<process_name>.stderr.txt: These files are the logs from stderr of entry_script.

For a concise understanding of errors in your script, there is:

  • ~/logs/user/error.txt: This file tries to summarize the errors in your script.

For more information on errors in your script, there is:

  • ~/logs/user/error/: Contains the full stack traces of exceptions thrown while loading and running the entry script.

When you need a full understanding of how each node executed the score script, look at the individual process logs for each node. The process logs can be found in the sys/node folder, grouped by worker nodes:

  • ~/logs/sys/node/<ip_address>/<process_name>.txt: This file provides detailed info about each mini-batch as it's picked up or completed by a worker. For each mini-batch, this file includes:

    • The IP address and the PID of the worker process.
    • The total number of items, the count of successfully processed items, and the count of failed items.
    • The start time, duration, process time, and run method time.

You can also find information on the resource usage of the processes for each worker. This information is in CSV format and is located at ~/logs/sys/perf/<ip_address>/node_resource_usage.csv. Information about each process is available under ~/logs/sys/perf/<ip_address>/processes_resource_usage.csv.

How do I log from my user script from a remote context?

ParallelRunStep may run multiple processes on one node, based on process_count_per_node. To organize the logs from each process on a node and combine print and log statements, we recommend using the ParallelRunStep logger as shown below. You get a logger from EntryScript, which makes the logs show up in the logs/user folder in the portal.

A sample entry script using the logger:

from azureml_user.parallel_run import EntryScript

def init():
    """ Initialize the node."""
    entry_script = EntryScript()
    logger = entry_script.logger
    logger.debug("This will show up in files under logs/user on the Azure portal.")

def run(mini_batch):
    """ Accept and return the list back."""
    # This class is in singleton pattern and will return same instance as the one in init()
    entry_script = EntryScript()
    logger = entry_script.logger
    logger.debug(f"{__file__}: {mini_batch}.")

    return mini_batch

How could I pass a side input, such as a file or files containing a lookup table, to all my workers?

You can pass reference data to the script by using the side_inputs parameter of ParallelRunStep. All datasets provided as side_inputs will be mounted on each worker node. You can get the location of the mount by passing an argument.

Construct a Dataset containing the reference data and register it with your workspace. Pass it to the side_inputs parameter of your ParallelRunStep. Additionally, you can add its path in the arguments section to easily access its mounted path:

label_config = label_ds.as_named_input("labels_input")

# The name, inputs, output, and config values below are placeholders;
# substitute the objects you created earlier.
batch_score_step = ParallelRunStep(
    name="batch-score-with-side-input",
    arguments=["--labels_dir", label_config],
    side_inputs=[label_config],
    inputs=[input_dataset],
    output=output_dir,
    parallel_run_config=parallel_run_config
)

After that, you can access it in your inference script (for example, in your init() method) as follows:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--labels_dir', dest="labels_dir", required=True)
args, _ = parser.parse_known_args()

labels_path = args.labels_dir
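From there, the mounted folder can be read like any local directory. A minimal sketch of loading a lookup table from it (the load_lookup helper and the two-column CSV layout are our assumptions, not part of the SDK):

```python
import csv
import os

def load_lookup(labels_dir, file_name="labels.csv"):
    """Build an in-memory lookup table from a two-column CSV
    located in the mounted side-input folder."""
    lookup = {}
    with open(os.path.join(labels_dir, file_name), newline="") as f:
        for key, value in csv.reader(f):
            lookup[key] = value
    return lookup
```

Call it once in init() and keep the result in a global, so each run() call can consult the table without re-reading the file.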

How do I use input datasets with service principal authentication?

You can pass input datasets with the service principal authentication used in the workspace. Using such a dataset in ParallelRunStep requires the dataset to be registered before you construct the ParallelRunStep configuration.

from azureml.core import Dataset, Datastore, Workspace
from azureml.core.authentication import ServicePrincipalAuthentication

# Angle-bracket values are placeholders; substitute your own identifiers.
service_principal = ServicePrincipalAuthentication(
    tenant_id="<tenant-id>",
    service_principal_id="<client-id>",
    service_principal_password="<client-secret>")

ws = Workspace(
    subscription_id="<subscription-id>",
    resource_group="<resource-group>",
    workspace_name="<workspace-name>",
    auth=service_principal)

default_blob_store = ws.get_default_datastore()  # or Datastore(ws, '<datastore-name>')
ds = Dataset.File.from_files((default_blob_store, '<path>'))
registered_ds = ds.register(ws, '<dataset-name>', create_new_version=True)

Next steps

  • See these Jupyter notebooks demonstrating Azure Machine Learning pipelines.

  • See the SDK reference for help with the azureml-pipeline-steps package. View the reference documentation for the ParallelRunStep class.

  • Follow the advanced tutorial on using pipelines with ParallelRunStep. The tutorial shows how to pass another file as a side input.