教程:生成用于批量评分的 Azure 机器学习管道Tutorial: Build an Azure Machine Learning pipeline for batch scoring

适用于:是基本版是企业版               (升级到企业版APPLIES TO: yesBasic edition yesEnterprise edition                    (Upgrade to Enterprise edition)

此高级教程介绍如何在 Azure 机器学习中构建管道来运行批量评分作业。In this advanced tutorial, you learn how to build a pipeline in Azure Machine Learning to run a batch scoring job. 机器学习管道可以优化工作流以提高其速度、可移植性和可重用性,使你能够将工作重心放在机器学习上,而不必关注基础结构和自动化。Machine learning pipelines optimize your workflow with speed, portability, and reuse, so you can focus on machine learning instead of infrastructure and automation. 生成并发布管道后,你将配置一个 REST 终结点,用于从任何平台上的任何 HTTP 库触发该管道。After you build and publish a pipeline, you configure a REST endpoint that you can use to trigger the pipeline from any HTTP library on any platform.

本示例使用 Tensorflow 中实现的预先训练的 Inception-V3 卷积神经网络模型来对不带标签的图像进行分类。The example uses a pretrained Inception-V3 convolutional neural network model implemented in Tensorflow to classify unlabeled images. 详细了解机器学习管道Learn more about machine learning pipelines.

在本教程中,请完成以下任务:In this tutorial, you complete the following tasks:

  • 配置工作区Configure workspace
  • 下载并存储示例数据Download and store sample data
  • 创建用于提取和输出数据的数据集对象Create dataset objects to fetch and output data
  • 下载、准备模型并将其注册到工作区中Download, prepare, and register the model in your workspace
  • 预配计算目标并创建评分脚本Provision compute targets and create a scoring script
  • 使用 ParallelRunStep 类进行异步批处理评分Use the ParallelRunStep class for async batch scoring
  • 生成、运行并发布管道Build, run, and publish a pipeline
  • 为管道启用 REST 终结点Enable a REST endpoint for the pipeline

如果没有 Azure 订阅,请在开始前创建一个试用帐户。If you don’t have an Azure subscription, create a trial account before you begin. 立即试用试用帐户功能。Try the trial account today.


  • 如果你没有 Azure 机器学习工作区或笔记本虚拟机,请完成设置教程的第 1 部分If you don't already have an Azure Machine Learning workspace or notebook virtual machine, complete Part 1 of the setup tutorial.
  • 完成设置教程后,使用同一笔记本服务器打开 tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.ipynb 笔记本。When you finish the setup tutorial, use the same notebook server to open the tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.ipynb notebook.

如果要在自己的本地环境中运行设置教程,可以访问 GitHub 上的教程。If you want to run the setup tutorial in your own local environment, you can access the tutorial on GitHub. 运行 pip install azureml-sdk[notebooks] azureml-pipeline-core azureml-pipeline-steps pandas requests 以获取所需的包。Run pip install azureml-sdk[notebooks] azureml-pipeline-core azureml-pipeline-steps pandas requests to get the required packages.

配置工作区并创建数据存储Configure workspace and create a datastore

从现有的 Azure 机器学习工作区创建工作区对象。Create a workspace object from the existing Azure Machine Learning workspace.

from azureml.core import Workspace
ws = Workspace.from_config()


此代码片段需要将工作区配置保存到当前目录或其父目录中。This code snippet expects the workspace configuration to be saved in the current directory or its parent. 有关创建工作区的详细信息,请参阅创建和管理 Azure 机器学习工作区For more information on creating a workspace, see Create and manage Azure Machine Learning workspaces. 有关将配置保存到文件的详细信息,请参阅创建工作区配置文件For more information on saving the configuration to file, see Create a workspace configuration file.

为示例图像创建数据存储Create a datastore for sample images

pipelinedata 帐户中,从 sampledata 公共 Blob 容器获取 ImageNet 评估公共数据示例。On the pipelinedata account, get the ImageNet evaluation public data sample from the sampledata public blob container. 调用 register_azure_blob_container() 可使数据可用于名为 images_datastore 的工作区。Call register_azure_blob_container() to make the data available to the workspace under the name images_datastore. 然后,将工作区的默认数据存储设置为输出数据存储。Then, set the workspace default datastore as the output datastore. 使用输出数据存储在管道中为输出评分。Use the output datastore to score output in the pipeline.

有关访问数据的详细信息,请参阅如何访问数据For more information on accessing data, see How to access data.

from azureml.core.datastore import Datastore

batchscore_blob = Datastore.register_azure_blob_container(ws, 

def_data_store = ws.get_default_datastore()

创建数据集对象Create dataset objects

生成管道时,将使用 Dataset 对象从工作区数据存储读取数据,并使用 PipelineData 对象在管道步骤之间传输中间数据。When building pipelines, Dataset objects are used for reading data from workspace datastores, and PipelineData objects are used for transferring intermediate data between pipeline steps.


本教程中的批量评分示例只使用一个管道步骤。The batch scoring example in this tutorial uses only one pipeline step. 在包含多个步骤的用例中,典型流包括以下步骤:In use cases that have multiple steps, the typical flow will include these steps:

  1. 使用 Dataset 对象作为提取原始数据的输入,执行某种转换,然后输出 PipelineData 对象。 Use Dataset objects as inputs to fetch raw data, perform some transformation, and then output a PipelineData object.

  2. 使用上一步骤中的 PipelineData 输出对象作为输入对象 。Use the PipelineData output object in the preceding step as an input object. 针对后续步骤重复此过程。Repeat it for subsequent steps.

在此场景中,你将创建与输入图像和分类标签(y-test 值)的数据存储目录相对应的 Dataset 对象。In this scenario, you create Dataset objects that correspond to the datastore directories for both the input images and the classification labels (y-test values). 此外,将为批量评分输出数据创建一个 PipelineData 对象。You also create a PipelineData object for the batch scoring output data.

from azureml.core.dataset import Dataset
from azureml.pipeline.core import PipelineData

input_images = Dataset.File.from_files((batchscore_blob, "batchscoring/images/"))
label_ds = Dataset.File.from_files((batchscore_blob, "batchscoring/labels/"))
output_dir = PipelineData(name="scores", 

接下来,将数据集注册到工作区。Next, register the datasets to the workspace.

input_images = input_images.register(workspace = ws, name = "input_images")
label_ds = label_ds.register(workspace = ws, name = "label_ds")

下载并注册模型Download and register the model

下载预先训练的 Tensorflow 模型用于管道中的批量评分。Download the pretrained Tensorflow model to use it for batch scoring in a pipeline. 首先创建一个用于存储模型的本地目录。First, create a local directory where you store the model. 然后下载并提取该模型。Then, download and extract the model.

import os
import tarfile
import urllib.request

if not os.path.isdir("models"):
response = urllib.request.urlretrieve("http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz", "model.tar.gz")
tar = tarfile.open("model.tar.gz", "r:gz")

接下来,将模型注册到工作区,以便能够在管道流程中轻松检索该模型。Next, register the model to your workspace, so you can easily retrieve the model in the pipeline process. register() 静态函数中,model_name 参数是用于在整个 SDK 中查找模型的键。In the register() static function, the model_name parameter is the key you use to locate your model throughout the SDK.

from azureml.core.model import Model
model = Model.register(model_path="models/inception_v3.ckpt",
                       tags={"pretrained": "inception"},
                       description="Imagenet trained tensorflow inception",

创建并附加远程计算目标Create and attach the remote compute target

机器学习管道无法在本地运行,因此你需要在云资源或远程计算目标上运行这些管道。Machine learning pipelines can't be run locally, so you run them on cloud resources or remote compute targets. 远程计算目标是可重用的虚拟计算环境,可在其中运行试验和机器学习工作流。A remote compute target is a reusable virtual compute environment where you run experiments and machine learning workflows.

运行以下代码创建支持 GPU 的 AmlCompute 目标,并将其附加到工作区。Run the following code to create a GPU-enabled AmlCompute target, and then attach it to your workspace. 有关计算目标的详细信息,请参阅概念文章For more information about compute targets, see the conceptual article.

from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.exceptions import ComputeTargetException
compute_name = "gpu-cluster"

# checks to see if compute target already exists in workspace, else create it
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
    config = AmlCompute.provisioning_configuration(vm_size="STANDARD_NC6",

    compute_target = ComputeTarget.create(workspace=ws, name=compute_name, provisioning_configuration=config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

编写评分脚本Write a scoring script

若要执行评分,请创建名为 batch_scoring.py 的批量评分脚本,并将其写入当前目录。To do the scoring, create a batch scoring script called batch_scoring.py, and then write it to the current directory. 该脚本将提取输入图像,应用分类模型,然后将预测结果输出到结果文件中。The script takes input images, applies the classification model, and then outputs the predictions to a results file.

batch_scoring.py 脚本采用以下参数,这些参数将从稍后创建的 ParallelRunStep 传递:The batch_scoring.py script takes the following parameters, which get passed from the ParallelRunStep you create later:

  • --model_name:所用模型的名称。--model_name: The name of the model being used.
  • --labels_dirlabels.txt 文件的位置。--labels_dir: The location of the labels.txt file.

管道基础结构使用 ArgumentParser 类将参数传入管道步骤。The pipeline infrastructure uses the ArgumentParser class to pass parameters into pipeline steps. 例如,在以下代码中,为第一个参数 --model_name 指定了属性标识符 model_nameFor example, in the following code, the first argument --model_name is given the property identifier model_name. init() 函数中,使用 Model.get_model_path(args.model_name) 访问此属性。In the init() function, Model.get_model_path(args.model_name) is used to access this property.

%%writefile batch_scoring.py

import os
import argparse
import datetime
import time
import tensorflow as tf
from math import ceil
import numpy as np
import shutil
from tensorflow.contrib.slim.python.slim.nets import inception_v3

from azureml.core import Run
from azureml.core.model import Model
from azureml.core.dataset import Dataset

slim = tf.contrib.slim

image_size = 299
num_channel = 3

def get_class_label_dict(labels_dir):
    label = []
    labels_path = os.path.join(labels_dir, 'labels.txt')
    proto_as_ascii_lines = tf.gfile.GFile(labels_path).readlines()
    for l in proto_as_ascii_lines:
    return label

def init():
    global g_tf_sess, probabilities, label_dict, input_images

    parser = argparse.ArgumentParser(description="Start a tensorflow model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    parser.add_argument('--labels_dir', dest="labels_dir", required=True)
    args, _ = parser.parse_known_args()

    label_dict = get_class_label_dict(args.labels_dir)
    classes_num = len(label_dict)

    with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
        input_images = tf.placeholder(tf.float32, [1, image_size, image_size, num_channel])
        logits, _ = inception_v3.inception_v3(input_images,
        probabilities = tf.argmax(logits, 1)

    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    g_tf_sess = tf.Session(config=config)

    model_path = Model.get_model_path(args.model_name)
    saver = tf.train.Saver()
    saver.restore(g_tf_sess, model_path)

def file_to_tensor(file_path):
    image_string = tf.read_file(file_path)
    image = tf.image.decode_image(image_string, channels=3)

    image.set_shape([None, None, None])
    image = tf.image.resize_images(image, [image_size, image_size])
    image = tf.divide(tf.subtract(image, [0]), [255])
    image.set_shape([image_size, image_size, num_channel])
    return image

def run(mini_batch):
    result_list = []
    for file_path in mini_batch:
        test_image = file_to_tensor(file_path)
        out = g_tf_sess.run(test_image)
        result = g_tf_sess.run(probabilities, feed_dict={input_images: [out]})
        result_list.append(os.path.basename(file_path) + ": " + label_dict[result[0]])
    return result_list


本教程中的管道只有一个步骤,它会将输出写入某个文件。The pipeline in this tutorial has only one step, and it writes the output to a file. 对于多步骤管道,你也可以使用 ArgumentParser 来定义要将输出数据写入到的目录,以便将其输入到后续步骤。For multi-step pipelines, you also use ArgumentParser to define a directory to write output data for input to subsequent steps. 有关使用 ArgumentParser 设计模式在多个管道步骤之间传递数据的示例,请参阅笔记本For an example of passing data between multiple pipeline steps by using the ArgumentParser design pattern, see the notebook.

构建管道Build the pipeline

在运行管道之前,请创建一个用于定义 Python 环境的对象,并创建 batch_scoring.py 脚本所需的依赖项。Before you run the pipeline, create an object that defines the Python environment and creates the dependencies that your batch_scoring.py script requires. 所需的主要依赖项是 Tensorflow,但你还需安装 ParallelRunStep 所需的 azureml-coreazureml-dataprep[fuse]The main dependency required is Tensorflow, but you also install azureml-core and azureml-dataprep[fuse] which are required by ParallelRunStep. 另外,指定 Docker 和 Docker-GPU 支持。Also, specify Docker and Docker-GPU support.

from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_GPU_IMAGE

cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.15.2",
                                            "azureml-core", "azureml-dataprep[fuse]"])
env = Environment(name="parallelenv")
env.python.conda_dependencies = cd
env.docker.base_image = DEFAULT_GPU_IMAGE

创建用于包装脚本的配置Create the configuration to wrap the script

使用脚本、环境配置和参数创建管道步骤。Create the pipeline step using the script, environment configuration, and parameters. 指定已附加到工作区的计算目标。Specify the compute target you already attached to your workspace.

from azureml.pipeline.steps import ParallelRunConfig

parallel_run_config = ParallelRunConfig(

创建管道步骤Create the pipeline step

管道步骤是一个对象,用于封装运行管道所需的任何内容,其中包括:A pipeline step is an object that encapsulates everything you need to run a pipeline, including:

  • 环境和依赖项设置Environment and dependency settings
  • 要在其上运行管道的计算资源The compute resource to run the pipeline on
  • 输入和输出数据,以及任何自定义参数Input and output data, and any custom parameters
  • 对执行步骤期间要运行的脚本或 SDK 逻辑的引用Reference to a script or SDK logic to run during the step

有多个类继承自父类 PipelineStepMultiple classes inherit from the parent class PipelineStep. 你可以选择适当的类,以使用特定的框架和堆栈生成步骤。You can choose classes to use specific frameworks or stacks to build a step. 在此示例中,将通过自定义 Python 脚本使用 ParallelRunStep 类定义步骤逻辑。In this example, you use the ParallelRunStep class to define your step logic by using a custom Python script. 如果脚本的某个自变量是步骤的输入或步骤的输出,则必须分别在 arguments 数组以及 inputoutput 参数中定义该自变量。 If an argument to your script is either an input to the step or an output of the step, the argument must be defined both in the arguments array and in either the input or the output parameter, respectively.

如果存在多个步骤,outputs 数组中的某个对象引用可用作后续管道步骤的输入。In scenarios where there is more than one step, an object reference in the outputs array becomes available as an input for a subsequent pipeline step.

from azureml.pipeline.steps import ParallelRunStep
from datetime import datetime

parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")

label_config = label_ds.as_named_input("labels_input")

batch_score_step = ParallelRunStep(
    arguments=["--model_name", "inception",
               "--labels_dir", label_config],

有关可对不同步骤类型使用的所有类的列表,请参阅步骤包For a list of all the classes you can use for different step types, see the steps package.

提交管道Submit the pipeline

现在请运行管道。Now, run the pipeline. 首先,使用工作区引用和创建的管道步骤创建一个 Pipeline 对象。First, create a Pipeline object by using your workspace reference and the pipeline step you created. steps 参数是步骤数组。The steps parameter is an array of steps. 在本例中,批量评分只有一个步骤。In this case, there's only one step for batch scoring. 若要生成包含多个步骤的管道,请将步骤按顺序放入此数组。To build pipelines that have multiple steps, place the steps in order in this array.

接下来,使用 Experiment.submit() 函数提交管道以供执行。Next, use the Experiment.submit() function to submit the pipeline for execution. wait_for_completion 函数将在管道生成过程中输出日志。The wait_for_completion function outputs logs during the pipeline build process. 可以使用日志来查看当前进度。You can use the logs to see current progress.


首次管道运行需要大约 15 分钟。The first pipeline run takes roughly 15 minutes. 必须下载所有依赖项、创建 Docker 映像,并预配和创建 Python 环境。All dependencies must be downloaded, a Docker image is created, and the Python environment is provisioned and created. 再次运行管道所花费的时间会大幅减少,因为会重复使用这些资源,而无需再次创建。Running the pipeline again takes significantly less time because those resources are reused instead of created. 但是,管道的总运行时间取决于脚本的工作负荷,以及每个管道步骤中运行的进程数。However, total run time for the pipeline depends on the workload of your scripts and the processes that are running in each pipeline step.

from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline)

下载并查看输出Download and review output

运行以下代码下载通过 batch_scoring.py 脚本创建的输出文件。Run the following code to download the output file that's created from the batch_scoring.py script. 然后浏览评分结果。Then, explore the scoring results.

import pandas as pd

batch_run = next(pipeline_run.get_children())
batch_output = batch_run.get_output_data("scores")

for root, dirs, files in os.walk("inception_results"):
    for file in files:
        if file.endswith("parallel_run_step.txt"):
            result_file = os.path.join(root, file)

df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
print("Prediction has ", df.shape[0], " rows")

从 REST 终结点发布和运行Publish and run from a REST endpoint

运行以下代码,将管道发布到工作区。Run the following code to publish the pipeline to your workspace. 在 Azure 机器学习工作室的工作区中,可以看到管道的元数据,包括运行历史记录和持续时间。In your workspace in Azure Machine Learning studio, you can see metadata for the pipeline, including run history and durations. 还可以从工作室手动运行管道。You can also run the pipeline manually from the studio.

发布管道会启用一个 REST 终结点,用于从任何平台上的任何 HTTP 库重新运行该管道。Publishing the pipeline enables a REST endpoint that you can use to run the pipeline from any HTTP library on any platform.

published_pipeline = pipeline_run.publish_pipeline(
    name="Inception_v3_scoring", description="Batch scoring using Inception v3 model", version="1.0")


若要从 REST 终结点运行管道,需要获取 OAuth2 Bearer-type 身份验证标头。To run the pipeline from the REST endpoint, you need an OAuth2 Bearer-type authentication header. 以下示例使用交互式身份验证(用于演示目的),但对于大多数需要自动身份验证或无头身份验证的生产方案,请使用服务主体身份验证,如此文中所述The following example uses interactive authentication (for illustration purposes), but for most production scenarios that require automated or headless authentication, use service principal authentication as described in this article.

服务主体身份验证涉及到在 Azure Active Directory 中创建应用注册。Service principal authentication involves creating an App Registration in Azure Active Directory. 首先生成客户端机密,然后为服务主体授予对机器学习工作区的角色访问权限。First, you generate a client secret, and then you grant your service principal role access to your machine learning workspace. 使用 ServicePrincipalAuthentication 类来管理身份验证流。Use the ServicePrincipalAuthentication class to manage your authentication flow.

InteractiveLoginAuthenticationServicePrincipalAuthentication 均继承自 AbstractAuthenticationBoth InteractiveLoginAuthentication and ServicePrincipalAuthentication inherit from AbstractAuthentication. 在这两种情况下,请以相同的方式使用 get_authentication_header() 函数来提取标头:In both cases, use the get_authentication_header() function in the same way to fetch the header:

from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

从已发布的管道对象的 endpoint 属性获取 REST URL。Get the REST URL from the endpoint property of the published pipeline object. 也可以在 Azure 机器学习工作室的工作区中找到该 REST URL。You can also find the REST URL in your workspace in Azure Machine Learning studio.

对终结点生成 HTTP POST 请求。Build an HTTP POST request to the endpoint. 在请求中指定身份验证标头。Specify your authentication header in the request. 添加包含试验名称的 JSON 有效负载对象。Add a JSON payload object that has the experiment name.

发出触发运行的请求。Make the request to trigger the run. 包含相应的代码用于访问响应字典中的 Id 密钥以获取运行 ID 的值。Include code to access the Id key from the response dictionary to get the value of the run ID.

import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         json={"ExperimentName": "batch_scoring",
                               "ParameterAssignments": {"process_count_per_node": 6}})
run_id = response.json()["Id"]

使用运行 ID 监视新运行的状态。Use the run ID to monitor the status of the new run. 新的运行需要花费 10-15 分钟来完成。The new run takes another 10-15 min to finish.

新的运行类似于在本教程中前面运行的管道。The new run will look similar to the pipeline you ran earlier in the tutorial. 可以选择不查看完整输出。You can choose not to view the full output.

from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments["batch_scoring"], run_id)

清理资源Clean up resources

如果你打算运行其他 Azure 机器学习教程,请不要完成本部分。Don't complete this section if you plan to run other Azure Machine Learning tutorials.

停止计算实例Stop the compute instance

如果使用了计算实例或笔记本 VM,请停止未使用的 VM,以降低成本。If you used a compute instance or Notebook VM, stop the VM when you are not using it to reduce cost.

  1. 在工作区中选择“计算”。 In your workspace, select Compute.

  2. 从列表中选择 VM。From the list, select the VM.

  3. 选择“停止” 。Select Stop.

  4. 准备好再次使用服务器时,选择“启动” 。When you're ready to use the server again, select Start.

删除所有内容Delete everything

如果不打算使用已创建的资源,请删除它们,以免产生任何费用:If you don't plan to use the resources you created, delete them, so you don't incur any charges:

  1. 在 Azure 门户的左侧菜单中选择“资源组”。In the Azure portal, in the left menu, select Resource groups.
  2. 在资源组列表中,选择创建的资源组。In the list of resource groups, select the resource group you created.
  3. 选择“删除资源组”。Select Delete resource group.
  4. 输入资源组名称。Enter the resource group name. 然后选择“删除”。Then, select Delete.

还可保留资源组,但请删除单个工作区。You can also keep the resource group but delete a single workspace. 显示工作区属性,然后选择“删除”。Display the workspace properties, and then select Delete.

后续步骤Next steps

在本机器学习管道教程中,你已完成以下任务:In this machine learning pipelines tutorial, you did the following tasks:

  • 使用环境依赖项生成了一个要在远程 GPU 计算资源上运行的管道。Built a pipeline with environment dependencies to run on a remote GPU compute resource.
  • 使用预先训练的 Tensorflow 模型创建了一个用于运行批量预测的评分脚本。Created a scoring script to run batch predictions by using a pretrained Tensorflow model.
  • 发布了管道,并使其从 REST 终结点运行。Published a pipeline and enabled it to be run from a REST endpoint.

有关演示如何使用机器学习 SDK 生成管道的更多示例,请参阅笔记本存储库For more examples of how to build pipelines by using the machine learning SDK, see the notebook repository.