在批量部署中自定义输出

适用范围:Azure CLI ml 扩展 v2(最新版)Python SDK azure-ai-ml v2(最新版)

本指南介绍如何创建生成自定义输出和文件的部署。 有时,需要更严格地控制作为批处理推理作业的输出写入的内容。 这些情况包括:

  • 需要控制如何在输出中写入预测。 例如,需要将预测追加到原始数据(如果数据为表格)。
  • 需要以不同于批处理部署现成支持的格式的文件格式编写预测。
  • 模型是一种生成模型,无法以表格格式写入输出。 例如,生成映像作为输出的模型。
  • 模型生成多个表格文件,而不是单个文件。 例如,通过考虑多个方案来执行预测的模型。

批处理部署使你可以通过直接写入批处理部署作业的输出来控制作业的输出。 本教程介绍如何部署模型来执行批处理推理,并通过将预测追加到原始输入数据来以 parquet 格式写入输出

关于此示例

此示例演示如何部署模型来执行批处理推理,并自定义预测在输出中的写入方式。 该模型基于 UCI 心脏病数据集。 数据库包含 76 个属性,但本例使用其中的 14 个。 该模型尝试预测患者是否存在心脏疾病。 它是从 0(不存在)到 1(存在)的整数值。

该模型已使用 XGBBoost 分类器进行训练,所有必需的预处理都打包为 scikit-learn 管道,使此模型成为从原始数据到预测的端到端管道。

本文中的示例基于 azureml-examples 存储库中包含的代码示例。 要在无需复制/粘贴 YAML 和其他文件的情况下在本地运行命令,请先克隆存储库,然后将目录更改为以下文件夹:

git clone https://github.com/Azure/azureml-examples --depth 1
cd azureml-examples/cli

此示例的文件位于以下位置:

cd endpoints/batch/deploy-models/custom-outputs-parquet

在 Jupyter 笔记本中跟进操作

可以使用 Jupyter 笔记本来按照此示例操作。 在克隆的存储库中,打开笔记本:custom-output-batch.ipynb

先决条件

在按照本文中的步骤操作之前,请确保满足以下先决条件:

  • Azure 订阅。 如果没有 Azure 订阅,可在开始前创建一个试用帐户。 试用 Azure 机器学习

  • 一个 Azure 机器学习工作区。 如果你没有,请按照管理 Azure 机器学习工作区一文中的步骤创建一个。

  • 确保你在工作区中具有以下权限:

    • 创建或管理批处理终结点和部署:使用允许 Microsoft.MachineLearningServices/workspaces/batchEndpoints/* 的所有者、参与者或自定义角色。

    • 在工作区资源组中创建 ARM 部署:在部署工作区的资源组中使用允许 Microsoft.Resources/deployments/write 的所有者角色、参与者角色或自定义角色。

  • 需要安装以下软件才能使用 Azure 机器学习:

    Azure CLIml适用于 Azure 机器学习的扩展

    az extension add -n ml
    

    注意

    Azure CLI 的 ml 扩展版本 2.7 中引入了 Batch 终结点的管道组件部署。 使用 az extension update --name ml 获取它的上一个版本。

连接到工作区

工作区是 Azure 机器学习的顶级资源,为使用 Azure 机器学习时创建的所有项目提供了一个集中的处理位置。 在本部分,我们将连接到要在其中执行部署任务的工作区。

在以下代码中传入订阅 ID、工作区、位置和资源组的值:

az account set --subscription <subscription>
az configure --defaults workspace=<workspace> group=<resource-group> location=<location>

使用自定义输出创建批处理部署

在此示例中,创建一个可直接写入批处理部署作业的输出文件夹的部署。 部署使用此功能来编写自定义 parquet 文件。

注册模型

只能使用批处理终结点部署已注册的模型。 在本例中,你已在存储库中拥有模型的本地副本,因此只需要将模型发布到工作区中的注册表。 如果打算部署的模型已注册,则可以跳过此步骤。

set -e

# <set_variables>
export ENDPOINT_NAME="<YOUR_ENDPOINT_NAME>"
# </set_variables>

# <name_endpoint>
ENDPOINT_NAME="heart-classifier-custom"
# </name_endpoint>

# The following code ensures the created deployment has a unique name
ENDPOINT_SUFIX=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w ${1:-5} | head -n 1)
ENDPOINT_NAME="$ENDPOINT_NAME-$ENDPOINT_SUFIX"

# <register_model>
MODEL_NAME='heart-classifier-sklpipe'
az ml model create --name $MODEL_NAME --type "custom_model" --path "model"
# </register_model>

echo "Creating compute"
# <create_compute>
az ml compute create -n batch-cluster --type amlcompute --min-instances 0 --max-instances 5
# </create_compute>

echo "Creating batch endpoint $ENDPOINT_NAME"
# <create_endpoint>
az ml batch-endpoint create -n $ENDPOINT_NAME -f endpoint.yml
# </create_endpoint>

echo "Showing details of the batch endpoint"
# <query_endpoint>
az ml batch-endpoint show --name $ENDPOINT_NAME
# </query_endpoint>

echo "Creating batch deployment $DEPLOYMENT_NAME for endpoint $ENDPOINT_NAME"
# <create_deployment>
az ml batch-deployment create --file deployment.yml --endpoint-name $ENDPOINT_NAME --set-default
# </create_deployment>

echo "Update the batch deployment as default for the endpoint"
# <set_default_deployment>
DEPLOYMENT_NAME="classifier-xgboost-custom"
az ml batch-endpoint update --name $ENDPOINT_NAME --set defaults.deployment_name=$DEPLOYMENT_NAME
# </set_default_deployment>

echo "Showing details of the batch deployment"
# <query_deployment>
az ml batch-deployment show --name $DEPLOYMENT_NAME --endpoint-name $ENDPOINT_NAME
# </query_deployment>

echo "Invoking batch endpoint"
# <start_batch_scoring_job>
JOB_NAME = $(az ml batch-endpoint invoke --name $ENDPOINT_NAME --input https://azuremlexampledata.blob.core.chinacloudapi.cn/data/heart-disease-uci/data --query name -o tsv)
# </start_batch_scoring_job>

echo "Showing job detail"
# <show_job_in_studio>
az ml job show -n $JOB_NAME --web
# </show_job_in_studio>

echo "List all jobs under the batch deployment"
# <list_all_jobs>
az ml batch-deployment list-jobs --name $DEPLOYMENT_NAME --endpoint-name $ENDPOINT_NAME --query [].name
# </list_all_jobs>

echo "Stream job logs to console"
# <stream_job_logs>
az ml job stream -n $JOB_NAME
# </stream_job_logs>

# <check_job_status>
STATUS=$(az ml job show -n $JOB_NAME --query status -o tsv)
echo $STATUS
if [[ $STATUS == "Completed" ]]
then
  echo "Job completed"
elif [[ $STATUS ==  "Failed" ]]
then
  echo "Job failed"
  exit 1
else 
  echo "Job status not failed or completed"
  exit 2
fi
# </check_job_status>

echo "Download scores to local path"
# <download_outputs>
az ml job download --name $JOB_NAME --output-name score --download-path ./
# </download_outputs>

echo "Delete resources"
# <delete_endpoint>
az ml batch-endpoint delete --name $ENDPOINT_NAME --yes
# </delete_endpoint>

创建评分脚本

你需要创建评分脚本,该脚本应能读取批处理部署提供的输入数据并返回模型的分数。 你还将直接写入作业的输出文件夹。 总之,建议的评分脚本如下所示:

  1. 将输入数据作为 CSV 文件读取。
  2. 通过输入数据运行 MLflow 模型 predict 函数。
  3. 将预测以及输入数据追加到 pandas.DataFrame
  4. 在名为输入文件的文件中写入数据,但采用 parquet 格式。

code/batch_driver.py

import os
import pickle
import glob
import pandas as pd
from pathlib import Path
from typing import List


def init():
    global model
    global output_path

    # AZUREML_MODEL_DIR is an environment variable created during deployment
    # It is the path to the model folder
    # Please provide your model's folder name if there's one:
    output_path = os.environ["AZUREML_BI_OUTPUT_PATH"]
    model_path = os.environ["AZUREML_MODEL_DIR"]
    model_file = glob.glob(f"{model_path}/*/*.pkl")[-1]

    with open(model_file, "rb") as file:
        model = pickle.load(file)


def run(mini_batch: List[str]):
    for file_path in mini_batch:
        data = pd.read_csv(file_path)
        pred = model.predict(data)

        data["prediction"] = pred

        output_file_name = Path(file_path).stem
        output_file_path = os.path.join(output_path, output_file_name + ".parquet")
        data.to_parquet(output_file_path)

    return mini_batch

备注:

  • 请注意,环境变量 AZUREML_BI_OUTPUT_PATH 用于访问部署作业的输出路径。
  • init() 函数将填充一个名为 output_path 的全局变量,该变量稍后可用于确定写入位置。
  • run 方法返回已处理文件的列表。 run 函数需要该方法才能返回 listpandas.DataFrame 对象。

警告

请注意,所有批处理执行程序都同时具有对此路径的写入访问权限。 这意味着你需要考虑并发性。 在这种情况下,请确保每个执行程序使用输入文件名作为输出文件夹的名称来写入自己的文件。

创建终结点

接下来,创建一个名为 heart-classifier-batch 的批处理终结点来部署模型。

  1. 确定终结点的名称。 终结点名称显示在与终结点关联的 URI 中,因此,批处理终结点名称在 Azure 区域中必须是唯一的。 例如,westus2 中只能有一个名为 mybatchendpoint 的批处理终结点。

    在本例中,将终结点名称放在一个变量中,以便稍后可以轻松引用它。

     ENDPOINT_NAME="heart-classifier-custom"
    
  2. 配置批处理终结点。

    以下 YAML 文件定义了批处理终结点:

    endpoint.yml

     $schema: https://azuremlschemas.azureedge.net/latest/batchEndpoint.schema.json
     name: heart-classifier-batch
     description: A heart condition classifier for batch inference
     auth_mode: aad_token
    
  3. 创建终结点:

    az ml batch-endpoint create -n $ENDPOINT_NAME -f endpoint.yml
    

创建部署

按照后续步骤使用以前的评分脚本创建部署:

  1. 首先,创建一个可以执行评分脚本的环境:

    Azure 机器学习 CLI 无需执行额外步骤。 环境定义将包含在部署文件中。

    environment:
      name: batch-mlflow-xgboost
      image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest
      conda_file: environment/conda.yaml
    
  2. 创建部署。 请注意,output_action 现在设置为 SUMMARY_ONLY.

    注意

    本示例假设你有一个名为 batch-cluster 的计算群集。 可自行更改此名称。

    如需在创建的终结点下创建新部署,请创建如下所示的 YAML 配置。 可以检查额外属性中的完整批处理终结点 YAML 机构

    $schema: https://azuremlschemas.azureedge.net/latest/modelBatchDeployment.schema.json
    endpoint_name: heart-classifier-batch
    name: classifier-xgboost-custom
    description: A heart condition classifier based on XGBoost and Scikit-Learn pipelines that append predictions on parquet files.
    type: model
    model: azureml:heart-classifier-sklpipe@latest
    environment:
      name: batch-mlflow-xgboost
      image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest
      conda_file: environment/conda.yaml
    code_configuration:
      code: code
      scoring_script: batch_driver.py
    compute: azureml:batch-cluster
    resources:
      instance_count: 2
    settings:
      max_concurrency_per_instance: 2
      mini_batch_size: 2
      output_action: summary_only
      retry_settings:
        max_retries: 3
        timeout: 300
      error_threshold: -1
      logging_level: info
    

    然后,用以下命令创建部署:

    DEPLOYMENT_NAME="classifier-xgboost-parquet"
    az ml batch-deployment create -f endpoint.yml
    
  3. 此时,我们的批处理终结点随时可用。

测试部署

若要测试终结点,请使用此存储库中的未标记数据示例,此示例可与模型一起使用。 批处理终结点只能处理云中可从 Azure 机器学习工作区访问的数据。 在该示例中,将其上传到 Azure 机器学习数据存储中。 你将创建可用于调用终结点进行评分的数据资产。 但是,请注意,批处理终结点接受可存放在多个位置的数据。

  1. 使用存储帐户中的数据调用终结点:

    JOB_NAME = $(az ml batch-endpoint invoke --name $ENDPOINT_NAME --deployment-name $DEPLOYMENT_NAME --input azureml:heart-dataset-unlabeled@latest | jq -r '.name')
    

    注意

    并不是每次安装时都会安装实用工具 jq。 可以在 GitHub 上获取说明

  2. 命令返回后立即启动批处理作业。 在作业完成前可监视作业状态:

    az ml job show --name $JOB_NAME
    

分析输出

作业生成所有生成的文件所在的命名输出(名为 score)。 由于你是直接写入到目录中,每个输入文件一个文件,因此将获得相同数量的文件。 在此特定示例中,将输出文件命名为与输入相同的名称,但它们具有 parquet 扩展名。

注意

请注意,predictions.csv 文件还包含在输出文件夹中。 此文件包含已处理文件的摘要。

可以使用作业名称下载作业的结果:

如需下载预测,请使用以下命令:

az ml job download --name $JOB_NAME --output-name score --download-path ./

下载文件后,可以使用你喜欢的工具将其打开。 以下示例使用 Pandas 数据帧加载预测。

import pandas as pd
import glob

output_files = glob.glob("named-outputs/score/*.parquet")
score = pd.concat((pd.read_parquet(f) for f in output_files))

输出如下所示:

age sex ... thal prediction
63 1 ... fixed 0
67 1 ... 一般 1
67 1 ... reversible 0
37 1 ... 一般 0

清理资源

运行以下代码以删除批处理终结点和所有基础部署。 系统不会删除批处理评分作业。

az ml batch-endpoint delete --name $ENDPOINT_NAME --yes