在 Azure 机器学习 SDK v2 中使用组件创建和运行机器学习管道

适用范围:Python SDK azure-ai-ml v2(最新版)

本文介绍如何使用 Azure 机器学习 Python SDK v2 生成 Azure 机器学习管道 ,以完成图像分类任务。 此管道包含三个步骤:准备数据、训练图像分类模型和对模型评分。 机器学习管道使用速度、可移植性和重用性优化工作流,以便你可以专注于机器学习,而不是基础结构和自动化。

示例管道训练小型 Keras 卷积神经网络,以对 Fashion MNIST 数据集中的图像进行分类。 管道如下所示:

显示图像分类示例的管道图的屏幕截图。

在本文中,你将完成以下任务:

  • 为管道作业准备输入数据
  • 创建三个组件来准备数据、训练模型和对模型评分
  • 从组件生成管道
  • 获取对具有计算的工作区的访问权限
  • 提交管道作业
  • 检查组件和训练神经网络的输出
  • (可选)注册组件,以便在工作区中进一步重复使用和共享

如果没有 Azure 订阅,请在开始前创建一个试用版订阅。 立即尝试试用版订阅

先决条件

  • 一个 Azure 机器学习工作区。 如果没有资源,请完成 “创建资源”教程
  • 安装了 Azure 机器学习 Python SDK v2 的 Python 环境。 有关安装说明,请参阅 入门。 此环境用于定义和控制 Azure 机器学习资源,独立于运行时用于训练的环境。
  • 示例存储库的克隆。

若要运行训练示例,请先克隆示例存储库并导航到 sdk 目录:

git clone --depth 1 https://github.com/Azure/azureml-examples
cd azureml-examples/sdk

启动交互式 Python 会话

本文使用 Azure 机器学习 Python SDK 创建和控制 Azure 机器学习管道。 本文假设你在 Python REPL 环境或 Jupyter 笔记本中以交互方式运行代码片段。

本文基于 Azure 机器学习示例存储库目录中sdk/python/jobs/pipelines/2e_image_classification_keras_minist_convnet 笔记本

导入所需的库

导入本文所需的所有 Azure 机器学习库:

# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

from azure.ai.ml import MLClient
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component

为管道作业准备输入数据

需要为图像分类管道准备输入数据。

时尚 MNIST 是一个分为 10 个类的时尚图像数据集。 每个图像都是 28 x 28 灰度图像。 有 60,000 个训练图像和 10,000 个测试图像。

import urllib3
import shutil
import gzip
import os
from pathlib import Path
from azure.ai.ml import Input

base_url = "https://azureopendatastorage.blob.core.windows.net/mnist/"
base_dir = Path("mnist")
if not base_dir.exists():
    base_dir.mkdir(parents=True)

c = urllib3.PoolManager()
for target_file in [
    "train-images-idx3-ubyte.gz",
    "train-labels-idx1-ubyte.gz",
    "t10k-images-idx3-ubyte.gz",
    "t10k-labels-idx1-ubyte.gz",
]:
    if (base_dir / target_file[:-3]).exists():
        continue
    with c.request("GET", base_url + target_file, preload_content=False) as resp, open(
        base_dir / target_file, "wb"
    ) as out_file:
        shutil.copyfileobj(resp, out_file)
        resp.release_conn()
    with gzip.open(base_dir / target_file, "rb") as f_in, open(
        base_dir / target_file[:-3], "wb"
    ) as f_out:
        shutil.copyfileobj(f_in, f_out)
    os.unlink(base_dir / target_file)

mnist_ds = Input(path=base_dir.as_posix())

通过定义 Input,可以创建对数据源位置的引用。 数据会保留在其现有位置,因此不会产生额外的存储成本。

创建用于生成管道的组件

图像分类任务可以拆分为三个步骤:准备数据、训练模型和对模型评分。

Azure 机器学习组件是一个自包含的代码片段,可完成机器学习管道中的一个步骤。 在本文中,你将为图像分类任务创建三个组件:

  • 准备用于训练和测试的数据
  • 使用训练数据训练神经网络进行图像分类
  • 使用测试数据为模型评分

对于每个组件,完成以下步骤:

  1. 准备包含执行逻辑的 Python 脚本
  2. 定义组件的接口
  3. 添加组件的其他元数据,包括运行时环境和运行组件的命令

后续部分介绍如何通过两种方式创建组件。 对于前两个组件,请使用 Python 函数。 对于第三个组件,请使用 YAML 定义。

创建数据准备组件

此管道中的第一个组件将压缩数据文件 fashion_ds 转换为两个 .csv 文件,一个用于训练,另一个用于评分。 使用 Python 函数定义此组件。

如果您正在使用 Azure 机器学习示例库中的示例,文件夹 prep 中已经有源文件。 此文件夹包含两个用于构造组件的文件: prep_component.py用于定义组件,以及 conda.yaml定义组件的运行时环境。

使用 Python 函数定义组件

command_component()使用函数作为修饰器,可以轻松定义组件的接口、它的元数据以及从 Python 函数运行的代码。 每个修饰的 Python 函数都转换为管道服务可以处理的单个静态规范(YAML)。

# Converts MNIST-formatted files at the passed-in input path to training data output path and test data output path
import os
from pathlib import Path
from mldesigner import command_component, Input, Output


@command_component(
    name="prep_data",
    version="1",
    display_name="Prep Data",
    description="Convert data to CSV file, and split to training and test data",
    environment=dict(
        conda_file=Path(__file__).parent / "conda.yaml",
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
    ),
)
def prepare_data_component(
    input_data: Input(type="uri_folder"),
    training_data: Output(type="uri_folder"),
    test_data: Output(type="uri_folder"),
):
    convert(
        os.path.join(input_data, "train-images-idx3-ubyte"),
        os.path.join(input_data, "train-labels-idx1-ubyte"),
        os.path.join(training_data, "mnist_train.csv"),
        60000,
    )
    convert(
        os.path.join(input_data, "t10k-images-idx3-ubyte"),
        os.path.join(input_data, "t10k-labels-idx1-ubyte"),
        os.path.join(test_data, "mnist_test.csv"),
        10000,
    )


def convert(imgf, labelf, outf, n):
    f = open(imgf, "rb")
    l = open(labelf, "rb")
    o = open(outf, "w")

    f.read(16)
    l.read(8)
    images = []

    for i in range(n):
        image = [ord(l.read(1))]
        for j in range(28 * 28):
            image.append(ord(f.read(1)))
        images.append(image)

    for image in images:
        o.write(",".join(str(pix) for pix in image) + "\n")
    f.close()
    o.close()
    l.close()

前面的代码使用修饰器定义具有显示名称 Prep Data@command_component 组件:

  • name 是组件的唯一标识符

  • version 是组件的当前版本。 组件可以有多个版本

  • display_name 是 UI 组件的友好显示名称

  • description 描述组件可以完成的任务

  • environment 使用 conda.yaml 文件指定组件的运行时环境

    该文件 conda.yaml 包含用于组件的所有包:

name: imagekeras_prep_conda_env
channels:
  - defaults
dependencies:
  - python=3.7.11
  - pip=20.0
  - pip:
    - mldesigner==0.1.0b12
  • prepare_data_component函数为input_data定义一个输入,并为training_datatest_data定义两个输出。
    • input_data 是输入数据路径
    • training_datatest_data 是用于训练数据和测试数据的输出数据路径
  • 该组件将数据从 input_data 中转换为 training_data 用于训练数据的 .csv 文件,以及 test_data 用于测试数据的 .csv 文件

在工作室 UI 中,组件显示为:

  • 管道图中的块
  • input_datatraining_datatest_data 是组件的端口,用于连接到其他组件进行数据流处理。

UI 和代码中“准备数据”组件的屏幕截图。

现在,你已为 Prep Data 组件准备所有源文件。

创建模型训练组件

在本部分中,使用 Python 函数创建用于训练图像分类模型的组件,就像使用 Prep Data 组件一样。

由于训练逻辑更为复杂,因此请将训练代码放在单独的 Python 文件中。

此组件的源文件位于 trainAzure 机器学习示例存储库中的文件夹中。 此文件夹包含三个用于构造组件的文件:

  • train.py 包含用于训练模型的逻辑
  • train_component.py 定义组件的接口并从中导入函数 train.py
  • conda.yaml 定义组件的运行时环境

获取包含逻辑的脚本

该文件 train.py 包含一个正常的 Python 函数,用于为图像分类训练 Keras 神经网络执行逻辑。 若要查看代码,请参阅 GitHub 上的 train.py 文件

使用 Python 函数定义组件

定义训练函数后,可以在 @command_component Azure 机器学习 SDK v2 中使用,将函数包装为用于 Azure 机器学习管道的组件:

import os
from pathlib import Path
from mldesigner import command_component, Input, Output


@command_component(
    name="train_image_classification_keras",
    version="1",
    display_name="Train Image Classification Keras",
    description="train image classification with keras",
    environment=dict(
        conda_file=Path(__file__).parent / "conda.yaml",
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
    ),
)
def keras_train_component(
    input_data: Input(type="uri_folder"),
    output_model: Output(type="uri_folder"),
    epochs=10,
):
    # avoid dependency issue, execution logic is in train() func in train.py file
    from train import train

    train(input_data, output_model, epochs)

前面的代码使用Train Image Classification Keras定义了具有显示名称@command_component的组件。

keras_train_component 函数定义:

  • 一个输入, input_data用于源训练数据
  • 一个输入epochs,指定训练期间要使用的纪元数。
  • 一个输出, output_model指定模型文件的输出路径

epochs 的默认值为 10。 此组件的逻辑来自 train()train.py 中的函数。

训练模型组件具有比准备数据组件更复杂的配置。 conda.yaml 如下所示:

name: imagekeras_train_conda_env
channels:
  - defaults
dependencies:
  - python=3.7.11
  - pip=20.2
  - pip:
    - mldesigner==0.1.0b12
    - azureml-mlflow==1.50.0
    - tensorflow==2.7.0
    - numpy==1.21.4
    - scikit-learn==1.0.1
    - pandas==1.3.4
    - matplotlib==3.2.2
    - protobuf==3.20.0

现在,你已准备好组件 Train Image Classification Keras 的所有源文件。

创建模型评分组件

在本部分中,你将创建一个组件,以便使用 YAML 规范和脚本为训练的模型评分。

如果您正在使用 Azure 机器学习示例库中的示例,文件夹 score 中已经有源文件。 此文件夹包含三个用于构造组件的文件:

  • score.py 包含组件的源代码
  • score.yaml 定义组件的接口和其他详细信息
  • conda.yaml 定义组件的运行时环境

获取包含逻辑的脚本

该文件 score.py 包含执行模型评分逻辑的普通 Python 函数:

from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.utils import to_categorical
from keras.callbacks import Callback
from keras.models import load_model

import argparse
from pathlib import Path
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import mlflow


def get_file(f):

    f = Path(f)
    if f.is_file():
        return f
    else:
        files = list(f.iterdir())
        if len(files) == 1:
            return files[0]
        else:
            raise Exception("********This path contains more than one file*******")


def parse_args():
    # setup argparse
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument(
        "--input_data", type=str, help="path containing data for scoring"
    )
    parser.add_argument(
        "--input_model", type=str, default="./", help="input path for model"
    )

    parser.add_argument(
        "--output_result", type=str, default="./", help="output path for model"
    )

    # parse args
    args = parser.parse_args()

    # return args
    return args


def score(input_data, input_model, output_result):

    test_file = get_file(input_data)
    data_test = pd.read_csv(test_file, header=None)

    img_rows, img_cols = 28, 28
    input_shape = (img_rows, img_cols, 1)

    # Read test data
    X_test = np.array(data_test.iloc[:, 1:])
    y_test = to_categorical(np.array(data_test.iloc[:, 0]))
    X_test = (
        X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
    )

    # Load model
    files = [f for f in os.listdir(input_model) if f.endswith(".h5")]
    model = load_model(input_model + "/" + files[0])

    # Log metrics of the model
    eval = model.evaluate(X_test, y_test, verbose=0)

    mlflow.log_metric("Final test loss", eval[0])
    print("Test loss:", eval[0])

    mlflow.log_metric("Final test accuracy", eval[1])
    print("Test accuracy:", eval[1])

    # Score model using test data
    y_predict = model.predict(X_test)
    y_result = np.argmax(y_predict, axis=1)

    # Output result
    np.savetxt(output_result + "/predict_result.csv", y_result, delimiter=",")


def main(args):
    score(args.input_data, args.input_model, args.output_result)


# run script
if __name__ == "__main__":
    # parse args
    args = parse_args()

    # call main function
    main(args)

中的 score.py 代码采用三个命令行参数: input_datainput_modeloutput_result。 程序使用输入数据为输入模型评分,然后输出结果。

使用 YAML 定义组件

本部分介绍如何使用有效的 YAML 组件规范格式创建组件规范。 此文件指定以下信息:

  • 元数据:名称、显示名称、版本、类型等
  • 接口:输入和输出
  • 命令、代码和环境:用于运行组件的命令、代码和环境
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command

name: score_image_classification_keras
display_name: Score Image Classification Keras
inputs:
  input_data: 
    type: uri_folder
  input_model:
    type: uri_folder
outputs:
  output_result:
    type: uri_folder
code: ./
command: python score.py --input_data ${{inputs.input_data}} --input_model ${{inputs.input_model}} --output_result ${{outputs.output_result}}
environment:
  conda_file: ./conda.yaml
  image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04

  • name 是组件的唯一标识符。 它的显示名称为 Score Image Classification Keras
  • 此组件有两个输入和一个输出
  • 源代码路径在 code 节中定义。 组件在云中运行时,该路径中的所有文件将作为组件的快照上传
  • command 节指定要在组件运行时执行的命令
  • environment 部分包含 Docker 映像和 conda YAML 文件。 源文件位于示例存储库

现在,你拥有模型评分组件的所有源文件。

加载组件以生成管道

可以导入由 Python 函数定义的数据准备组件和模型训练组件,就像普通 Python 函数一样。

以下代码分别从prepare_data_component()文件夹中的文件keras_train_component()prep_component.py文件夹中的文件prep导入train_componenttrain函数。

%load_ext autoreload
%autoreload 2

# load component function from component python file
from prep.prep_component import prepare_data_component
from train.train_component import keras_train_component

# print hint of components
help(prepare_data_component)
help(keras_train_component)

可以使用函数 load_component() 加载 YAML 定义的分数组件。

# load component function from yaml
keras_score_component = load_component(source="./score/score.yaml")

从工作区加载已注册的组件

注意

若要从工作区加载已注册的组件,必须首先配置工作区连接,如 “获取工作区访问权限 ”部分中所述。 以下操作需要该 ml_client 对象。

如果你的组件已在工作区中注册,则可以使用该方法 ml_client.components.get() 直接加载它们。 如果想要重复使用以前由你注册的组件或由其他团队成员共享的组件,此方法非常有用。

# Load a registered component by name and version
registered_component = ml_client.components.get(
    name="my_registered_component", 
    version="1.0.0"
)

# Load the latest version of a registered component
latest_component = ml_client.components.get(
    name="my_registered_component"
)

可以列出工作区中的所有可用组件,以查找所需的组件:

# List all components in the workspace
components = ml_client.components.list()
for component in components:
    print(f"Name: {component.name}, Version: {component.version}")

加载后,可以在管道中使用已注册的组件,就像从本地文件或 Python 函数加载的组件一样。

生成管道

已创建并加载所有组件和输入数据以生成管道。 现在可以将它们组合成管道:

注意

若要使用 无服务器计算,请添加到 from azure.ai.ml.entities import ResourceConfiguration 文件的顶部。 然后替换:

  • default_compute=cpu_compute_target 替换为 default_compute="serverless"
  • train_node.compute = gpu_compute_target 替换为 train_node.resources = ResourceConfiguration(instance_type="Standard_NC6s_v3", instance_count=2)
# define a pipeline containing 3 nodes: Prepare data node, train node, and score node
@pipeline(
    default_compute=cpu_compute_target,
)
def image_classification_keras_minist_convnet(pipeline_input_data):
    """E2E image classification pipeline with keras using python sdk."""
    prepare_data_node = prepare_data_component(input_data=pipeline_input_data)

    train_node = keras_train_component(
        input_data=prepare_data_node.outputs.training_data
    )
    train_node.compute = gpu_compute_target

    score_node = keras_score_component(
        input_data=prepare_data_node.outputs.test_data,
        input_model=train_node.outputs.output_model,
    )


# create a pipeline
pipeline_job = image_classification_keras_minist_convnet(pipeline_input_data=fashion_ds)

此管道具有默认计算 cpu_compute_target。 如果未指定特定节点的计算,该节点将在默认计算上运行。

此管道具有管道级输入 pipeline_input_data。 在提交管道作业时,可以为管道输入设置一个值。

管道包含三个节点: prepare_data_nodetrain_nodescore_node

  • input_data使用prepare_data_nodepipeline_input_data
  • input_data 所属的 train_nodetraining_dataprepare_data_node 输出
  • input_datascore_nodetest_dataprepare_data_node输出,而input_modeloutput_modeltrain_node
  • 由于 train_node 训练 CNN 模型,因此可以将计算指定为 gpu_compute_target 提高训练性能

提交管道作业

构造管道后,即可将作业提交到工作区。 若要提交作业,首先需要连接到工作区。

获取工作区访问权限

配置凭据

DefaultAzureCredential 用于获取工作区的访问权限。 DefaultAzureCredential 应该能够处理大多数 Azure SDK 身份验证方案。

如果 DefaultAzureCredential 不起作用,请参阅 此配置凭据示例标识包

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.chinacloudapi.cn/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
    credential = InteractiveBrowserCredential()

获取具有计算的工作区的句柄

创建用于 MLClient 管理 Azure 机器学习服务的对象。 如果使用 无服务器计算,则无需创建这些计算。

# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)

# Retrieve an already attached Azure Machine Learning Compute.
cpu_compute_target = "cpu-cluster"
print(ml_client.compute.get(cpu_compute_target))
gpu_compute_target = "gpu-cluster"
print(ml_client.compute.get(gpu_compute_target))

重要

此代码片段要求工作区配置 JSON 文件保存在当前目录或其父目录中。 若要详细了解如何创建工作区,请参阅创建工作区资源。 有关将配置保存到文件的详细信息,请参阅 “创建工作区配置文件”。

将管道作业提交到工作区

现在,你已拥有工作区的句柄,可以提交管道作业了:

pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_samples"
)
pipeline_job

前面的代码将此图像分类管道作业提交到名为pipeline_samples的实验。 如果实验不存在,它会自动创建试验。 pipeline_input_data 使用 fashion_ds

提交试验的调用将快速完成,并生成如下所示的输出:

试验 名称 类型 状态 详细信息页
pipeline_samples sharp_pipe_4gvqx6h1fb 管道 准备中 链接到 Azure 机器学习工作室

您可以选择该链接来监视管道的运行。 或者,可以通过运行以下代码等待它完成:

# wait until the job completes
ml_client.jobs.stream(pipeline_job.name)

重要

首次管道运行大约需要 15 分钟。 下载所有依赖项,创建 Docker 映像,并预配并创建 Python 环境。 再次运行管道所需的时间更少,因为这些资源被重复使用而不是创建。 但是,管道的总运行时取决于脚本的工作负荷以及每个管道步骤中运行的进程。

在 UI 中检查输出并调试管道

可以选择 Link to Azure Machine Learning studio,这是管道的作业详细信息页面。 可以看到管道图:

管道作业详细信息页的屏幕截图。

可以通过右键单击组件来检查每个组件的日志和输出,或选择组件以打开其详细信息窗格。 若要详细了解如何在 UI 中调试管道,请参阅 使用 Azure 机器学习工作室调试管道故障

(可选)将组件注册到工作区

在前面的部分中,你将使用三个组件生成管道来完成图像分类任务。 还可以将组件注册到工作区,以便可以在工作区中共享和重复使用它们。 以下示例演示如何注册数据准备组件:

try:
    # try get back the component
    prep = ml_client.components.get(name="prep_data", version="1")
except:
    # if not exists, register component using following code
    prep = ml_client.components.create_or_update(prepare_data_component)

# list all components registered in workspace
for c in ml_client.components.list():
    print(c)

您可以使用 ml_client.components.get() 根据名称和版本获取已注册的组件。 可用于 ml_client.components.create_or_update() 注册以前从 Python 函数或 YAML 加载的组件。

后续步骤