Note
Access to this page requires authorization. You can try signing in or changing directories.
Access to this page requires authorization. You can try changing directories.
Note
有关使用 SDK v2 生成管道的教程,请参阅教程:在 Jupyter Notebook 中通过 Python SDK v2 使用用于生产 ML 工作流的 ML 管道。
本教程介绍如何生成 Azure 机器学习管道来准备数据和训练机器学习模型。 机器学习管道可以优化工作流以提高其速度、可移植性和可重用性,使你能够将工作重心放在机器学习上,而不必关注基础结构和自动化。
该示例将训练一个小型 Keras 卷积神经网络,以对 Fashion MNIST 数据集中的图像进行分类。
在本教程中,请完成以下任务:
- 配置工作区
- 创建试验来保存工作
- 预配 ComputeTarget 以执行该工作
- 创建用于存储压缩数据的数据集
- 创建管道步骤以准备要训练的数据
- 定义执行训练的运行时环境
- 创建管道步骤以定义神经网络并执行训练
- 通过管道步骤撰写管道
- 在试验中运行管道
- 查看步骤的输出和经训练的神经网络
- 注册模型供进一步使用
如果没有 Azure 订阅,请在开始前创建一个试用版订阅。 立即尝试试用版订阅。
- 如果还没有 Azure 机器学习工作区,请完成创建帮助入门的资源。
- 已在其中安装
azureml-core
和azureml-pipeline
包的 Python 环境。 此环境用于定义和控制 Azure 机器学习资源,独立于运行时用于训练的环境。
Important
目前,与 azureml-pipeline
兼容的最新 Python 版本是 Python 3.8。 如果在安装 azureml-pipeline
包时遇到困难,请确保 python --version
是兼容版本。 有关说明,请参阅 Python 虚拟环境管理器(venv
、conda
等)的文档。
本教程使用适用于 Azure 机器学习的 Python SDK 创建和控制 Azure 机器学习管道。 本教程假定你将在 Python REPL 环境或 Jupyter 笔记本中以交互方式运行代码片段。
- 本教程基于 Azure 机器学习示例 存储库的
python-sdk/tutorial/using-pipelines
目录中的image-classification.ipynb
笔记本。 步骤本身的源代码位于keras-mnist-fashion
子目录中。
导入本教程所需的所有 Azure 机器学习类型:
import os
import azureml.core
from azureml.core import (
Workspace,
Experiment,
Dataset,
Datastore,
ComputeTarget,
Environment,
ScriptRunConfig
)
from azureml.data import OutputFileDatasetConfig
from azureml.core.compute import AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline
# check core SDK version number
print("Azure Machine Learning SDK Version: ", azureml.core.VERSION)
Azure 机器学习 SDK 版本应为 1.37 或更高版本。 如果不是,请使用 pip install --upgrade azureml-core
进行升级。
从现有的 Azure 机器学习工作区创建工作区对象。
workspace = Workspace.from_config()
创建一个 Experiment
对象来保存管道运行的结果:
exp = Experiment(workspace=workspace, name="keras-mnist-fashion")
创建一个 ComputeTarget
,表示管道将在其上运行的计算机资源。 即使在基于 CPU 的计算机上,本教程中使用的简单神经网络也只需几分钟即可完成训练。 如果要使用 GPU 进行训练,请将 use_gpu
设置为 True
。 预配计算目标通常需要大约五分钟。
use_gpu = False
# choose a name for your cluster
cluster_name = "gpu-cluster" if use_gpu else "cpu-cluster"
found = False
# Check if this compute target already exists in the workspace.
cts = workspace.compute_targets
if cluster_name in cts and cts[cluster_name].type == "AmlCompute":
found = True
print("Found existing compute target.")
compute_target = cts[cluster_name]
if not found:
print("Creating a new compute target...")
compute_config = AmlCompute.provisioning_configuration(
vm_size= "STANDARD_NC6" if use_gpu else "STANDARD_D2_V2"
# vm_priority = 'lowpriority', # optional
max_nodes=4,
)
# Create the cluster.
compute_target = ComputeTarget.create(workspace, cluster_name, compute_config)
# Can poll for a minimum number of nodes and for a specific timeout.
# If no min_node_count is provided, it will use the scale settings for the cluster.
compute_target.wait_for_completion(
show_output=True, min_node_count=None, timeout_in_minutes=10
)
# For a more detailed view of current AmlCompute status, use get_status().print(compute_target.get_status().serialize())
Note
GPU 可用性取决于 Azure 订阅的配额和 Azure 容量。 请参阅管理和增大 Azure 机器学习资源的配额。
Fashion-MNIST 是一个时尚图像数据集,包含 10 个类别。 每张图像都是 28x28 的灰度图像,有 60,000 张训练图像和 10,000 张测试图像。 作为图像分类问题,Fashion-MNIST 比经典 MNIST 手写数字数据库更难。 它以与原始手写数字数据库相同的压缩二进制形式分发。
若要创建引用基于 Web 的数据的 Dataset
,请运行:
data_urls = ["https://data4mldemo6150520719.blob.core.chinacloudapi.cn/demo/mnist-fashion"]
fashion_ds = Dataset.File.from_files(data_urls)
# list the files referenced by fashion_ds
print(fashion_ds.to_path())
此代码将快速完成。 基础数据保留在 data_urls
数组中指定的 Azure 存储资源中。
此管道的第一步是将 fashion_ds
的压缩数据文件转换为你自己的工作区中的数据集,其中包含可供训练使用的 CSV 文件。 向工作区注册后,你的协作者可以访问此数据进行自己的分析、训练等
datastore = workspace.get_default_datastore()
prepared_fashion_ds = OutputFileDatasetConfig(
destination=(datastore, "outputdataset/{run-id}")
).register_on_complete(name="prepared_fashion_ds")
上述代码指定了一个基于管道步骤输出的数据集。 基础已处理文件将放入工作区的默认数据存储的 blob 存储中,位于 destination
中指定的路径。 数据集将在名为 prepared_fashion_ds
的工作区中注册。
到目前为止,执行的代码已创建并控制了 Azure 资源。 现在,可以编写在域中执行第一步的代码。
如果按照 Azure 机器学习示例存储库中的示例进行操作,则源文件已作为 keras-mnist-fashion/prepare.py
提供。
如果是从头开始操作,请创建名为 keras-mnist-fashion/
的子目录。 创建一个新文件,将以下代码添加到其中,并将文件命名为 prepare.py
。
# prepare.py
# Converts MNIST-formatted files at the passed-in input path to a passed-in output path
import os
import sys
# Conversion routine for MNIST binary format
def convert(imgf, labelf, outf, n):
f = open(imgf, "rb")
l = open(labelf, "rb")
o = open(outf, "w")
f.read(16)
l.read(8)
images = []
for i in range(n):
image = [ord(l.read(1))]
for j in range(28 * 28):
image.append(ord(f.read(1)))
images.append(image)
for image in images:
o.write(",".join(str(pix) for pix in image) + "\n")
f.close()
o.close()
l.close()
# The MNIST-formatted source
mounted_input_path = sys.argv[1]
# The output directory at which the outputs will be written
mounted_output_path = sys.argv[2]
# Create the output directory
os.makedirs(mounted_output_path, exist_ok=True)
# Convert the training data
convert(
os.path.join(mounted_input_path, "mnist-fashion/train-images-idx3-ubyte"),
os.path.join(mounted_input_path, "mnist-fashion/train-labels-idx1-ubyte"),
os.path.join(mounted_output_path, "mnist_train.csv"),
60000,
)
# Convert the test data
convert(
os.path.join(mounted_input_path, "mnist-fashion/t10k-images-idx3-ubyte"),
os.path.join(mounted_input_path, "mnist-fashion/t10k-labels-idx1-ubyte"),
os.path.join(mounted_output_path, "mnist_test.csv"),
10000,
)
prepare.py
中的代码采用两个命令行参数:第一个分配给 mounted_input_path
,第二个分配给 mounted_output_path
。 如果该子目录不存在,则调用 os.makedirs
会创建该目录。 然后,程序将转换训练和测试数据,并将逗号分隔的文件输出到 mounted_output_path
。
返回用于指定管道的 Python 环境,运行以下代码为准备代码创建 PythonScriptStep
:
script_folder = "./keras-mnist-fashion"
prep_step = PythonScriptStep(
name="prepare step",
script_name="prepare.py",
# On the compute target, mount fashion_ds dataset as input, prepared_fashion_ds as output
arguments=[fashion_ds.as_named_input("fashion_ds").as_mount(), prepared_fashion_ds],
source_directory=script_folder,
compute_target=compute_target,
allow_reuse=True,
)
对 PythonScriptStep
的调用指定在运行管道步骤时:
script_folder
目录中的所有文件都上传到compute_target
- 在这些上传的源文件中,将运行文件
prepare.py
fashion_ds
和prepared_fashion_ds
数据集将装载在compute_target
上,并显示为目录fashion_ds
文件的路径将是prepare.py
的第一个参数。 在prepare.py
中,此参数分配给mounted_input_path
prepared_fashion_ds
的路径将是prepare.py
的第二个参数。 在prepare.py
中,此参数分配给mounted_output_path
- 因为
allow_reuse
是True
,所以在其源文件或输入更改之前,它不会重新运行 - 此
PythonScriptStep
将被命名为prepare step
模块化和重用是管道的主要优势。 Azure 机器学习可自动确定源代码或数据集更改。 如果 allow_reuse
为 True
,则将重用不受影响的步骤的输出,而不会再次重新运行这些步骤。 如果某个步骤依赖于 Azure 机器学习外部可能发生变化的数据源(例如,包含销售数据的 URL),请将 allow_reuse
设置为 False
,在每次运行管道时都运行管道步骤。
数据从压缩格式转换为 CSV 文件后,可用于训练卷积神经网络。
使用较大的管道时,最佳做法是将每个步骤的源代码放在单独的目录(src/prepare/
、src/train/
等)中,但对于本教程,只需在同一 keras-mnist-fashion/
源目录中使用或创建文件 train.py
。
import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.layers.normalization import BatchNormalization
from keras.utils import to_categorical
from keras.callbacks import Callback
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from azureml.core import Run
# dataset object from the run
run = Run.get_context()
dataset = run.input_datasets["prepared_fashion_ds"]
# split dataset into train and test set
(train_dataset, test_dataset) = dataset.random_split(percentage=0.8, seed=111)
# load dataset into pandas dataframe
data_train = train_dataset.to_pandas_dataframe()
data_test = test_dataset.to_pandas_dataframe()
img_rows, img_cols = 28, 28
input_shape = (img_rows, img_cols, 1)
X = np.array(data_train.iloc[:, 1:])
y = to_categorical(np.array(data_train.iloc[:, 0]))
# here we split validation data to optimiza classifier during training
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=13)
# test data
X_test = np.array(data_test.iloc[:, 1:])
y_test = to_categorical(np.array(data_test.iloc[:, 0]))
X_train = (
X_train.reshape(X_train.shape[0], img_rows, img_cols, 1).astype("float32") / 255
)
X_test = X_test.reshape(X_test.shape[0], img_rows, img_cols, 1).astype("float32") / 255
X_val = X_val.reshape(X_val.shape[0], img_rows, img_cols, 1).astype("float32") / 255
batch_size = 256
num_classes = 10
epochs = 10
# construct neuron network
model = Sequential()
model.add(
Conv2D(
32,
kernel_size=(3, 3),
activation="relu",
kernel_initializer="he_normal",
input_shape=input_shape,
)
)
model.add(MaxPooling2D((2, 2)))
model.add(Dropout(0.25))
model.add(Conv2D(64, (3, 3), activation="relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Conv2D(128, (3, 3), activation="relu"))
model.add(Dropout(0.4))
model.add(Flatten())
model.add(Dense(128, activation="relu"))
model.add(Dropout(0.3))
model.add(Dense(num_classes, activation="softmax"))
model.compile(
loss=keras.losses.categorical_crossentropy,
optimizer=keras.optimizers.Adam(),
metrics=["accuracy"],
)
# start an Azure ML run
run = Run.get_context()
class LogRunMetrics(Callback):
# callback at the end of every epoch
def on_epoch_end(self, epoch, log):
# log a value repeated which creates a list
run.log("Loss", log["loss"])
run.log("Accuracy", log["accuracy"])
history = model.fit(
X_train,
y_train,
batch_size=batch_size,
epochs=epochs,
verbose=1,
validation_data=(X_val, y_val),
callbacks=[LogRunMetrics()],
)
score = model.evaluate(X_test, y_test, verbose=0)
# log a single value
run.log("Final test loss", score[0])
print("Test loss:", score[0])
run.log("Final test accuracy", score[1])
print("Test accuracy:", score[1])
plt.figure(figsize=(6, 3))
plt.title("Fashion MNIST with Keras ({} epochs)".format(epochs), fontsize=14)
plt.plot(history.history["accuracy"], "b-", label="Accuracy", lw=4, alpha=0.5)
plt.plot(history.history["loss"], "r--", label="Loss", lw=4, alpha=0.5)
plt.legend(fontsize=12)
plt.grid(True)
# log an image
run.log_image("Loss v.s. Accuracy", plot=plt)
# create a ./outputs/model folder in the compute target
# files saved in the "./outputs" folder are automatically uploaded into run history
os.makedirs("./outputs/model", exist_ok=True)
# serialize NN architecture to JSON
model_json = model.to_json()
# save model JSON
with open("./outputs/model/model.json", "w") as f:
f.write(model_json)
# save model weights
model.save_weights("./outputs/model/model.h5")
print("model saved in ./outputs/model folder")
ML 开发人员应熟悉这些代码的大部分内容:
- 数据已分区为用于训练的训练集和验证集,以及用于最终评分的单独测试子集
- 输入形状为 28x28x1(仅为 1,因为输入是灰度),一个批中将包含 256 个输入,共有 10 个类
- 训练循环数为 10
- 该模型有三个卷积层,包括最大池化和随机失活,后跟全连接层和 softmax 头
- 该模型适合 10 个循环,然后进行评估
- 模型体系结构写入
outputs/model/model.json
,权重写入outputs/model/model.h5
不过,某些代码特定于 Azure 机器学习。 run = Run.get_context()
检索包含当前服务上下文的 Run
对象。 train.py
源使用此 run
对象通过其名称检索输入数据集(替代 prepare.py
中通过脚本参数数组 argv
检索数据集的代码)。
run
对象还用于在每个循环结束时记录训练进度,并在训练结束时记录损失和准确度随时间变化的图表。
训练步骤的配置比准备步骤稍微复杂一些。 准备步骤仅使用标准 Python 库。 更常见的是,需要修改运行源代码的运行时环境。
创建具有以下内容的文件 conda_dependencies.yml
:
dependencies:
- python=3.7
- pip:
- azureml-core
- azureml-dataset-runtime
- keras==2.4.3
- tensorflow==2.4.3
- numpy
- scikit-learn
- pandas
- matplotlib
Environment
类表示运行机器学习任务的运行时环境。 将上述规范与训练代码相关联:
keras_env = Environment.from_conda_specification(
name="keras-env", file_path="./conda_dependencies.yml"
)
train_cfg = ScriptRunConfig(
source_directory=script_folder,
script="train.py",
compute_target=compute_target,
environment=keras_env,
)
创建训练步骤本身使用的代码类似于用于创建准备步骤的代码:
train_step = PythonScriptStep(
name="train step",
arguments=[
prepared_fashion_ds.read_delimited_files().as_input(name="prepared_fashion_ds")
],
source_directory=train_cfg.source_directory,
script_name=train_cfg.script,
runconfig=train_cfg.run_config,
)
现在,你已指定数据输入和输出并创建了管道的步骤,可以将它们组合到管道中并运行管道:
pipeline = Pipeline(workspace, steps=[prep_step, train_step])
run = exp.submit(pipeline)
你创建的 Pipeline
对象在 workspace
中运行,由指定的准备和训练步骤组成。
Note
此管道有一个简单的依赖项关系图:训练步骤依赖于准备步骤,准备步骤依赖于 fashion_ds
数据集。 生产管道通常具有更复杂的依赖项。 步骤可能依赖于多个上游步骤,早期步骤中的源代码更改可能会产生深远的影响,等等。 Azure 机器学习会为你跟踪这些问题。 你只需传入 steps
数组,Azure 机器学习会负责计算执行图。
对 submit
和 Experiment
的调用很快完成,并生成类似于以下内容的输出:
Submitted PipelineRun 5968530a-abcd-1234-9cc1-46168951b5eb
Link to Azure Machine Learning Portal: https://studio.ml.azure.cn/runs/abc-xyz...
可以通过打开链接来监视管道运行,也可以通过运行以下代码来阻止管道运行,直到管道运行完成:
run.wait_for_completion(show_output=True)
Important
首次管道运行需要大约 15 分钟。 必须下载所有依赖项、创建 Docker 映像,并预配和创建 Python 环境。 再次运行管道所花费的时间会大幅减少,因为会重复使用这些资源,而无需再次创建。 但是,管道的总运行时间取决于脚本的工作负荷,以及每个管道步骤中运行的进程数。
管道完成后,可以检索在训练步骤中记录的指标:
run.find_step_run("train step")[0].get_metrics()
如果对指标感到满意,可以在工作区中注册模型:
run.find_step_run("train step")[0].register_model(
model_name="keras-model",
model_path="outputs/model/",
datasets=[("train test data", fashion_ds)],
)
如果你打算运行其他 Azure 机器学习教程,请不要完成本部分。
如果使用了计算实例,请在不使用 VM 时将其停止,以降低成本。
在工作区中选择“计算”。
从列表中选择计算实例的名称。
选择“停止” 。
准备好再次使用服务器时,选择“启动” 。
如果不打算使用已创建的资源,请删除它们,以免产生任何费用:
- 在 Azure 门户的左侧菜单中选择“资源组”。
- 在资源组列表中,选择创建的资源组。
- 选择“删除资源组”。
- 输入资源组名称。 然后选择“删除”。
还可保留资源组,但请删除单个工作区。 显示工作区属性,然后选择“删除”。
在本教程中,你使用了以下类型:
Workspace
代表你的 Azure 机器学习工作区。 其中包含:- 包含管道训练运行结果的
Experiment
- 延迟加载 Fashion-MNIST 数据存储中保存的数据的
Dataset
- 表示运行管道步骤的计算机的
ComputeTarget
- 运行管道步骤的运行时环境
Environment
- 将
PythonScriptStep
步骤组合成一个整体的Pipeline
- 对训练过程满意后注册的
Model
- 包含管道训练运行结果的
Workspace
对象包含对本教程中未使用的其他资源(笔记本、终结点等)的引用。 有关详细信息,请参阅什么是 Azure 机器学习工作区?。
OutputFileDatasetConfig
将运行的输出提升为基于文件的数据集。 有关数据集和处理数据的详细信息,请参阅如何访问数据。
有关计算目标和环境的详细信息,请参阅什么是 Azure 机器学习中的计算目标?和什么是 Azure 机器学习环境?
ScriptRunConfig
将 ComputeTarget
和 Environment
与 Python 源文件相关联。 PythonScriptStep
采用该 ScriptRunConfig
并定义其输入和输出,在此管道中这是由 OutputFileDatasetConfig
生成的文件数据集。
有关如何使用机器学习 SDK 生成管道的更多示例,请参阅示例存储库。