本文介绍如何在 Databricks 中使用已注册的模型对 Spark 数据帧执行批处理推理。 该工作流适用于各种机器学习和深度学习模型,包括 TensorFlow、PyTorch 和 scikit-learn。 它包括有关数据加载、模型推理和性能优化的最佳做法。
对于深度学习应用程序的模型推理,Azure Databricks 建议以下使用工作流。 与使用 TensorFlow 和 PyTorch 的笔记本相关的示例,请参阅 Batch 推理示例。
Databricks 建议使用以下工作流,以使用 Spark DataFrame 执行批处理推理。
确保群集运行兼容的 Databricks ML 运行时版本以匹配训练环境。 使用 MLflow 记录的模型包含可以安装的要求,以确保训练和推理环境匹配。
requirements_path = os.path.join(local_path, "requirements.txt")
if not os.path.exists(requirements_path):
dbutils.fs.put("file:" + requirements_path, "", True)
%pip install -r $requirements_path
%restart_python
根据数据类型,使用适当的方法将数据加载到 Spark 数据帧中:
数据类型 | 方法 |
---|---|
Unity 数据目录中的表(建议) | table = spark.table(input_table_name) |
图像文件(JPG、PNG) | files_df = spark.createDataFrame(map(lambda path: (path,), file_paths), ["path"]) |
TFRecords | df = spark.read.format("tfrecords").load(image_path) |
其他格式(Parquet、CSV、JSON、JDBC) | 使用 Spark 数据源加载。 |
此示例使用 Databricks 模型注册表中的模型进行推理。
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)
Pandas UDF 利用 Apache Arrow 来实现数据的高效传输,并利用 pandas 进行数据处理。 使用 pandas UDF 进行推理的典型步骤包括:
加载训练的模型:使用 MLflow 创建 Spark UDF 进行推理。
预处理输入数据:确保输入架构符合模型要求。
运行模型预测:在数据帧上使用模型的 UDF 函数。
df_result = df_spark.withColumn("prediction", predict_udf(*df_spark.columns))
- (推荐)将预测保存到 Unity 目录。
以下示例将预测保存到 Unity 目录。
df_result.write.mode("overwrite").saveAsTable(output_table)
本部分提供有关在 Azure Databricks 上进行模型推理的调试和性能优化的一些提示。 有关概述,请参阅 使用 Spark 数据帧执行批处理推理。
模型推理中通常有两个主要部分:数据输入管道和模型推理。 数据输入管道在数据 I/O 上负担繁重,模型推理在计算上负担繁重。 确定工作流的瓶颈很简单。 以下是一些方法:
- 将模型减少到一个普通模型,并测量每秒的示例。 如果完整模型和普通模型之间的端到端时间差异最小,则数据输入管道可能是瓶颈,否则模型推理是瓶颈。
- 如果使用 GPU 运行模型推理,请检查 GPU 利用率 指标。 如果 GPU 利用率不持续高,则数据输入管道可能是瓶颈。
使用 GPU 可以有效地优化模型推理的运行速度。 随着 GPU 和其他加速器变得更快,数据输入管道必须跟上需求。 数据输入管道将数据读入 Spark 数据帧,对其进行转换,并将其作为模型推理的输入加载。 如果数据输入是瓶颈,下面是提高 I/O 吞吐量的一些提示:
设置每个批的最大记录。 只要记录可以容纳在内存中,更大的最大记录数可以减少调用 UDF 函数的 I/O 开销。 若要设置批大小,请设置以下配置:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
在 pandas UDF 中预处理输入数据时,批量加载数据并预提取数据。
对于 TensorFlow,Azure Databricks 建议使用 tf.data API。 可以通过在
num_parallel_calls
函数中设置map
并调用prefetch
和batch
来实现映射的并行分析,进行预提取和批处理。dataset.map(parse_example, num_parallel_calls=num_process).prefetch(prefetch_size).batch(batch_size)
对于 PyTorch,Azure Databricks 建议使用 DataLoader 类。 可以设置
batch_size
批处理和num_workers
并行数据加载。torch.utils.data.DataLoader(images, batch_size=batch_size, num_workers=num_process)
本节中的示例遵循推荐的深度学习推理工作流。 以下示例演示了如何使用预先训练的深层残差网络 (ResNets) 神经网络模型执行模型推理。