在 Spark 作业中部署和运行 MLflow 模型
本文内容
本文介绍如何在 Spark 作业中部署和运行 MLflow 模型,以针对大量数据或在数据整理过程中执行推理。
此示例演示如何将 Azure 机器学习中注册的 MLflow 模型部署到托管 Spark 群集(预览)中运行的 Spark 作业、Azure Databricks 或 Azure Synapse Analytics,以便对大量数据执行推理。
该模型基于 UCI 心脏病数据集。 数据库包含 76 个属性,但我们使用其中 14 个。 该模型尝试预测患者是否存在心脏疾病。 它是从 0(不存在)到 1(存在)的整数值。 它已使用 XGBBoost
分类器进行训练,所有必需的预处理都打包为 scikit-learn
管道,使此模型成为从原始数据到预测的端到端管道。
本文中的信息基于 azureml-examples 存储库中包含的代码示例。 若要在不复制/粘贴文件的情况下在本地运行命令,请克隆存储库,然后将目录更改为 sdk/using-mlflow/deploy
。
git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy
在按照本文中的步骤操作之前,请确保满足以下先决条件:
如下所示,安装 MLflow SDK
mlflow
包和适用于 MLflow 的 Azure 机器学习azureml-mlflow
插件。pip install mlflow azureml-mlflow
提示
可以使用
mlflow-skinny
包,它是一个不带 SQL 存储、服务器、UI 或数据科学依赖项的轻型 MLflow 包。 对于主要需要 MLflow 的跟踪和记录功能但不需要导入整个功能套件(包括部署)的用户,建议使用此包。创建 Azure 机器学习工作区。 若要创建工作区,请参阅创建入门所需的资源。 查看在工作区中执行 MLflow 操作所需的访问权限。
若要执行远程跟踪,或者跟踪在 Azure 机器学习外部运行的试验,请将 MLflow 配置为指向 Azure 机器学习工作区的跟踪 URI。 有关如何将 MLflow 连接到工作区的详细信息,请参阅为 Azure 机器学习配置 MLflow。
- 必须在工作区中注册 MLflow 模型。 具体而言,此示例将注册针对糖尿病数据集训练的模型。
首先,我们连接到要在其中注册模型的 Azure 机器学习工作区。
已为你配置跟踪。 使用 MLflow 时,还会使用你的默认凭据。
我们需要在 Azure 机器学习注册表中注册的模型来执行推理。 在这种情况下,我们已在存储库中拥有模型的本地副本,因此我们只需要将模型发布到工作区中的注册表。 如果打算部署的模型已注册,则可以跳过此步骤。
model_name = 'heart-classifier'
model_local_path = "model"
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version
或者,如果模型是在运行中记录的,则你可以直接注册它。
提示
若要注册模型,需要知道模型的存储位置。 如果使用的是 MLflow 的 autolog
功能,则路径将取决于所使用模型的类型和框架。 建议检查作业输出以确定该文件夹的名称。 可以查找包含名为 MLModel
的文件的文件夹。 如果使用 log_model
手动记录模型,则路径是传递给此类方法的参数。 例如,如果使用 mlflow.sklearn.log_model(my_model, "classifier")
记录模型,则存储模型的路径为 classifier
。
model_name = 'heart-classifier'
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version
备注
路径 MODEL_PATH
是模型在运行中存储的位置。
我们需要一些用于运行作业的输入数据。 在本示例中,我们将从 Internet 下载示例数据,并将其放置在 Spark 群集使用的共享存储中。
import urllib
urllib.request.urlretrieve("https://azuremlexampledata.blob.core.chinacloudapi.cn/data/heart-disease-uci/data/heart.csv", "/tmp/data")
将数据移动到可供整个群集使用的已装载存储帐户。
dbutils.fs.mv("file:/tmp/data", "dbfs:/")
重要
前面的代码使用 dbutils
,这是 Azure Databricks 群集中可用的工具。 根据所使用的平台使用适当的工具。
然后,输入数据放置在以下文件夹中:
input_data_path = "dbfs:/data"
以下部分介绍如何在 Spark 作业中运行在 Azure 机器学习中注册的 MLflow 模型。
- 确保在群集中安装以下库:
- mlflow<3,>=2.1
- cloudpickle==2.2.0
- scikit-learn==1.2.0
- xgboost==1.7.2
我们将使用笔记本来演示如何使用 Azure 机器学习中注册的 MLflow 模型来创建评分例程。 创建笔记本并使用 PySpark 作为默认语言。
导入所需的命名空间:
import mlflow import pyspark.sql.functions as f
配置模型 URI。 以下 URI 在其最新版本中引入名为
heart-classifier
的模型。model_uri = "models:/heart-classifier/latest"
将模型加载为 UDF 函数。 用户定义函数 (UDF) 是由用户定义的函数,用于在用户环境中重复使用自定义逻辑。
predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double')
提示
使用参数
result_type
控制predict()
函数返回的类型。读取要评分的数据:
df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
在本示例中,输入数据采用
CSV
格式并放置在dbfs:/data/
文件夹中。 我们还要删除target
列,因为此数据集包含要预测的目标变量。 在生产方案中,数据不包含此列。运行函数
predict_function
,并将预测结果放在新列中。 在本示例中,我们要将预测放在predictions
列中。df.withColumn("predictions", score_function(*df.columns))
提示
predict_function
会接收所需的列作为参数。 在本示例中,数据帧的所有列都是模型预期,因此使用了df.columns
。 如果模型需要列的子集,可以手动将其引入。 如果模型具有签名,则类型需要在输入与预期类型之间兼容。可以将预测写回到存储:
scored_data_path = "dbfs:/scored-data" scored_data.to_csv(scored_data_path)
Azure 机器学习支持创建独立的 Spark 作业,以及创建可在 Azure 机器学习管道中使用的可重用 Spark 组件。 在本例中,我们将部署评分作业(在 Azure 机器学习独立 Spark 作业中运行并运行 MLflow 模型)来执行推理。
备注
若要详细了解 Azure 机器学习中的 Spark 作业,请参阅在 Azure 机器学习中提交 Spark 作业(预览版)。
Spark 作业需要采用参数的 Python 脚本。 创建评分脚本:
score.py
import argparse parser = argparse.ArgumentParser() parser.add_argument("--model") parser.add_argument("--input_data") parser.add_argument("--scored_data") args = parser.parse_args() print(args.model) print(args.input_data) # Load the model as an UDF function predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda") # Read the data you want to score df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target") # Run the function `predict_function` and place the predictions on a new column scored_data = df.withColumn("predictions", score_function(*df.columns)) # Save the predictions scored_data.to_csv(args.scored_data)
上述脚本采用
--model
、--input_data
和--scored_data
这三个参数。 前两个是输入,表示要运行的模型和输入数据,最后一个是输出,这是用于放置预测的输出文件夹。提示
Python 包安装:前面的评分脚本将 MLflow 模型加载到 UDF 函数中,但它指示参数
env_manager="conda"
。 设置此参数后,MLflow 将在仅有 UDF 函数运行的隔离环境中还原模型定义中指定的所需包。 有关更多详细信息,请参阅mlflow.pyfunc.spark_udf
文档。创建作业定义:
mlflow-score-spark-job.yml
$schema: http://azureml/sdk-2-0/SparkJob.json type: spark code: ./src entry: file: score.py conf: spark.driver.cores: 1 spark.driver.memory: 2g spark.executor.cores: 2 spark.executor.memory: 2g spark.executor.instances: 2 inputs: model: type: mlflow_model path: azureml:heart-classifier@latest input_data: type: uri_file path: https://azuremlexampledata.blob.core.chinacloudapi.cn/data/heart-disease-uci/data/heart.csv mode: direct outputs: scored_data: type: uri_folder args: >- --model ${{inputs.model}} --input_data ${{inputs.input_data}} --scored_data ${{outputs.scored_data}} identity: type: user_identity resources: instance_type: standard_e4s_v3 runtime_version: "3.2"
提示
若要使用附加的 Synapse Spark 池,请在上面所示的示例 YAML 规范文件中定义
compute
属性,而不是resources
属性。上面显示的 YAML 文件可以在带有
--file
参数的az ml job create
命令中使用,以创建独立的 Spark 作业,如下所示:az ml job create -f mlflow-score-spark-job.yml