教程:生成用于批量评分的 Azure 机器学习管道

在本高级教程中,你将了解如何构建 Azure 机器学习管道来运行批量评分作业。 机器学习管道可以优化工作流以提高其速度、可移植性和可重用性,使你能够将工作重心放在机器学习上,而不必关注基础结构和自动化。 生成并发布管道后,你将配置一个 REST 终结点,用于从任何平台上的任何 HTTP 库触发该管道。

本示例使用 Tensorflow 中实现的预先训练的 Inception-V3 卷积神经网络模型来对不带标签的图像进行分类。

在本教程中,请完成以下任务:

  • 配置工作区
  • 下载并存储示例数据
  • 创建用于提取和输出数据的数据集对象
  • 下载、准备模型并将其注册到工作区中
  • 预配计算目标并创建评分脚本
  • 使用 ParallelRunStep 类进行异步批处理评分
  • 生成、运行并发布管道
  • 为管道启用 REST 终结点

如果没有 Azure 订阅,请在开始前创建一个试用帐户。 立即试用试用帐户功能。

先决条件

  • 如果还没有 Azure 机器学习工作区或计算实例,请完成快速入门:Azure 机器学习入门
  • 完成快速入门后:
    1. 在工作室中选择“笔记本”。
    2. 选择“示例”选项卡。
    3. 打开 tutorials/machine-learning-pipelines-advanced/tutorial-pipeline-batch-scoring-classification.ipynb 笔记本。

如果要在自己的本地环境中运行设置教程,可以访问 GitHub 上的教程。 运行 pip install azureml-sdk[notebooks] azureml-pipeline-core azureml-pipeline-steps pandas requests 以获取所需的包。

配置工作区并创建数据存储

从现有的 Azure 机器学习工作区创建工作区对象。

from azureml.core import Workspace
ws = Workspace.from_config()

重要

此代码片段需要将工作区配置保存到当前目录或其父目录中。 有关创建工作区的详细信息,请参阅创建和管理 Azure 机器学习工作区。 有关将配置保存到文件的详细信息,请参阅创建工作区配置文件

为示例图像创建数据存储

pipelinedata 帐户中,从 sampledata 公共 Blob 容器获取 ImageNet 评估公共数据示例。 调用 register_azure_blob_container() 可使数据可用于名为 images_datastore 的工作区。 然后,将工作区的默认数据存储设置为输出数据存储。 使用输出数据存储在管道中为输出评分。

有关访问数据的详细信息,请参阅如何访问数据

from azureml.core.datastore import Datastore

batchscore_blob = Datastore.register_azure_blob_container(ws, 
                      datastore_name="images_datastore", 
                      container_name="sampledata", 
                      account_name="pipelinedata", 
                      overwrite=True)

def_data_store = ws.get_default_datastore()

创建数据集对象

生成管道时,将使用 Dataset 对象从工作区数据存储读取数据,并使用 OutputFileDatasetConfig 对象在管道步骤之间传输中间数据。

重要

本教程中的批量评分示例只使用一个管道步骤。 在包含多个步骤的用例中,典型流包括以下步骤:

  1. 使用 Dataset 对象作为提取原始数据的输入,执行某种转换,然后输出 OutputFileDatasetConfig 对象 。

  2. 使用上一步骤中的 OutputFileDatasetConfig 输出对象作为输入对象。 针对后续步骤重复此过程。

在此场景中,你将创建与输入图像和分类标签(y-test 值)的数据存储目录相对应的 Dataset 对象。 还将为批量评分输出数据创建一个 OutputFileDatasetConfig 对象。

from azureml.core.dataset import Dataset
from azureml.data import OutputFileDatasetConfig

input_images = Dataset.File.from_files((batchscore_blob, "batchscoring/images/"))
label_ds = Dataset.File.from_files((batchscore_blob, "batchscoring/labels/"))
output_dir = OutputFileDatasetConfig(name="scores")

如果以后要重用数据集,请将其注册到工作区。 此步骤是可选的。


input_images = input_images.register(workspace = ws, name = "input_images")
label_ds = label_ds.register(workspace = ws, name = "label_ds")

下载并注册模型

下载预先训练的 Tensorflow 模型用于管道中的批量评分。 首先创建一个用于存储模型的本地目录。 然后下载并提取该模型。

import os
import tarfile
import urllib.request

if not os.path.isdir("models"):
    os.mkdir("models")
    
response = urllib.request.urlretrieve("http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz", "model.tar.gz")
tar = tarfile.open("model.tar.gz", "r:gz")
tar.extractall("models")

接下来,将模型注册到工作区,以便能够在管道流程中轻松检索该模型。 在 register() 静态函数中,model_name 参数是用于在整个 SDK 中查找模型的键。

from azureml.core.model import Model
 
model = Model.register(model_path="models/inception_v3.ckpt",
                       model_name="inception",
                       tags={"pretrained": "inception"},
                       description="Imagenet trained tensorflow inception",
                       workspace=ws)

创建并附加远程计算目标

机器学习管道无法在本地运行,因此你需要在云资源或远程计算目标上运行这些管道。 远程计算目标是可重用的虚拟计算环境,可在其中运行试验和机器学习工作流。

运行以下代码创建支持 GPU 的 AmlCompute 目标,并将其附加到工作区。 有关计算目标的详细信息,请参阅概念文章

from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.exceptions import ComputeTargetException
compute_name = "gpu-cluster"

# checks to see if compute target already exists in workspace, else create it
try:
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
    config = AmlCompute.provisioning_configuration(vm_size="STANDARD_NC6",
                                                   vm_priority="lowpriority", 
                                                   min_nodes=0, 
                                                   max_nodes=1)

    compute_target = ComputeTarget.create(workspace=ws, name=compute_name, provisioning_configuration=config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

编写评分脚本

若要执行评分,请创建名为 batch_scoring.py 的批量评分脚本,并将其写入当前目录。 该脚本将提取输入图像,应用分类模型,然后将预测结果输出到结果文件中。

batch_scoring.py 脚本采用以下参数,这些参数将从稍后创建的 ParallelRunStep 传递:

  • --model_name:所用模型的名称。
  • --labels_dirlabels.txt 文件的位置。

管道基础结构使用 ArgumentParser 类将参数传入管道步骤。 例如,在以下代码中,为第一个参数 --model_name 指定了属性标识符 model_name。 在 init() 函数中,使用 Model.get_model_path(args.model_name) 访问此属性。

%%writefile batch_scoring.py

import os
import argparse
import datetime
import time
import tensorflow as tf
from math import ceil
import numpy as np
import shutil
from tensorflow.contrib.slim.python.slim.nets import inception_v3

from azureml.core import Run
from azureml.core.model import Model
from azureml.core.dataset import Dataset

slim = tf.contrib.slim

image_size = 299
num_channel = 3


def get_class_label_dict(labels_dir):
    label = []
    labels_path = os.path.join(labels_dir, 'labels.txt')
    proto_as_ascii_lines = tf.gfile.GFile(labels_path).readlines()
    for l in proto_as_ascii_lines:
        label.append(l.rstrip())
    return label


def init():
    global g_tf_sess, probabilities, label_dict, input_images

    parser = argparse.ArgumentParser(description="Start a tensorflow model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    parser.add_argument('--labels_dir', dest="labels_dir", required=True)
    args, _ = parser.parse_known_args()

    label_dict = get_class_label_dict(args.labels_dir)
    classes_num = len(label_dict)

    with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
        input_images = tf.placeholder(tf.float32, [1, image_size, image_size, num_channel])
        logits, _ = inception_v3.inception_v3(input_images,
                                              num_classes=classes_num,
                                              is_training=False)
        probabilities = tf.argmax(logits, 1)

    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    g_tf_sess = tf.Session(config=config)
    g_tf_sess.run(tf.global_variables_initializer())
    g_tf_sess.run(tf.local_variables_initializer())

    model_path = Model.get_model_path(args.model_name)
    saver = tf.train.Saver()
    saver.restore(g_tf_sess, model_path)


def file_to_tensor(file_path):
    image_string = tf.read_file(file_path)
    image = tf.image.decode_image(image_string, channels=3)

    image.set_shape([None, None, None])
    image = tf.image.resize_images(image, [image_size, image_size])
    image = tf.divide(tf.subtract(image, [0]), [255])
    image.set_shape([image_size, image_size, num_channel])
    return image


def run(mini_batch):
    result_list = []
    for file_path in mini_batch:
        test_image = file_to_tensor(file_path)
        out = g_tf_sess.run(test_image)
        result = g_tf_sess.run(probabilities, feed_dict={input_images: [out]})
        result_list.append(os.path.basename(file_path) + ": " + label_dict[result[0]])
    return result_list

提示

本教程中的管道只有一个步骤,它会将输出写入某个文件。 对于多步骤管道,你也可以使用 ArgumentParser 来定义要将输出数据写入到的目录,以便将其输入到后续步骤。 有关使用 ArgumentParser 设计模式在多个管道步骤之间传递数据的示例,请参阅笔记本

构建管道

在运行管道之前,请创建一个用于定义 Python 环境的对象,并创建 batch_scoring.py 脚本所需的依赖项。 所需的主要依赖项是 Tensorflow,但你还需安装 ParallelRunStep 所需的 azureml-coreazureml-dataprep[fuse]。 另外,指定 Docker 和 Docker-GPU 支持。

from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_GPU_IMAGE

cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.15.2",
                                            "azureml-core", "azureml-dataprep[fuse]"])
env = Environment(name="parallelenv")
env.python.conda_dependencies = cd
env.docker.base_image = DEFAULT_GPU_IMAGE

创建用于包装脚本的配置

使用脚本、环境配置和参数创建管道步骤。 指定已附加到工作区的计算目标。

from azureml.pipeline.steps import ParallelRunConfig

parallel_run_config = ParallelRunConfig(
    environment=env,
    entry_script="batch_scoring.py",
    source_directory=".",
    output_action="append_row",
    mini_batch_size="20",
    error_threshold=1,
    compute_target=compute_target,
    process_count_per_node=2,
    node_count=1
)

创建管道步骤

管道步骤是一个对象,用于封装运行管道所需的任何内容,其中包括:

  • 环境和依赖项设置
  • 要在其上运行管道的计算资源
  • 输入和输出数据,以及任何自定义参数
  • 对执行步骤期间要运行的脚本或 SDK 逻辑的引用

有多个类继承自父类 PipelineStep。 你可以选择适当的类,以使用特定的框架和堆栈生成步骤。 在此示例中,将通过自定义 Python 脚本使用 ParallelRunStep 类定义步骤逻辑。 如果脚本的某个自变量是步骤的输入或步骤的输出,则必须分别在 arguments 数组以及 inputoutput 参数中定义该自变量。

如果存在多个步骤,outputs 数组中的某个对象引用可用作后续管道步骤的输入。

from azureml.pipeline.steps import ParallelRunStep
from datetime import datetime

parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")

label_config = label_ds.as_named_input("labels_input")

batch_score_step = ParallelRunStep(
    name=parallel_step_name,
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    arguments=["--model_name", "inception",
               "--labels_dir", label_config],
    side_inputs=[label_config],
    parallel_run_config=parallel_run_config,
    allow_reuse=False
)

有关可对不同步骤类型使用的所有类的列表,请参阅步骤包

提交管道

现在请运行管道。 首先,使用工作区引用和创建的管道步骤创建一个 Pipeline 对象。 steps 参数是步骤数组。 在本例中,批量评分只有一个步骤。 若要生成包含多个步骤的管道,请将步骤按顺序放入此数组。

接下来,使用 Experiment.submit() 函数提交管道以供执行。 wait_for_completion 函数将在管道生成过程中输出日志。 可以使用日志来查看当前进度。

重要

首次管道运行需要大约 15 分钟。 必须下载所有依赖项、创建 Docker 映像,并预配和创建 Python 环境。 再次运行管道所花费的时间会大幅减少,因为会重复使用这些资源,而无需再次创建。 但是,管道的总运行时间取决于脚本的工作负荷,以及每个管道步骤中运行的进程数。

from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'Tutorial-Batch-Scoring').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

下载并查看输出

运行以下代码下载通过 batch_scoring.py 脚本创建的输出文件。 然后浏览评分结果。

import pandas as pd

batch_run = next(pipeline_run.get_children())
batch_output = batch_run.get_output_data("scores")
batch_output.download(local_path="inception_results")

for root, dirs, files in os.walk("inception_results"):
    for file in files:
        if file.endswith("parallel_run_step.txt"):
            result_file = os.path.join(root, file)

df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
print("Prediction has ", df.shape[0], " rows")
df.head(10)

从 REST 终结点发布和运行

运行以下代码,将管道发布到工作区。 在 Azure 机器学习工作室的工作区中,可以看到管道的元数据,包括运行历史记录和持续时间。 还可以从工作室手动运行管道。

发布管道会启用一个 REST 终结点,用于从任何平台上的任何 HTTP 库重新运行该管道。

published_pipeline = pipeline_run.publish_pipeline(
    name="Inception_v3_scoring", description="Batch scoring using Inception v3 model", version="1.0")

published_pipeline

若要从 REST 终结点运行管道,需要获取 OAuth2 Bearer-type 身份验证标头。 以下示例使用交互式身份验证(用于演示目的),但对于大多数需要自动身份验证或无头身份验证的生产方案,请使用服务主体身份验证,如此文中所述

服务主体身份验证涉及到在 Azure Active Directory 中创建应用注册。 首先生成客户端机密,然后为服务主体授予对机器学习工作区的角色访问权限。 使用 ServicePrincipalAuthentication 类来管理身份验证流。

InteractiveLoginAuthenticationServicePrincipalAuthentication 均继承自 AbstractAuthentication。 在这两种情况下,请以相同的方式使用 get_authentication_header() 函数来提取标头:

from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

从已发布的管道对象的 endpoint 属性获取 REST URL。 也可以在 Azure 机器学习工作室的工作区中找到该 REST URL。

对终结点生成 HTTP POST 请求。 在请求中指定身份验证标头。 添加包含试验名称的 JSON 有效负载对象。

发出触发运行的请求。 包含相应的代码用于访问响应字典中的 Id 密钥以获取运行 ID 的值。

import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "Tutorial-Batch-Scoring",
                               "ParameterAssignments": {"process_count_per_node": 6}})
run_id = response.json()["Id"]

使用运行 ID 监视新运行的状态。 新的运行需要花费 10-15 分钟来完成。

新的运行类似于在本教程中前面运行的管道。 可以选择不查看完整输出。

from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments["Tutorial-Batch-Scoring"], run_id)
RunDetails(published_pipeline_run).show()

清理资源

如果你打算运行其他 Azure 机器学习教程,请不要完成本部分。

停止计算实例

如果使用了计算实例,请在不使用 VM 时将其停止,以降低成本。

  1. 在工作区中选择“计算”。

  2. 从列表中选择计算实例的名称。

  3. 选择“停止” 。

  4. 准备好再次使用服务器时,选择“启动” 。

删除所有内容

如果不打算使用已创建的资源,请删除它们,以免产生任何费用:

  1. 在 Azure 门户的左侧菜单中选择“资源组”。
  2. 在资源组列表中,选择创建的资源组。
  3. 选择“删除资源组”。
  4. 输入资源组名称。 然后选择“删除”。

还可保留资源组,但请删除单个工作区。 显示工作区属性,然后选择“删除”。

后续步骤

在本机器学习管道教程中,你已完成以下任务:

  • 使用环境依赖项生成了一个要在远程 GPU 计算资源上运行的管道。
  • 使用预先训练的 Tensorflow 模型创建了一个用于运行批量预测的评分脚本。
  • 发布了管道,并使其从 REST 终结点运行。

有关演示如何使用机器学习 SDK 生成管道的更多示例,请参阅笔记本存储库