使用 Azure 机器学习 SDK v2 和组件创建和运行机器学习管道
适用范围:Python SDK azure-ai-ml v2(最新版)
本文介绍如何使用 Python SDK v2 生成 Azure 机器学习管道,以完成包含三个步骤的图像分类任务:准备数据、训练图像分类模型和为模型评分。 机器学习管道可以优化工作流以提高其速度、可移植性和可重用性,使你能够将工作重心放在机器学习上,而不必关注基础结构和自动化。
该示例将训练一个小型 Keras 卷积神经网络,以对 Fashion MNIST 数据集中的图像进行分类。 管道如下所示。
在本文中,你将完成以下任务:
- 为管道作业准备输入数据
- 创建三个组件来准备数据、训练和评分
- 从组件构建管道
- 使用计算获取工作区访问权限
- 提交管道作业
- 查看组件的输出和已训练的神经网络
- (可选)注册组件供将来重用以及在工作区中共享
如果没有 Azure 订阅,请在开始前创建一个试用版订阅。 立即尝试试用版订阅。
先决条件
Azure 机器学习工作区 - 如果没有工作区,请完成“创建资源”教程。
装有 Azure 机器学习 Python SDK v2 的 Python 环境 - 安装说明 - 查看入门部分。 此环境用于定义和控制 Azure 机器学习资源,独立于运行时用于训练的环境。
克隆示例存储库
若要运行训练示例,请首先克隆示例存储库,然后更改为
sdk
目录:git clone --depth 1 https://github.com/Azure/azureml-examples cd azureml-examples/sdk
启动交互式 Python 会话
本文使用适用于 Azure 机器学习的 Python SDK 创建和控制 Azure 机器学习管道。 本文假设你将在 Python REPL 环境或 Jupyter 笔记本中以交互方式运行代码片段。
本文基于 Azure 机器学习示例存储库的 sdk/python/jobs/pipelines/2e_image_classification_keras_minist_convnet
目录中的 image_classification_keras_minist_convnet.ipynb 笔记本。
导入所需的库
导入 Azure 机器学习所需的所有库以便在本文中使用:
# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component
为管道作业准备输入数据
需要为此图像分类管道准备输入数据。
Fashion-MNIST 是一个时尚图像数据集,包含 10 个类别。 每张图像都是 28x28 的灰度图像,有 60,000 张训练图像和 10,000 张测试图像。 作为图像分类问题,Fashion-MNIST 比经典 MNIST 手写数字数据库更难。 它以与原始手写数字数据库相同的压缩二进制形式分发。
导入你需要的 Azure 机器学习所需的所有库。
通过定义 Input
,可以创建对数据源位置的引用。 数据会保留在其现有位置,因此不会产生额外的存储成本。
创建用于生成管道的组件
图像分类任务可划分为三个步骤:准备数据、训练模型和为模型评分。
Azure 机器学习组件是一段独立的代码,执行机器学习管道中的一个步骤。 在本文中,你将为图像分类任务创建三个组件:
- 准备并测试用于训练的数据
- 使用训练数据来训练用于图像分类的神经网络
- 使用测试数据为模型评分
对于每个组件,需要准备以下内容:
准备包含执行逻辑的 Python 脚本
定义组件的接口
添加组件的其他元数据,包括运行时环境、用于运行组件的命令,等等。
下一部分将介绍如何以两种不同的方式创建组件:前两个组件使用 Python 函数,第三个组件使用 YAML 定义。
创建数据准备组件
此管道中的第一个组件将 fashion_ds
的压缩数据文件转换为两个 csv 文件,其中一个文件用于训练,另一个用于评分。 你将使用 Python 函数来定义此组件。
如果按照 Azure 机器学习示例存储库中的示例进行操作,则源文件已在 prep/
文件夹中提供。 此文件夹包含两个用于构造组件的文件:prep_component.py
定义组件,conda.yaml
定义组件的运行时环境。
使用 Python 函数定义组件
使用 command_component()
函数作为修饰器,可以轻松定义组件的接口、元数据,以及要从 Python 函数执行的代码。 修饰的每个 Python 函数将转换为管道服务可以处理的单个静态规范 (YAML)。
# Converts MNIST-formatted files at the passed-in input path to training data output path and test data output path
import os
from pathlib import Path
from mldesigner import command_component, Input, Output
@command_component(
name="prep_data",
version="1",
display_name="Prep Data",
description="Convert data to CSV file, and split to training and test data",
environment=dict(
conda_file=Path(__file__).parent / "conda.yaml",
image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
),
)
def prepare_data_component(
input_data: Input(type="uri_folder"),
training_data: Output(type="uri_folder"),
test_data: Output(type="uri_folder"),
):
convert(
os.path.join(input_data, "train-images-idx3-ubyte"),
os.path.join(input_data, "train-labels-idx1-ubyte"),
os.path.join(training_data, "mnist_train.csv"),
60000,
)
convert(
os.path.join(input_data, "t10k-images-idx3-ubyte"),
os.path.join(input_data, "t10k-labels-idx1-ubyte"),
os.path.join(test_data, "mnist_test.csv"),
10000,
)
def convert(imgf, labelf, outf, n):
f = open(imgf, "rb")
l = open(labelf, "rb")
o = open(outf, "w")
f.read(16)
l.read(8)
images = []
for i in range(n):
image = [ord(l.read(1))]
for j in range(28 * 28):
image.append(ord(f.read(1)))
images.append(image)
for image in images:
o.write(",".join(str(pix) for pix in image) + "\n")
f.close()
o.close()
l.close()
以上代码使用 @command_component
修饰器定义一个显示名称为 Prep Data
的组件:
name
是组件的唯一标识符。version
是组件的当前版本。 一个组件可以有多个版本。display_name
是组件在 UI 中的友好显示名称,它不是唯一的。description
通常描述此组件可以完成的任务。environment
指定此组件的运行时环境。 此组件的环境指定一个 docker 映像并引用conda.yaml
文件。conda.yaml
文件包含用于组件的所有包,如下所示:
name: imagekeras_prep_conda_env
channels:
- defaults
dependencies:
- python=3.7.11
- pip=20.0
- pip:
- mldesigner==0.1.0b12
prepare_data_component
函数定义input_data
的一个输入,并定义training_data
和test_data
的两个输出。input_data
是输入数据路径。training_data
和test_data
是训练数据和测试数据的输出数据路径。- 此组件将
input_data
中的训练数据 csv 转换为training_data
,将测试数据 csv 转换为test_data
。
下面组件在工作室 UI 中的外观。
- 组件是管道图形中的一个块。
input_data
、training_data
和test_data
是组件的端口,它们连接到其他组件以流式传输数据。
现在,已经为 Prep Data
组件准备了所有源文件。
创建 train-model 组件
在本部分,你将创建一个组件用于在 Python 函数中训练图像分类模型,这类似于 Prep Data
组件。
不同之处在于,由于训练逻辑更复杂,你可以将原始训练代码放在单独的 Python 文件中。
此组件的源文件位于 Azure 机器学习示例存储库中的 train/
文件夹下。 此文件夹包含三个用于构造组件的文件:
train.py
:包含用于训练模型的实际逻辑。train_component.py
:定义组件的接口,并在train.py
中导入函数。conda.yaml
:定义组件的运行时环境。
获取包含执行逻辑的脚本
train.py
文件包含一个普通的 Python 函数,该函数执行训练模型逻辑,以训练用于图像分类的 Keras 神经网络。 若要查看代码,请参阅 GitHub 上的 train.py 文件。
使用 Python 函数定义组件
成功定义训练函数后,可以使用 Azure 机器学习 SDK v2 中的 @command_component
将函数包装为可在 Azure 机器学习管道中使用的组件。
import os
from pathlib import Path
from mldesigner import command_component, Input, Output
@command_component(
name="train_image_classification_keras",
version="1",
display_name="Train Image Classification Keras",
description="train image classification with keras",
environment=dict(
conda_file=Path(__file__).parent / "conda.yaml",
image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
),
)
def keras_train_component(
input_data: Input(type="uri_folder"),
output_model: Output(type="uri_folder"),
epochs=10,
):
# avoid dependency issue, execution logic is in train() func in train.py file
from train import train
train(input_data, output_model, epochs)
以上代码使用 @command_component
定义一个显示名称为 Train Image Classification Keras
的组件:
keras_train_component
函数定义一个输入input_data
(训练数据的来源于其中),一个输入epochs
(指定训练期间的循环),以及一个输出output_model
(在其中输出模型文件)。epochs
的默认值为 10。 此组件的执行逻辑来自上述train.py
中的train()
函数。
train-model 组件的配置比 prep-data 组件稍微复杂一些。 conda.yaml
如下所示:
name: imagekeras_train_conda_env
channels:
- defaults
dependencies:
- python=3.7.11
- pip=20.2
- pip:
- mldesigner==0.1.0b12
- azureml-mlflow==1.50.0
- tensorflow==2.7.0
- numpy==1.21.4
- scikit-learn==1.0.1
- pandas==1.3.4
- matplotlib==3.2.2
- protobuf==3.20.0
现在,已经为 Train Image Classification Keras
组件准备了所有源文件。
创建 score-model 组件
创建上述组件后,你将按照本部分所述创建一个组件来通过 Yaml 规范和脚本为已训练的模型评分。
如果按照 Azure 机器学习示例存储库中的示例进行操作,则源文件已在 score/
文件夹中提供。 此文件夹包含三个用于构造组件的文件:
score.py
:包含组件的源代码。score.yaml
:定义组件的接口和其他详细信息。conda.yaml
:定义组件的运行时环境。
获取包含执行逻辑的脚本
score.py
文件包含一个执行训练模型逻辑的普通 Python 函数。
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.utils import to_categorical
from keras.callbacks import Callback
from keras.models import load_model
import argparse
from pathlib import Path
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import mlflow
def get_file(f):
f = Path(f)
if f.is_file():
return f
else:
files = list(f.iterdir())
if len(files) == 1:
return files[0]
else:
raise Exception("********This path contains more than one file*******")
def parse_args():
# setup argparse
parser = argparse.ArgumentParser()
# add arguments
parser.add_argument(
"--input_data", type=str, help="path containing data for scoring"
)
parser.add_argument(
"--input_model", type=str, default="./", help="input path for model"
)
parser.add_argument(
"--output_result", type=str, default="./", help="output path for model"
)
# parse args
args = parser.parse_args()
# return args
return args
def score(input_data, input_model, output_result):
test_file = get_file(input_data)
data_test = pd.read_csv(test_file, header=None)
img_rows, img_cols = 28, 28
input_shape = (img_rows, img_cols, 1)
# Read test data
X_test = np.array(data_test.iloc[:, 1:])
y_test = to_categorical(np.array(data_test.iloc[:, 0]))
X_test = (
X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
)
# Load model
files = [f for f in os.listdir(input_model) if f.endswith(".h5")]
model = load_model(input_model + "/" + files[0])
# Log metrics of the model
eval = model.evaluate(X_test, y_test, verbose=0)
mlflow.log_metric("Final test loss", eval[0])
print("Test loss:", eval[0])
mlflow.log_metric("Final test accuracy", eval[1])
print("Test accuracy:", eval[1])
# Score model using test data
y_predict = model.predict(X_test)
y_result = np.argmax(y_predict, axis=1)
# Output result
np.savetxt(output_result + "/predict_result.csv", y_result, delimiter=",")
def main(args):
score(args.input_data, args.input_model, args.output_result)
# run script
if __name__ == "__main__":
# parse args
args = parse_args()
# call main function
main(args)
score.py 中的代码采用三个命令行参数:input_data
、input_model
和 output_result
。 程序将使用输入数据为输入模型评分,然后输出评分结果。
通过 Yaml 定义组件
本部分介绍如何以有效的 YAML 组件规范格式创建组件规范。 此文件指定以下信息:
- 元数据:名称、display_name、版本、类型等。
- 接口:输入和输出
- 命令、代码和环境:用于运行组件的命令、代码和环境
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command
name: score_image_classification_keras
display_name: Score Image Classification Keras
inputs:
input_data:
type: uri_folder
input_model:
type: uri_folder
outputs:
output_result:
type: uri_folder
code: ./
command: python score.py --input_data ${{inputs.input_data}} --input_model ${{inputs.input_model}} --output_result ${{outputs.output_result}}
environment:
conda_file: ./conda.yaml
image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
name
是组件的唯一标识符。 其显示名称为Score Image Classification Keras
。- 此组件有两个输入和一个输出。
- 其源代码路径在
code
节中定义,在云中运行该组件时,该路径中的所有文件将作为该组件的快照上传。 command
节指定在运行此组件时要执行的命令。environment
节包含一个 docker 映像和一个 conda yaml 文件。 源文件位于示例存储库中。
现已获取 score-model 组件的所有源文件。
加载组件以生成管道
对于 Python 函数定义的 prep-data 组件和 train-model 组件,可以像导入普通的 Python 函数那样导入组件。
以下代码分别从 prep
文件夹下的 prep_component.py
文件和 train
文件夹下的 train_component
文件导入 prepare_data_component()
和 keras_train_component()
函数。
%load_ext autoreload
%autoreload 2
# load component function from component python file
from prep.prep_component import prepare_data_component
from train.train_component import keras_train_component
# print hint of components
help(prepare_data_component)
help(keras_train_component)
对于 yaml 定义的评分组件,可以使用 load_component()
函数来加载。
# load component function from yaml
keras_score_component = load_component(path="./score/score.yaml")
生成管道
现在你已创建并加载了所有用于生成管道的组件和输入数据。 可将它们组合成管道:
注意
若要使用无服务器计算,请将 from azure.ai.ml.entities import ResourceConfiguration
添加到顶部。
然后替换:
- 带
default_compute="serverless",
的default_compute=cpu_compute_target,
- 带
train_node.resources = "ResourceConfiguration(instance_type="Standard_NC6s_v3",instance_count=2)
的train_node.compute = gpu_compute_target
# define a pipeline containing 3 nodes: Prepare data node, train node, and score node
@pipeline(
default_compute=cpu_compute_target,
)
def image_classification_keras_minist_convnet(pipeline_input_data):
"""E2E image classification pipeline with keras using python sdk."""
prepare_data_node = prepare_data_component(input_data=pipeline_input_data)
train_node = keras_train_component(
input_data=prepare_data_node.outputs.training_data
)
train_node.compute = gpu_compute_target
score_node = keras_score_component(
input_data=prepare_data_node.outputs.test_data,
input_model=train_node.outputs.output_model,
)
# create a pipeline
pipeline_job = image_classification_keras_minist_convnet(pipeline_input_data=fashion_ds)
该管道具有默认计算 cpu_compute_target
,这意味着,如果不为特定节点指定计算,则该节点将在默认计算上运行。
该管道具有管道级输入 pipeline_input_data
。 可以在提交管道作业时为管道输入赋值。
该管道包含三个节点:prepare_data_node、train_node 和 score_node。
prepare_data_node
的input_data
使用pipeline_input_data
的值。train_node
的input_data
来自 prepare_data_node 的training_data
输出。score_node 的
input_data
来自 prepare_data_node 的test_data
输出,input_model
来自 train_node 的output_model
。由于
train_node
将训练 CNN 模型,因此你可以将其计算指定为 gpu_compute_target,从而提高训练性能。
提交管道作业
构造管道后,接下来可以提交到工作区。 若要提交作业,首先需要连接到工作区。
获取工作区访问权限
配置凭据
我们将使用 DefaultAzureCredential
来获取工作区访问权限。 DefaultAzureCredential
应该能够处理大多数 Azure SDK 身份验证方案。
如果此方法不起作用,请参考更多可用凭据:配置凭据示例、azure-identity 参考文档。
try:
credential = DefaultAzureCredential()
# Check if given credential can get token successfully.
credential.get_token("https://management.chinacloudapi.cn/.default")
except Exception as ex:
# Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
credential = InteractiveBrowserCredential()
使用计算获取工作区句柄
创建一个 MLClient
对象来管理 Azure 机器学习服务。 如果使用无服务器计算,则无需创建这些计算。
# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)
# Retrieve an already attached Azure Machine Learning Compute.
cpu_compute_target = "cpu-cluster"
print(ml_client.compute.get(cpu_compute_target))
gpu_compute_target = "gpu-cluster"
print(ml_client.compute.get(gpu_compute_target))
将管道作业提交到工作区
获取工作区句柄后,接下来可以提交管道作业。
pipeline_job = ml_client.jobs.create_or_update(
pipeline_job, experiment_name="pipeline_samples"
)
pipeline_job
以上代码将此图像分类管道作业提交到名为 pipeline_samples
的试验。 如果该试验不存在,它会自动创建该试验。 pipeline_input_data
使用 fashion_ds
。
对 pipeline_job
的调用会生成如下所示的输出:
对 submit
和 Experiment
的调用很快完成,并生成类似于以下内容的输出:
试验 | 名称 | 类型 | 状态 | 详细信息页 |
---|---|---|---|---|
pipeline_samples | sharp_pipe_4gvqx6h1fb | 管道 | 备餐中 | Azure 机器学习工作室链接。 |
可以通过打开链接来监视管道运行,也可以通过运行以下代码来阻止管道运行,直到管道运行完成:
# wait until the job completes
ml_client.jobs.stream(pipeline_job.name)
重要
首次管道运行需要大约 15 分钟。 必须下载所有依赖项、创建 Docker 映像,并预配和创建 Python 环境。 再次运行管道所花费的时间会大幅减少,因为会重复使用这些资源,而无需再次创建。 但是,管道的总运行时间取决于脚本的工作负荷,以及每个管道步骤中运行的进程数。
在 UI 中签出输出并调试管道
可以打开 Link to Azure Machine Learning studio
,即管道的作业详细信息页。 你将看到如下所示的管道图形。
可以通过右键单击每个组件来检查该组件的日志和输出,或者选择组件以打开其详细信息窗格。 若要详细了解如何在 UI 中调试管道,请参阅如何使用调试管道失败。
(可选)将组件注册到工作区
在上一部分,你已使用三个组件生成了一个管道,以便端到端完成图像分类任务。 还可将组件注册到工作区,以便可以在工作区中共享和重用它们。 下面是注册 prep-data 组件的示例。
try:
# try get back the component
prep = ml_client.components.get(name="prep_data", version="1")
except:
# if not exists, register component using following code
prep = ml_client.components.create_or_update(prepare_data_component)
# list all components registered in workspace
for c in ml_client.components.list():
print(c)
使用 ml_client.components.get()
,可以按名称和版本获取已注册的组件。 使用 ml_client.components.create_or_update()
,可以注册先前从 Python 函数或 yaml 加载的组件。
后续步骤
- 有关如何使用机器学习 SDK 生成管道的更多示例,请参阅示例存储库。
- 有关如何使用工作室 UI 提交和调试管道的信息,请参阅如何在 UI 中使用组件创建管道。
- 有关如何使用 Azure 机器学习 CLI 创建组件和管道的信息,请参阅如何通过 CLI 使用组件创建管道。