使用 Azure 机器学习 SDK v2 和组件创建和运行机器学习管道

适用范围:Python SDK azure-ai-ml v2(最新版)

本文介绍如何使用 Python SDK v2 生成 Azure 机器学习管道,以完成包含三个步骤的图像分类任务:准备数据、训练图像分类模型和为模型评分。 机器学习管道可以优化工作流以提高其速度、可移植性和可重用性,使你能够将工作重心放在机器学习上,而不必关注基础结构和自动化。

该示例将训练一个小型 Keras 卷积神经网络,以对 Fashion MNIST 数据集中的图像进行分类。 管道如下所示。

显示图像分类 Keras 示例管道图的屏幕截图。

在本文中,你将完成以下任务:

  • 为管道作业准备输入数据
  • 创建三个组件来准备数据、训练和评分
  • 从组件构建管道
  • 使用计算获取工作区访问权限
  • 提交管道作业
  • 查看组件的输出和已训练的神经网络
  • (可选)注册组件供将来重用以及在工作区中共享

如果没有 Azure 订阅,请在开始前创建一个试用版订阅。 立即尝试试用版订阅

先决条件

  • Azure 机器学习工作区 - 如果没有工作区,请完成“创建资源”教程

  • 装有 Azure 机器学习 Python SDK v2 的 Python 环境 - 安装说明 - 查看入门部分。 此环境用于定义和控制 Azure 机器学习资源,独立于运行时用于训练的环境。

  • 克隆示例存储库

    若要运行训练示例,请首先克隆示例存储库,然后更改为 sdk 目录:

    git clone --depth 1 https://github.com/Azure/azureml-examples
    cd azureml-examples/sdk
    

启动交互式 Python 会话

本文使用适用于 Azure 机器学习的 Python SDK 创建和控制 Azure 机器学习管道。 本文假设你将在 Python REPL 环境或 Jupyter 笔记本中以交互方式运行代码片段。

本文基于 Azure 机器学习示例存储库的 sdk/python/jobs/pipelines/2e_image_classification_keras_minist_convnet 目录中的 image_classification_keras_minist_convnet.ipynb 笔记本。

导入所需的库

导入 Azure 机器学习所需的所有库以便在本文中使用:

# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

from azure.ai.ml import MLClient
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component

为管道作业准备输入数据

需要为此图像分类管道准备输入数据。

Fashion-MNIST 是一个时尚图像数据集,包含 10 个类别。 每张图像都是 28x28 的灰度图像,有 60,000 张训练图像和 10,000 张测试图像。 作为图像分类问题,Fashion-MNIST 比经典 MNIST 手写数字数据库更难。 它以与原始手写数字数据库相同的压缩二进制形式分发。

导入你需要的 Azure 机器学习所需的所有库。

通过定义 Input,可以创建对数据源位置的引用。 数据会保留在其现有位置,因此不会产生额外的存储成本。

创建用于生成管道的组件

图像分类任务可划分为三个步骤:准备数据、训练模型和为模型评分。

Azure 机器学习组件是一段独立的代码,执行机器学习管道中的一个步骤。 在本文中,你将为图像分类任务创建三个组件:

  • 准备并测试用于训练的数据
  • 使用训练数据来训练用于图像分类的神经网络
  • 使用测试数据为模型评分

对于每个组件,需要准备以下内容:

  1. 准备包含执行逻辑的 Python 脚本

  2. 定义组件的接口

  3. 添加组件的其他元数据,包括运行时环境、用于运行组件的命令,等等。

下一部分将介绍如何以两种不同的方式创建组件:前两个组件使用 Python 函数,第三个组件使用 YAML 定义。

创建数据准备组件

此管道中的第一个组件将 fashion_ds 的压缩数据文件转换为两个 csv 文件,其中一个文件用于训练,另一个用于评分。 你将使用 Python 函数来定义此组件。

如果按照 Azure 机器学习示例存储库中的示例进行操作,则源文件已在 prep/ 文件夹中提供。 此文件夹包含两个用于构造组件的文件:prep_component.py 定义组件,conda.yaml 定义组件的运行时环境。

使用 Python 函数定义组件

使用 command_component() 函数作为修饰器,可以轻松定义组件的接口、元数据,以及要从 Python 函数执行的代码。 修饰的每个 Python 函数将转换为管道服务可以处理的单个静态规范 (YAML)。

# Converts MNIST-formatted files at the passed-in input path to training data output path and test data output path
import os
from pathlib import Path
from mldesigner import command_component, Input, Output


@command_component(
    name="prep_data",
    version="1",
    display_name="Prep Data",
    description="Convert data to CSV file, and split to training and test data",
    environment=dict(
        conda_file=Path(__file__).parent / "conda.yaml",
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
    ),
)
def prepare_data_component(
    input_data: Input(type="uri_folder"),
    training_data: Output(type="uri_folder"),
    test_data: Output(type="uri_folder"),
):
    convert(
        os.path.join(input_data, "train-images-idx3-ubyte"),
        os.path.join(input_data, "train-labels-idx1-ubyte"),
        os.path.join(training_data, "mnist_train.csv"),
        60000,
    )
    convert(
        os.path.join(input_data, "t10k-images-idx3-ubyte"),
        os.path.join(input_data, "t10k-labels-idx1-ubyte"),
        os.path.join(test_data, "mnist_test.csv"),
        10000,
    )


def convert(imgf, labelf, outf, n):
    f = open(imgf, "rb")
    l = open(labelf, "rb")
    o = open(outf, "w")

    f.read(16)
    l.read(8)
    images = []

    for i in range(n):
        image = [ord(l.read(1))]
        for j in range(28 * 28):
            image.append(ord(f.read(1)))
        images.append(image)

    for image in images:
        o.write(",".join(str(pix) for pix in image) + "\n")
    f.close()
    o.close()
    l.close()

以上代码使用 @command_component 修饰器定义一个显示名称为 Prep Data 的组件:

  • name 是组件的唯一标识符。

  • version 是组件的当前版本。 一个组件可以有多个版本。

  • display_name 是组件在 UI 中的友好显示名称,它不是唯一的。

  • description 通常描述此组件可以完成的任务。

  • environment 指定此组件的运行时环境。 此组件的环境指定一个 docker 映像并引用 conda.yaml 文件。

    conda.yaml 文件包含用于组件的所有包,如下所示:

name: imagekeras_prep_conda_env
channels:
  - defaults
dependencies:
  - python=3.7.11
  - pip=20.0
  - pip:
    - mldesigner==0.1.0b12
  • prepare_data_component 函数定义 input_data 的一个输入,并定义 training_datatest_data 的两个输出。 input_data 是输入数据路径。 training_datatest_data 是训练数据和测试数据的输出数据路径。
  • 此组件将 input_data 中的训练数据 csv 转换为 training_data,将测试数据 csv 转换为 test_data

下面组件在工作室 UI 中的外观。

  • 组件是管道图形中的一个块。
  • input_datatraining_datatest_data 是组件的端口,它们连接到其他组件以流式传输数据。

UI 和代码中“准备数据”组件的屏幕截图。

现在,已经为 Prep Data 组件准备了所有源文件。

创建 train-model 组件

在本部分,你将创建一个组件用于在 Python 函数中训练图像分类模型,这类似于 Prep Data 组件。

不同之处在于,由于训练逻辑更复杂,你可以将原始训练代码放在单独的 Python 文件中。

此组件的源文件位于 Azure 机器学习示例存储库中的 train/ 文件夹下。 此文件夹包含三个用于构造组件的文件:

  • train.py:包含用于训练模型的实际逻辑。
  • train_component.py:定义组件的接口,并在 train.py 中导入函数。
  • conda.yaml:定义组件的运行时环境。

获取包含执行逻辑的脚本

train.py 文件包含一个普通的 Python 函数,该函数执行训练模型逻辑,以训练用于图像分类的 Keras 神经网络。 若要查看代码,请参阅 GitHub 上的 train.py 文件

使用 Python 函数定义组件

成功定义训练函数后,可以使用 Azure 机器学习 SDK v2 中的 @command_component 将函数包装为可在 Azure 机器学习管道中使用的组件。

import os
from pathlib import Path
from mldesigner import command_component, Input, Output


@command_component(
    name="train_image_classification_keras",
    version="1",
    display_name="Train Image Classification Keras",
    description="train image classification with keras",
    environment=dict(
        conda_file=Path(__file__).parent / "conda.yaml",
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
    ),
)
def keras_train_component(
    input_data: Input(type="uri_folder"),
    output_model: Output(type="uri_folder"),
    epochs=10,
):
    # avoid dependency issue, execution logic is in train() func in train.py file
    from train import train

    train(input_data, output_model, epochs)

以上代码使用 @command_component 定义一个显示名称为 Train Image Classification Keras 的组件:

  • keras_train_component 函数定义一个输入 input_data(训练数据的来源于其中),一个输入 epochs(指定训练期间的循环),以及一个输出 output_model(在其中输出模型文件)。 epochs 的默认值为 10。 此组件的执行逻辑来自上述 train.py 中的 train() 函数。

train-model 组件的配置比 prep-data 组件稍微复杂一些。 conda.yaml 如下所示:

name: imagekeras_train_conda_env
channels:
  - defaults
dependencies:
  - python=3.7.11
  - pip=20.2
  - pip:
    - mldesigner==0.1.0b12
    - azureml-mlflow==1.50.0
    - tensorflow==2.7.0
    - numpy==1.21.4
    - scikit-learn==1.0.1
    - pandas==1.3.4
    - matplotlib==3.2.2
    - protobuf==3.20.0

现在,已经为 Train Image Classification Keras 组件准备了所有源文件。

创建 score-model 组件

创建上述组件后,你将按照本部分所述创建一个组件来通过 Yaml 规范和脚本为已训练的模型评分。

如果按照 Azure 机器学习示例存储库中的示例进行操作,则源文件已在 score/ 文件夹中提供。 此文件夹包含三个用于构造组件的文件:

  • score.py:包含组件的源代码。
  • score.yaml:定义组件的接口和其他详细信息。
  • conda.yaml:定义组件的运行时环境。

获取包含执行逻辑的脚本

score.py 文件包含一个执行训练模型逻辑的普通 Python 函数。

from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.utils import to_categorical
from keras.callbacks import Callback
from keras.models import load_model

import argparse
from pathlib import Path
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import mlflow


def get_file(f):

    f = Path(f)
    if f.is_file():
        return f
    else:
        files = list(f.iterdir())
        if len(files) == 1:
            return files[0]
        else:
            raise Exception("********This path contains more than one file*******")


def parse_args():
    # setup argparse
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument(
        "--input_data", type=str, help="path containing data for scoring"
    )
    parser.add_argument(
        "--input_model", type=str, default="./", help="input path for model"
    )

    parser.add_argument(
        "--output_result", type=str, default="./", help="output path for model"
    )

    # parse args
    args = parser.parse_args()

    # return args
    return args


def score(input_data, input_model, output_result):

    test_file = get_file(input_data)
    data_test = pd.read_csv(test_file, header=None)

    img_rows, img_cols = 28, 28
    input_shape = (img_rows, img_cols, 1)

    # Read test data
    X_test = np.array(data_test.iloc[:, 1:])
    y_test = to_categorical(np.array(data_test.iloc[:, 0]))
    X_test = (
        X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
    )

    # Load model
    files = [f for f in os.listdir(input_model) if f.endswith(".h5")]
    model = load_model(input_model + "/" + files[0])

    # Log metrics of the model
    eval = model.evaluate(X_test, y_test, verbose=0)

    mlflow.log_metric("Final test loss", eval[0])
    print("Test loss:", eval[0])

    mlflow.log_metric("Final test accuracy", eval[1])
    print("Test accuracy:", eval[1])

    # Score model using test data
    y_predict = model.predict(X_test)
    y_result = np.argmax(y_predict, axis=1)

    # Output result
    np.savetxt(output_result + "/predict_result.csv", y_result, delimiter=",")


def main(args):
    score(args.input_data, args.input_model, args.output_result)


# run script
if __name__ == "__main__":
    # parse args
    args = parse_args()

    # call main function
    main(args)

score.py 中的代码采用三个命令行参数:input_datainput_modeloutput_result。 程序将使用输入数据为输入模型评分,然后输出评分结果。

通过 Yaml 定义组件

本部分介绍如何以有效的 YAML 组件规范格式创建组件规范。 此文件指定以下信息:

  • 元数据:名称、display_name、版本、类型等。
  • 接口:输入和输出
  • 命令、代码和环境:用于运行组件的命令、代码和环境
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command

name: score_image_classification_keras
display_name: Score Image Classification Keras
inputs:
  input_data: 
    type: uri_folder
  input_model:
    type: uri_folder
outputs:
  output_result:
    type: uri_folder
code: ./
command: python score.py --input_data ${{inputs.input_data}} --input_model ${{inputs.input_model}} --output_result ${{outputs.output_result}}
environment:
  conda_file: ./conda.yaml
  image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04

  • name 是组件的唯一标识符。 其显示名称为 Score Image Classification Keras
  • 此组件有两个输入和一个输出。
  • 其源代码路径在 code 节中定义,在云中运行该组件时,该路径中的所有文件将作为该组件的快照上传。
  • command 节指定在运行此组件时要执行的命令。
  • environment 节包含一个 docker 映像和一个 conda yaml 文件。 源文件位于示例存储库中。

现已获取 score-model 组件的所有源文件。

加载组件以生成管道

对于 Python 函数定义的 prep-data 组件和 train-model 组件,可以像导入普通的 Python 函数那样导入组件。

以下代码分别从 prep 文件夹下的 prep_component.py 文件和 train 文件夹下的 train_component 文件导入 prepare_data_component()keras_train_component() 函数。

%load_ext autoreload
%autoreload 2

# load component function from component python file
from prep.prep_component import prepare_data_component
from train.train_component import keras_train_component

# print hint of components
help(prepare_data_component)
help(keras_train_component)

对于 yaml 定义的评分组件,可以使用 load_component() 函数来加载。

# load component function from yaml
keras_score_component = load_component(path="./score/score.yaml")

生成管道

现在你已创建并加载了所有用于生成管道的组件和输入数据。 可将它们组合成管道:

注意

若要使用无服务器计算,请将 from azure.ai.ml.entities import ResourceConfiguration 添加到顶部。 然后替换:

  • default_compute="serverless",default_compute=cpu_compute_target,
  • train_node.resources = "ResourceConfiguration(instance_type="Standard_NC6s_v3",instance_count=2)train_node.compute = gpu_compute_target
# define a pipeline containing 3 nodes: Prepare data node, train node, and score node
@pipeline(
    default_compute=cpu_compute_target,
)
def image_classification_keras_minist_convnet(pipeline_input_data):
    """E2E image classification pipeline with keras using python sdk."""
    prepare_data_node = prepare_data_component(input_data=pipeline_input_data)

    train_node = keras_train_component(
        input_data=prepare_data_node.outputs.training_data
    )
    train_node.compute = gpu_compute_target

    score_node = keras_score_component(
        input_data=prepare_data_node.outputs.test_data,
        input_model=train_node.outputs.output_model,
    )


# create a pipeline
pipeline_job = image_classification_keras_minist_convnet(pipeline_input_data=fashion_ds)

该管道具有默认计算 cpu_compute_target,这意味着,如果不为特定节点指定计算,则该节点将在默认计算上运行。

该管道具有管道级输入 pipeline_input_data。 可以在提交管道作业时为管道输入赋值。

该管道包含三个节点:prepare_data_node、train_node 和 score_node。

  • prepare_data_nodeinput_data 使用 pipeline_input_data 的值。

  • train_nodeinput_data 来自 prepare_data_node 的 training_data 输出。

  • score_node 的 input_data 来自 prepare_data_node 的 test_data 输出,input_model 来自 train_node 的 output_model

  • 由于 train_node 将训练 CNN 模型,因此你可以将其计算指定为 gpu_compute_target,从而提高训练性能。

提交管道作业

构造管道后,接下来可以提交到工作区。 若要提交作业,首先需要连接到工作区。

获取工作区访问权限

配置凭据

我们将使用 DefaultAzureCredential 来获取工作区访问权限。 DefaultAzureCredential 应该能够处理大多数 Azure SDK 身份验证方案。

如果此方法不起作用,请参考更多可用凭据:配置凭据示例azure-identity 参考文档

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.chinacloudapi.cn/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
    credential = InteractiveBrowserCredential()

使用计算获取工作区句柄

创建一个 MLClient 对象来管理 Azure 机器学习服务。 如果使用无服务器计算,则无需创建这些计算。

# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)

# Retrieve an already attached Azure Machine Learning Compute.
cpu_compute_target = "cpu-cluster"
print(ml_client.compute.get(cpu_compute_target))
gpu_compute_target = "gpu-cluster"
print(ml_client.compute.get(gpu_compute_target))

重要

此代码片段需要将工作区配置 json 文件保存到当前目录或其父目录中。 若要详细了解如何创建工作区,请参阅创建工作区资源。 有关将配置保存到文件的详细信息,请参阅创建工作区配置文件

将管道作业提交到工作区

获取工作区句柄后,接下来可以提交管道作业。

pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_samples"
)
pipeline_job

以上代码将此图像分类管道作业提交到名为 pipeline_samples 的试验。 如果该试验不存在,它会自动创建该试验。 pipeline_input_data 使用 fashion_ds

pipeline_job 的调用会生成如下所示的输出:

submitExperiment 的调用很快完成,并生成类似于以下内容的输出:

试验 名称 类型 状态 详细信息页
pipeline_samples sharp_pipe_4gvqx6h1fb 管道 备餐中 Azure 机器学习工作室链接。

可以通过打开链接来监视管道运行,也可以通过运行以下代码来阻止管道运行,直到管道运行完成:

# wait until the job completes
ml_client.jobs.stream(pipeline_job.name)

重要

首次管道运行需要大约 15 分钟。 必须下载所有依赖项、创建 Docker 映像,并预配和创建 Python 环境。 再次运行管道所花费的时间会大幅减少,因为会重复使用这些资源,而无需再次创建。 但是,管道的总运行时间取决于脚本的工作负荷,以及每个管道步骤中运行的进程数。

在 UI 中签出输出并调试管道

可以打开 Link to Azure Machine Learning studio,即管道的作业详细信息页。 你将看到如下所示的管道图形。

管道作业详细信息页的屏幕截图。

可以通过右键单击每个组件来检查该组件的日志和输出,或者选择组件以打开其详细信息窗格。 若要详细了解如何在 UI 中调试管道,请参阅如何使用调试管道失败

(可选)将组件注册到工作区

在上一部分,你已使用三个组件生成了一个管道,以便端到端完成图像分类任务。 还可将组件注册到工作区,以便可以在工作区中共享和重用它们。 下面是注册 prep-data 组件的示例。

try:
    # try get back the component
    prep = ml_client.components.get(name="prep_data", version="1")
except:
    # if not exists, register component using following code
    prep = ml_client.components.create_or_update(prepare_data_component)

# list all components registered in workspace
for c in ml_client.components.list():
    print(c)

使用 ml_client.components.get(),可以按名称和版本获取已注册的组件。 使用 ml_client.components.create_or_update(),可以注册先前从 Python 函数或 yaml 加载的组件。

后续步骤