Compartir a través de

在 Spark 作业中部署和运行 MLflow 模型

本文介绍如何在 Spark 作业中部署和运行 MLflow 模型,以针对大量数据或在数据整理过程中执行推理。

关于此示例

此示例演示如何将 Azure 机器学习中注册的 MLflow 模型部署到托管 Spark 群集(预览)中运行的 Spark 作业、Azure Databricks 或 Azure Synapse Analytics,以便对大量数据执行推理。

该模型基于 UCI 心脏病数据集。 数据库包含 76 个属性,但我们使用其中 14 个。 该模型尝试预测患者是否存在心脏疾病。 它是从 0(不存在)到 1(存在)的整数值。 它已使用 XGBBoost 分类器进行训练,所有必需的预处理都打包为 scikit-learn 管道,使此模型成为从原始数据到预测的端到端管道。

本文中的信息基于 azureml-examples 存储库中包含的代码示例。 若要在不复制/粘贴文件的情况下在本地运行命令,请克隆存储库,然后将目录更改为 sdk/using-mlflow/deploy

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

先决条件

在按照本文中的步骤操作之前,请确保满足以下先决条件:

  • 如下所示,安装 MLflow SDK mlflow 包和适用于 MLflow 的 Azure 机器学习 azureml-mlflow 插件。

    pip install mlflow azureml-mlflow
    

    提示

    可以使用 mlflow-skinny 包,它是一个不带 SQL 存储、服务器、UI 或数据科学依赖项的轻型 MLflow 包。 对于主要需要 MLflow 的跟踪和记录功能但不需要导入整个功能套件(包括部署)的用户,建议使用此包。

  • 创建 Azure 机器学习工作区。 若要创建工作区,请参阅创建入门所需的资源。 查看在工作区中执行 MLflow 操作所需的访问权限

  • 若要执行远程跟踪,或者跟踪在 Azure 机器学习外部运行的试验,请将 MLflow 配置为指向 Azure 机器学习工作区的跟踪 URI。 有关如何将 MLflow 连接到工作区的详细信息,请参阅为 Azure 机器学习配置 MLflow

  • 必须在工作区中注册 MLflow 模型。 具体而言,此示例将注册针对糖尿病数据集训练的模型。

连接到工作区

首先,我们连接到要在其中注册模型的 Azure 机器学习工作区。

已为你配置跟踪。 使用 MLflow 时,还会使用你的默认凭据。

注册模型

我们需要在 Azure 机器学习注册表中注册的模型来执行推理。 在这种情况下,我们已在存储库中拥有模型的本地副本,因此我们只需要将模型发布到工作区中的注册表。 如果打算部署的模型已注册,则可以跳过此步骤。

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

或者,如果模型是在运行中记录的,则你可以直接注册它。

提示

若要注册模型,需要知道模型的存储位置。 如果使用的是 MLflow 的 autolog 功能,则路径将取决于所使用模型的类型和框架。 建议检查作业输出以确定该文件夹的名称。 可以查找包含名为 MLModel 的文件的文件夹。 如果使用 log_model 手动记录模型,则路径是传递给此类方法的参数。 例如,如果使用 mlflow.sklearn.log_model(my_model, "classifier") 记录模型,则存储模型的路径为 classifier

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

注意

路径 MODEL_PATH 是模型在运行中存储的位置。


获取要评分的输入数据

我们需要一些用于运行作业的输入数据。 在本示例中,我们将从 Internet 下载示例数据,并将其放置在 Spark 群集使用的共享存储中。

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.chinacloudapi.cn/data/heart-disease-uci/data/heart.csv", "/tmp/data")

将数据移动到可供整个群集使用的已装载存储帐户。

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

重要

前面的代码使用 dbutils,这是 Azure Databricks 群集中可用的工具。 根据所使用的平台使用适当的工具。

然后,输入数据放置在以下文件夹中:

input_data_path = "dbfs:/data"

在 Spark 群集中运行模型

以下部分介绍如何在 Spark 作业中运行在 Azure 机器学习中注册的 MLflow 模型。

  1. 确保在群集中安装以下库:
  - mlflow<3,>=2.1
  - cloudpickle==2.2.0
  - scikit-learn==1.2.0
  - xgboost==1.7.2
  1. 我们将使用笔记本来演示如何使用 Azure 机器学习中注册的 MLflow 模型来创建评分例程。 创建笔记本并使用 PySpark 作为默认语言。

  2. 导入所需的命名空间:

    import mlflow
    import pyspark.sql.functions as f
    
  3. 配置模型 URI。 以下 URI 在其最新版本中引入名为 heart-classifier 的模型。

    model_uri = "models:/heart-classifier/latest"
    
  4. 将模型加载为 UDF 函数。 用户定义函数 (UDF) 是由用户定义的函数,用于在用户环境中重复使用自定义逻辑。

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    提示

    使用参数 result_type 控制 predict() 函数返回的类型。

  5. 读取要评分的数据:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    在本示例中,输入数据采用 CSV 格式并放置在 dbfs:/data/ 文件夹中。 我们还要删除 target 列,因为此数据集包含要预测的目标变量。 在生产方案中,数据不包含此列。

  6. 运行函数 predict_function,并将预测结果放在新列中。 在本示例中,我们要将预测放在 predictions 列中。

    df.withColumn("predictions", score_function(*df.columns))
    

    提示

    predict_function 会接收所需的列作为参数。 在本示例中,数据帧的所有列都是模型预期,因此使用了 df.columns。 如果模型需要列的子集,可以手动将其引入。 如果模型具有签名,则类型需要在输入与预期类型之间兼容。

  7. 可以将预测写回到存储:

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

在 Azure 机器学习的独立 Spark 作业中运行模型

Azure 机器学习支持创建独立的 Spark 作业,以及创建可在 Azure 机器学习管道中使用的可重用 Spark 组件。 在本例中,我们将部署评分作业(在 Azure 机器学习独立 Spark 作业中运行并运行 MLflow 模型)来执行推理。

注意

若要详细了解 Azure 机器学习中的 Spark 作业,请参阅在 Azure 机器学习中提交 Spark 作业(预览版)

  1. Spark 作业需要采用参数的 Python 脚本。 创建评分脚本:

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    上述脚本采用 --model--input_data--scored_data 这三个参数。 前两个是输入,表示要运行的模型和输入数据,最后一个是输出,这是用于放置预测的输出文件夹。

    提示

    Python 包安装:前面的评分脚本将 MLflow 模型加载到 UDF 函数中,但它指示参数 env_manager="conda"。 设置此参数后,MLflow 将在仅有 UDF 函数运行的隔离环境中还原模型定义中指定的所需包。 有关更多详细信息,请参阅 mlflow.pyfunc.spark_udf 文档。

  2. 创建作业定义:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.chinacloudapi.cn/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    提示

    若要使用附加的 Synapse Spark 池,请在上面所示的示例 YAML 规范文件中定义 compute 属性,而不是 resources 属性。

  3. 上面显示的 YAML 文件可以在带有 --file 参数的 az ml job create 命令中使用,以创建独立的 Spark 作业,如下所示:

    az ml job create -f mlflow-score-spark-job.yml
    

后续步骤