在 Spark 作业中部署和运行 MLflow 模型

本文介绍如何在 Spark 作业中部署和运行 MLflow 模型,以对大量数据执行推理或作为数据整理作业的一部分。

关于此示例

此示例演示如何将 Azure 机器学习中注册的 MLflow 模型部署到 在 Azure 机器学习无服务器 Spark 计算、Azure Databricks 或 Azure Synapse Analytics 中运行的 Spark 作业,以对大量数据执行推理。

该模型基于 UCI 心脏病数据集。 数据库包含 76 个属性,但本例使用其中的 14 个。 该模型尝试预测患者是否存在心脏疾病。 它是从 0(不存在)到 1(存在)的整数值。 该模型使用 XGBBoost 分类器进行训练,所有必需的预处理都打包为 scikit-learn 管道,使此模型成为从原始数据到预测的端到端管道。

本文中的信息基于 azureml-examples 存储库中包含的代码示例。 若要在本地运行命令,而无需复制和粘贴文件,请克隆存储库,然后将目录 sdk/using-mlflow/deploy更改为 。

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

先决条件

在按照本文中的步骤操作之前,请确保满足以下先决条件:

  • 安装 MLflow SDK mlflow 包和适用于 MLflow 的 Azure 机器学习 azureml-mlflow 插件:

    pip install mlflow azureml-mlflow
    

    提示

    可以使用 mlflow-skinny 包,它是一个不带 SQL 存储、服务器、UI 或数据科学依赖项的轻型 MLflow 包。 对于主要需要 MLflow 跟踪和日志记录功能但不是完整的功能套件(包括部署)的用户,我们建议使用此包。

  • 创建 Azure 机器学习工作区。 若要创建工作区,请参阅创建入门所需的资源。 查看在工作区中执行 MLflow 操作所需的 访问权限

  • 若要执行远程跟踪(即跟踪在 Azure 机器学习以外运行的试验),请将 MLflow 配置为指向 Azure 机器学习工作区的跟踪 URI。 有关如何将 MLflow 连接到工作区的详细信息,请参阅为 Azure 机器学习配置 MLflow

  • 您必须在工作区中拥有一个已注册的 MLflow 模型。 特别是,此示例注册了一个为糖尿病数据集训练的模型。

连接到工作区

首先,连接到注册模型的 Azure 机器学习工作区。

已为您设置好跟踪功能。 使用 MLflow 时,也会使用默认凭据。

注册模型

若要执行推理,需要在 Azure 机器学习注册表中注册模型。 在本例中,你已在存储库中拥有模型的本地副本,因此只需要将模型发布到工作区中的注册表。 如果想要部署的模型已注册,则可以跳过此步骤。

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

或者,如果在运行中记录了模型,可以直接注册它。

提示

若要注册模型,需要知道模型存储的位置。 如果使用 autolog MLflow 的功能,路径取决于所使用的模型的类型和框架。 检查作业的输出以标识此文件夹的名称。 查找包含文件名为MLModel的文件的文件夹。 如果要使用 log_model手动记录模型,则路径是传递给此方法的参数。 例如,如果使用mlflow.sklearn.log_model(my_model, "classifier")记录模型,则模型存储路径为classifier

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs:/{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

注意

路径 MODEL_PATH 是模型存储在运行中的位置。


获取用于评分的输入数据

需要一些输入数据才能运行这些作业。 在此示例中,将从 Internet 下载示例数据并将其放置在 Spark 群集使用的共享存储中。

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.chinacloudapi.cn/data/heart-disease-uci/data/heart.csv", "/tmp/data")

将数据移动到可供整个群集使用的已装载存储帐户。

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

重要

前面的代码使用 dbutils,这是 Azure Databricks 群集中可用的工具。 根据所使用的平台使用适当的工具。

然后,输入数据放置在以下文件夹中:

input_data_path = "dbfs:/data"

在 Spark 群集中运行模型

以下部分介绍如何在 Spark 作业中运行在 Azure 机器学习中注册的 MLflow 模型。

  1. 确保群集已安装以下库:
  - mlflow<3,>=2.1
  - cloudpickle==2.2.0
  - scikit-learn==1.2.0
  - xgboost==1.7.2
  1. 使用笔记本演示如何使用在 Azure 机器学习中注册的 MLflow 模型创建评分例程。 创建笔记本并使用 PySpark 作为默认语言。

  2. 导入所需的命名空间:

    import mlflow
    import pyspark.sql.functions as f
    
  3. 配置模型 URI。 以下 URI 将一个名为 heart-classifier 版本 1 的模型引入。 还可以按别名引用模型,例如 models:/heart-classifier@champion

    model_uri = "models:/heart-classifier/1"
    
  4. 将模型加载为 UDF 函数。 用户定义的函数(UDF)是定义的函数,可用于在环境中重复使用自定义逻辑。

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    提示

    使用参数 result_type 控制 predict() 函数返回的类型。

  5. 读取要评分的数据:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    在这种情况下,输入数据采用 CSV 格式,并放置在文件夹中 dbfs:/data/。 你还将删除列 target ,因为此数据集包含要预测的目标变量。 在生产方案中,数据没有此列。

  6. 运行函数 predict_function 并将预测置于新列中。 在本例中,你将预测放在列中 predictions

    scored_data = df.withColumn("predictions", predict_function(*df.columns))
    

    提示

    predict_function 会接收所需的列作为参数。 在这种情况下,模型需要数据帧的所有列,因此请使用 df.columns。 如果模型需要列的子集,请手动添加它们。 如果模型具有签名,类型需要在输入和预期类型之间兼容。

  7. 可以将预测结果写回存储器。

    scored_data_path = "dbfs:/scored-data"
    scored_data.write.csv(scored_data_path)
    

在 Azure 机器学习的独立 Spark 作业中运行模型

Azure 机器学习支持创建独立的 Spark 作业,以及创建可在 Azure 机器学习管道中使用的可重用 Spark 组件。 在此示例中,部署在 Azure 机器学习独立 Spark 作业中运行的评分作业,并运行 MLflow 模型来执行推理。

注意

若要详细了解 Azure 机器学习中的 Spark 作业,请参阅 Azure 机器学习中的提交 Spark 作业

  1. Spark 作业需要采用参数的 Python 脚本。 创建评分脚本:

    score.py

    import argparse
    import mlflow
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(args.input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", predict_function(*df.columns))
    
    # Save the predictions
    scored_data.write.csv(args.scored_data)
    

    前面的脚本采用三个参数: --model--input_data以及 --scored_data。 前两个参数是输入,表示要运行的模型和输入数据。 最后一个参数是输出,它是放置预测的输出文件夹。

    提示

    安装 Python 包: 前面的评分脚本将 MLflow 模型加载到 UDF 函数中,但它指示参数 env_manager="conda"。 设置此参数时,MLflow 会在仅运行 UDF 函数的独立环境中还原模型定义中指定的所需包。 有关详细信息,请参阅 mlflow.pyfunc.spark_udf 文档。

  2. 创建作业定义:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.chinacloudapi.cn/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.4"
    

    提示

    若要使用附加的 Synapse Spark 池,请在前面所示的示例 YAML 规范文件中定义 compute 该属性,而不是该 resources 属性。

  3. 使用命令 az ml job create 和参数 --file 来创建独立的 Spark 作业。 以下命令显示了一个示例:

    az ml job create -f mlflow-score-spark-job.yml
    

后续步骤