使用 Azure 机器学习 SDK v2 和组件创建和运行机器学习管道
适用范围:Python SDK azure-ai-ml v2(最新版)
本文介绍如何使用 Python SDK v2 生成 Azure 机器学习管道,以完成包含三个步骤的图像分类任务:准备数据、训练图像分类模型和为模型评分。 机器学习管道可以优化工作流以提高其速度、可移植性和可重用性,使你能够将工作重心放在机器学习上,而不必关注基础结构和自动化。
该示例将训练一个小型 Keras 卷积神经网络,以对 Fashion MNIST 数据集中的图像进行分类。 管道如下所示。
在本文中,你将完成以下任务:
- 为管道作业准备输入数据
- 创建三个组件来准备数据、训练和评分
- 从组件构建管道
- 使用计算获取工作区访问权限
- 提交管道作业
- 查看组件的输出和已训练的神经网络
- (可选)注册组件供将来重用以及在工作区中共享
如果没有 Azure 订阅,请在开始前创建一个试用版订阅。 立即尝试试用版订阅。
Azure 机器学习工作区 - 如果没有工作区,请完成“创建资源”教程。
装有 Azure 机器学习 Python SDK v2 的 Python 环境 - 安装说明 - 查看入门部分。 此环境用于定义和控制 Azure 机器学习资源,独立于运行时用于训练的环境。
克隆示例存储库
若要运行训练示例,请首先克隆示例存储库,然后更改为
sdk
目录:git clone --depth 1 https://github.com/Azure/azureml-examples cd azureml-examples/sdk
本文使用适用于 Azure 机器学习的 Python SDK 创建和控制 Azure 机器学习管道。 本文假设你将在 Python REPL 环境或 Jupyter 笔记本中以交互方式运行代码片段。
本文基于 Azure 机器学习示例存储库的 sdk/python/jobs/pipelines/2e_image_classification_keras_minist_convnet
目录中的 image_classification_keras_minist_convnet.ipynb 笔记本。
导入 Azure 机器学习所需的所有库以便在本文中使用:
# import required libraries
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component
需要为此图像分类管道准备输入数据。
Fashion-MNIST 是一个时尚图像数据集,包含 10 个类别。 每张图像都是 28x28 的灰度图像,有 60,000 张训练图像和 10,000 张测试图像。 作为图像分类问题,Fashion-MNIST 比经典 MNIST 手写数字数据库更难。 它以与原始手写数字数据库相同的压缩二进制形式分发。
导入你需要的 Azure 机器学习所需的所有库。
通过定义 Input
,可以创建对数据源位置的引用。 数据会保留在其现有位置,因此不会产生额外的存储成本。
图像分类任务可划分为三个步骤:准备数据、训练模型和为模型评分。
Azure 机器学习组件是一段独立的代码,执行机器学习管道中的一个步骤。 在本文中,你将为图像分类任务创建三个组件:
- 准备并测试用于训练的数据
- 使用训练数据来训练用于图像分类的神经网络
- 使用测试数据为模型评分
对于每个组件,需要准备以下内容:
准备包含执行逻辑的 Python 脚本
定义组件的接口
添加组件的其他元数据,包括运行时环境、用于运行组件的命令,等等。
下一部分将介绍如何以两种不同的方式创建组件:前两个组件使用 Python 函数,第三个组件使用 YAML 定义。
此管道中的第一个组件将 fashion_ds
的压缩数据文件转换为两个 csv 文件,其中一个文件用于训练,另一个用于评分。 你将使用 Python 函数来定义此组件。
如果按照 Azure 机器学习示例存储库中的示例进行操作,则源文件已在 prep/
文件夹中提供。 此文件夹包含两个用于构造组件的文件:prep_component.py
定义组件,conda.yaml
定义组件的运行时环境。
使用 command_component()
函数作为修饰器,可以轻松定义组件的接口、元数据,以及要从 Python 函数执行的代码。 修饰的每个 Python 函数将转换为管道服务可以处理的单个静态规范 (YAML)。
# Converts MNIST-formatted files at the passed-in input path to training data output path and test data output path
import os
from pathlib import Path
from mldesigner import command_component, Input, Output
@command_component(
name="prep_data",
version="1",
display_name="Prep Data",
description="Convert data to CSV file, and split to training and test data",
environment=dict(
conda_file=Path(__file__).parent / "conda.yaml",
image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
),
)
def prepare_data_component(
input_data: Input(type="uri_folder"),
training_data: Output(type="uri_folder"),
test_data: Output(type="uri_folder"),
):
convert(
os.path.join(input_data, "train-images-idx3-ubyte"),
os.path.join(input_data, "train-labels-idx1-ubyte"),
os.path.join(training_data, "mnist_train.csv"),
60000,
)
convert(
os.path.join(input_data, "t10k-images-idx3-ubyte"),
os.path.join(input_data, "t10k-labels-idx1-ubyte"),
os.path.join(test_data, "mnist_test.csv"),
10000,
)
def convert(imgf, labelf, outf, n):
f = open(imgf, "rb")
l = open(labelf, "rb")
o = open(outf, "w")
f.read(16)
l.read(8)
images = []
for i in range(n):
image = [ord(l.read(1))]
for j in range(28 * 28):
image.append(ord(f.read(1)))
images.append(image)
for image in images:
o.write(",".join(str(pix) for pix in image) + "\n")
f.close()
o.close()
l.close()
以上代码使用 @command_component
修饰器定义一个显示名称为 Prep Data
的组件:
name
是组件的唯一标识符。version
是组件的当前版本。 一个组件可以有多个版本。display_name
是组件在 UI 中的友好显示名称,它不是唯一的。description
通常描述此组件可以完成的任务。environment
指定此组件的运行时环境。 此组件的环境指定一个 docker 映像并引用conda.yaml
文件。conda.yaml
文件包含用于组件的所有包,如下所示:
name: imagekeras_prep_conda_env
channels:
- defaults
dependencies:
- python=3.7.11
- pip=20.0
- pip:
- mldesigner==0.1.0b12
prepare_data_component
函数定义input_data
的一个输入,并定义training_data
和test_data
的两个输出。input_data
是输入数据路径。training_data
和test_data
是训练数据和测试数据的输出数据路径。- 此组件将
input_data
中的训练数据 csv 转换为training_data
,将测试数据 csv 转换为test_data
。
下面组件在工作室 UI 中的外观。
- 组件是管道图形中的一个块。
input_data
、training_data
和test_data
是组件的端口,它们连接到其他组件以流式传输数据。
现在,已经为 Prep Data
组件准备了所有源文件。
在本部分,你将创建一个组件用于在 Python 函数中训练图像分类模型,这类似于 Prep Data
组件。
不同之处在于,由于训练逻辑更复杂,你可以将原始训练代码放在单独的 Python 文件中。
此组件的源文件位于 Azure 机器学习示例存储库中的 train/
文件夹下。 此文件夹包含三个用于构造组件的文件:
train.py
:包含用于训练模型的实际逻辑。train_component.py
:定义组件的接口,并在train.py
中导入函数。conda.yaml
:定义组件的运行时环境。
train.py
文件包含一个普通的 Python 函数,该函数执行训练模型逻辑,以训练用于图像分类的 Keras 神经网络。 若要查看代码,请参阅 GitHub 上的 train.py 文件。
成功定义训练函数后,可以使用 Azure 机器学习 SDK v2 中的 @command_component
将函数包装为可在 Azure 机器学习管道中使用的组件。
import os
from pathlib import Path
from mldesigner import command_component, Input, Output
@command_component(
name="train_image_classification_keras",
version="1",
display_name="Train Image Classification Keras",
description="train image classification with keras",
environment=dict(
conda_file=Path(__file__).parent / "conda.yaml",
image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
),
)
def keras_train_component(
input_data: Input(type="uri_folder"),
output_model: Output(type="uri_folder"),
epochs=10,
):
# avoid dependency issue, execution logic is in train() func in train.py file
from train import train
train(input_data, output_model, epochs)
以上代码使用 @command_component
定义一个显示名称为 Train Image Classification Keras
的组件:
keras_train_component
函数定义一个输入input_data
(训练数据的来源于其中),一个输入epochs
(指定训练期间的循环),以及一个输出output_model
(在其中输出模型文件)。epochs
的默认值为 10。 此组件的执行逻辑来自上述train.py
中的train()
函数。
train-model 组件的配置比 prep-data 组件稍微复杂一些。 conda.yaml
如下所示:
name: imagekeras_train_conda_env
channels:
- defaults
dependencies:
- python=3.7.11
- pip=20.2
- pip:
- mldesigner==0.1.0b12
- azureml-mlflow==1.50.0
- tensorflow==2.7.0
- numpy==1.21.4
- scikit-learn==1.0.1
- pandas==1.3.4
- matplotlib==3.2.2
- protobuf==3.20.0
现在,已经为 Train Image Classification Keras
组件准备了所有源文件。
创建上述组件后,你将按照本部分所述创建一个组件来通过 Yaml 规范和脚本为已训练的模型评分。
如果按照 Azure 机器学习示例存储库中的示例进行操作,则源文件已在 score/
文件夹中提供。 此文件夹包含三个用于构造组件的文件:
score.py
:包含组件的源代码。score.yaml
:定义组件的接口和其他详细信息。conda.yaml
:定义组件的运行时环境。
score.py
文件包含一个执行训练模型逻辑的普通 Python 函数。
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.utils import to_categorical
from keras.callbacks import Callback
from keras.models import load_model
import argparse
from pathlib import Path
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import mlflow
def get_file(f):
f = Path(f)
if f.is_file():
return f
else:
files = list(f.iterdir())
if len(files) == 1:
return files[0]
else:
raise Exception("********This path contains more than one file*******")
def parse_args():
# setup argparse
parser = argparse.ArgumentParser()
# add arguments
parser.add_argument(
"--input_data", type=str, help="path containing data for scoring"
)
parser.add_argument(
"--input_model", type=str, default="./", help="input path for model"
)
parser.add_argument(
"--output_result", type=str, default="./", help="output path for model"
)
# parse args
args = parser.parse_args()
# return args
return args
def score(input_data, input_model, output_result):
test_file = get_file(input_data)
data_test = pd.read_csv(test_file, header=None)
img_rows, img_cols = 28, 28
input_shape = (img_rows, img_cols, 1)
# Read test data
X_test = np.array(data_test.iloc[:, 1:])
y_test = to_categorical(np.array(data_test.iloc[:, 0]))
X_test = (
X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
)
# Load model
files = [f for f in os.listdir(input_model) if f.endswith(".h5")]
model = load_model(input_model + "/" + files[0])
# Log metrics of the model
eval = model.evaluate(X_test, y_test, verbose=0)
mlflow.log_metric("Final test loss", eval[0])
print("Test loss:", eval[0])
mlflow.log_metric("Final test accuracy", eval[1])
print("Test accuracy:", eval[1])
# Score model using test data
y_predict = model.predict(X_test)
y_result = np.argmax(y_predict, axis=1)
# Output result
np.savetxt(output_result + "/predict_result.csv", y_result, delimiter=",")
def main(args):
score(args.input_data, args.input_model, args.output_result)
# run script
if __name__ == "__main__":
# parse args
args = parse_args()
# call main function
main(args)
score.py 中的代码采用三个命令行参数:input_data
、input_model
和 output_result
。 程序将使用输入数据为输入模型评分,然后输出评分结果。
本部分介绍如何以有效的 YAML 组件规范格式创建组件规范。 此文件指定以下信息:
- 元数据:名称、display_name、版本、类型等。
- 接口:输入和输出
- 命令、代码和环境:用于运行组件的命令、代码和环境
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command
name: score_image_classification_keras
display_name: Score Image Classification Keras
inputs:
input_data:
type: uri_folder
input_model:
type: uri_folder
outputs:
output_result:
type: uri_folder
code: ./
command: python score.py --input_data ${{inputs.input_data}} --input_model ${{inputs.input_model}} --output_result ${{outputs.output_result}}
environment:
conda_file: ./conda.yaml
image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
name
是组件的唯一标识符。 其显示名称为Score Image Classification Keras
。- 此组件有两个输入和一个输出。
- 其源代码路径在
code
节中定义,在云中运行该组件时,该路径中的所有文件将作为该组件的快照上传。 command
节指定在运行此组件时要执行的命令。environment
节包含一个 docker 映像和一个 conda yaml 文件。 源文件位于示例存储库中。
现已获取 score-model 组件的所有源文件。
对于 Python 函数定义的 prep-data 组件和 train-model 组件,可以像导入普通的 Python 函数那样导入组件。
以下代码分别从 prep
文件夹下的 prep_component.py
文件和 train
文件夹下的 train_component
文件导入 prepare_data_component()
和 keras_train_component()
函数。
%load_ext autoreload
%autoreload 2
# load component function from component python file
from prep.prep_component import prepare_data_component
from train.train_component import keras_train_component
# print hint of components
help(prepare_data_component)
help(keras_train_component)
对于 yaml 定义的评分组件,可以使用 load_component()
函数来加载。
# load component function from yaml
keras_score_component = load_component(path="./score/score.yaml")
现在你已创建并加载了所有用于生成管道的组件和输入数据。 可将它们组合成管道:
备注
若要使用无服务器计算,请将 from azure.ai.ml.entities import ResourceConfiguration
添加到顶部。
然后替换:
- 带
default_compute="serverless",
的default_compute=cpu_compute_target,
- 带
train_node.resources = "ResourceConfiguration(instance_type="Standard_NC6s_v3",instance_count=2)
的train_node.compute = gpu_compute_target
# define a pipeline containing 3 nodes: Prepare data node, train node, and score node
@pipeline(
default_compute=cpu_compute_target,
)
def image_classification_keras_minist_convnet(pipeline_input_data):
"""E2E image classification pipeline with keras using python sdk."""
prepare_data_node = prepare_data_component(input_data=pipeline_input_data)
train_node = keras_train_component(
input_data=prepare_data_node.outputs.training_data
)
train_node.compute = gpu_compute_target
score_node = keras_score_component(
input_data=prepare_data_node.outputs.test_data,
input_model=train_node.outputs.output_model,
)
# create a pipeline
pipeline_job = image_classification_keras_minist_convnet(pipeline_input_data=fashion_ds)
该管道具有默认计算 cpu_compute_target
,这意味着,如果不为特定节点指定计算,则该节点将在默认计算上运行。
该管道具有管道级输入 pipeline_input_data
。 可以在提交管道作业时为管道输入赋值。
该管道包含三个节点:prepare_data_node、train_node 和 score_node。
prepare_data_node
的input_data
使用pipeline_input_data
的值。train_node
的input_data
来自 prepare_data_node 的training_data
输出。score_node 的
input_data
来自 prepare_data_node 的test_data
输出,input_model
来自 train_node 的output_model
。由于
train_node
将训练 CNN 模型,因此你可以将其计算指定为 gpu_compute_target,从而提高训练性能。
构造管道后,接下来可以提交到工作区。 若要提交作业,首先需要连接到工作区。
我们将使用 DefaultAzureCredential
来获取工作区访问权限。 DefaultAzureCredential
应该能够处理大多数 Azure SDK 身份验证方案。
如果此方法不起作用,请参考更多可用凭据:配置凭据示例、azure-identity 参考文档。
try:
credential = DefaultAzureCredential()
# Check if given credential can get token successfully.
credential.get_token("https://management.chinacloudapi.cn/.default")
except Exception as ex:
# Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
credential = InteractiveBrowserCredential()
创建一个 MLClient
对象来管理 Azure 机器学习服务。 如果使用无服务器计算,则无需创建这些计算。
# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)
# Retrieve an already attached Azure Machine Learning Compute.
cpu_compute_target = "cpu-cluster"
print(ml_client.compute.get(cpu_compute_target))
gpu_compute_target = "gpu-cluster"
print(ml_client.compute.get(gpu_compute_target))
获取工作区句柄后,接下来可以提交管道作业。
pipeline_job = ml_client.jobs.create_or_update(
pipeline_job, experiment_name="pipeline_samples"
)
pipeline_job
以上代码将此图像分类管道作业提交到名为 pipeline_samples
的试验。 如果该试验不存在,它会自动创建该试验。 pipeline_input_data
使用 fashion_ds
。
对 pipeline_job
的调用会生成如下所示的输出:
对 submit
和 Experiment
的调用很快完成,并生成类似于以下内容的输出:
试验 | 名称 | 类型 | 状态 | 详细信息页 |
---|---|---|---|---|
pipeline_samples | sharp_pipe_4gvqx6h1fb | 管道 | 备餐中 | Azure 机器学习工作室链接。 |
可以通过打开链接来监视管道运行,也可以通过运行以下代码来阻止管道运行,直到管道运行完成:
# wait until the job completes
ml_client.jobs.stream(pipeline_job.name)
重要
首次管道运行需要大约 15 分钟。 必须下载所有依赖项、创建 Docker 映像,并预配和创建 Python 环境。 再次运行管道所花费的时间会大幅减少,因为会重复使用这些资源,而无需再次创建。 但是,管道的总运行时间取决于脚本的工作负荷,以及每个管道步骤中运行的进程数。
可以打开 Link to Azure Machine Learning studio
,即管道的作业详细信息页。 你将看到如下所示的管道图形。
可以通过右键单击每个组件来检查该组件的日志和输出,或者选择组件以打开其详细信息窗格。 若要详细了解如何在 UI 中调试管道,请参阅如何使用调试管道失败。
在上一部分,你已使用三个组件生成了一个管道,以便端到端完成图像分类任务。 还可将组件注册到工作区,以便可以在工作区中共享和重用它们。 下面是注册 prep-data 组件的示例。
try:
# try get back the component
prep = ml_client.components.get(name="prep_data", version="1")
except:
# if not exists, register component using following code
prep = ml_client.components.create_or_update(prepare_data_component)
# list all components registered in workspace
for c in ml_client.components.list():
print(c)
使用 ml_client.components.get()
,可以按名称和版本获取已注册的组件。 使用 ml_client.components.create_or_update()
,可以注册先前从 Python 函数或 yaml 加载的组件。
- 有关如何使用机器学习 SDK 生成管道的更多示例,请参阅示例存储库。
- 有关如何使用工作室 UI 提交和调试管道的信息,请参阅如何在 UI 中使用组件创建管道。
- 有关如何使用 Azure 机器学习 CLI 创建组件和管道的信息,请参阅如何通过 CLI 使用组件创建管道。