教程:使用无服务器 Apache Spark 池中的 PREDICT 为机器学习模型评分
了解如何在 Azure Synapse Analytics 中使用无服务器 Apache Spark 池的 PREDICT 功能进行评分预测。 可以使用已注册到 Azure 机器学习 (AML) 中或 Synapse 工作区的默认 Azure Data Lake Storage (ADLS) 中的已训练模型。
借助 Synapse PySpark 笔记本中的 PREDICT,可以使用 SQL 语言、用户定义的函数 (UDF) 或转换器为机器学习模型评分。 使用 PREDICT,可以在 Synapse 外部训练现有的机器学习模型并在 Azure Data Lake Storage Gen2 或 Azure 机器学习中将其注册,以便在 Azure Synapse Analytics 的安全边界内为历史数据评分。 PREDICT 函数采用模型和数据作为输入。 此功能消除了将重要数据移出 Synapse 进行评分的步骤。 目标是使模型使用者能够轻松地在 Synapse 中推理机器学习模型,并与使用正确框架执行任务的模型生成者无缝协作。
本教程介绍以下操作:
- 使用已在 Synapse 外部训练并已在 Azure 机器学习或 Azure Data Lake Storage Gen2 中注册的机器学习模型为无服务器 Apache Spark 池中的数据预测评分。
如果没有 Azure 订阅,请在开始前创建一个试用帐户。
先决条件
- Azure Synapse Analytics 工作区,其中 Azure Data Lake Storage Gen2 存储帐户配置为默认存储。 你需要成为所使用的 Data Lake Storage Gen2 文件系统的存储 Blob 数据参与者。
- Azure Synapse Analytics 工作区中的无服务器 Apache Spark 池。 有关详细信息,请参阅在 Azure Synapse 中创建 Spark 池。
- 若要在 Azure 机器学习中训练或注册模型,需要使用 Azure 机器学习工作区。 有关详细信息,请参阅使用门户或 Python SDK 管理 Azure 机器学习工作区。
- 如果你的模型已在 Azure 机器学习中注册,则需要使用一个链接服务。 在 Azure Synapse Analytics 中,链接服务定义了该服务的连接信息。 在本教程中,你将添加 Azure Synapse Analytics 和 Azure 机器学习链接服务。 有关详细信息,请参阅在 Synapse 中创建新的 Azure 机器学习链接服务。
- PREDICT 功能要求有一个已注册到 Azure 机器学习中或者已上传到 Azure Data Lake Storage Gen2 的已训练模型。
注意
- Azure Synapse Analytics 中的 Spark3 无服务器 Apache Spark 池支持 PREDICT 功能。 建议使用版本 Python 3.8 来创建和训练模型。
- PREDICT 支持 MLflow 格式的大多数机器学习模型包:此预览版支持 TensorFlow、ONNX、PyTorch、SkLearn 和 pyfunc 。
- PREDICT 支持 AML 和 ADLS 模型源。 此处的 ADLS 帐户是指默认的 Synapse 工作区 ADLS 帐户。
登录到 Azure 门户
登录到 Azure 门户。
对 MLFLOW 打包模型使用 PREDICT
在按照这些步骤使用 PREDICT 之前,请确保满足所有先决条件。
导入库:导入以下库,以在 spark 会话中使用 PREDICT。
#Import libraries from pyspark.sql.functions import col, pandas_udf,udf,lit from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
使用变量设置参数:需要使用输入变量来设置 Synapse ADLS 数据路径和模型 URI。 还需要定义运行时(“mlflow”)和模型输出返回数据类型。 请注意,PySpark 支持的所有数据类型也可以通过 PREDICT 来支持。
注意
在运行此脚本之前,请使用 ADLS Gen2 数据文件的 URI 以及模型输出返回数据类型和模型文件的 ADLS/AML URI 更新此脚本。
#Set input data path DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.chinacloudapi.cn/<file path>" #Set model URI #Set AML URI, if trained model is registered in AML AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can choose which model version you want to run. If ":x" is not provided then by default latest version will be picked. #Set ADLS URI, if trained model is uploaded in ADLS ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.chinacloudapi.cn/<model mlflow folder path>" #Define model return type RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported #Define model runtime. This supports only mlflow RUNTIME = "mlflow"
对 AML 工作区进行身份验证的方法:如果模型存储在 Synapse 工作区的默认 ADLS 帐户中,则你不需要完成任何其他身份验证设置。 如果模型已在 Azure 机器学习中注册,则你可以选择以下两种受支持的身份验证方法之一。
注意
在运行此脚本之前,更新其中的租户、客户端、订阅、资源组、AML 工作区和链接服务详细信息。
通过服务主体:可以直接使用服务主体客户端 ID 和机密对 AML 工作区进行身份验证。 服务主体必须对 AML 工作区拥有“参与者”访问权限。
#AML workspace authentication using service principal AZURE_TENANT_ID = "<tenant_id>" AZURE_CLIENT_ID = "<client_id>" AZURE_CLIENT_SECRET = "<client_secret>" AML_SUBSCRIPTION_ID = "<subscription_id>" AML_RESOURCE_GROUP = "<resource_group_name>" AML_WORKSPACE_NAME = "<aml_workspace_name>" svc_pr = ServicePrincipalAuthentication( tenant_id=AZURE_TENANT_ID, service_principal_id=AZURE_CLIENT_ID, service_principal_password=AZURE_CLIENT_SECRET, cloud='AzureChinaCloud' ) ws = Workspace( workspace_name = AML_WORKSPACE_NAME, subscription_id = AML_SUBSCRIPTION_ID, resource_group = AML_RESOURCE_GROUP, auth=svc_pr )
通过链接服务:可以使用链接服务对 AML 工作区进行身份验证。 链接服务可以使用“服务主体”或 Synapse 工作区的“托管服务标识 (MSI)”进行身份验证。 “服务主体”或“托管服务标识 (MSI)”必须对 AML 工作区拥有“参与者”访问权限。
#AML workspace authentication using linked service from notebookutils.mssparkutils import azureML ws = azureML.getWorkspace("<linked_service_name>") # "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked service supports MSI and service principal both
在 spark 会话中启用 PREDICT:将 spark 配置
spark.synapse.ml.predict.enabled
设置为true
以启用库。#Enable SynapseML predict spark.conf.set("spark.synapse.ml.predict.enabled","true")
在 spark 会话中绑定模型:将模型绑定到所需的输入,以便可以在 spark 会话中引用该模型。 以外,请定义别名,以便可以在 PREDICT 调用中使用同一别名。
注意
在运行此脚本之前,更新其中的模型别名和模型 URI。
#Bind model within Spark session model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer this model model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed ).register()
从 ADLS 读取数据:从 ADLS 读取数据。 创建 spark 数据帧,并基于数据帧创建视图。
注意
在运行此脚本之前,更新其中的视图名称。
#Read data from ADLS df = spark.read \ .format("csv") \ .option("header", "true") \ .csv(DATA_FILE, inferSchema=True) df.createOrReplaceTempView('<view_name>')
使用 PREDICT 生成评分:可通过以下三种方式调用 PREDICT:使用 Spark SQL API、使用用户定义的函数 (UDF) 和使用转换器 API。 下面是示例。
注意
在运行此脚本之前,更新其中的模型别名、视图名称和逗号分隔的模型输入列名称。 逗号分隔的模型输入列与训练模型时使用的列相同。
#Call PREDICT using Spark SQL API predictions = spark.sql( """ SELECT PREDICT('<random_alias_name>', <comma_separated_model_input_column_name>) AS predict FROM <view_name> """ ).show()
#Call PREDICT using user defined function (UDF) df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"] df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()
#Call PREDICT using Transformer API columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"] tranformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT") tranformer.transform(df).show()
使用 PREDICT 的 Sklearn 示例
导入库并从 ADLS 读取训练数据集。
# Import libraries and read training dataset from ADLS import fsspec import pandas from fsspec.core import split_protocol adls_account_name = 'xyz' #Provide exact ADLS account name adls_account_key = 'xyz' #Provide exact ADLS account key fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>', account_name=adls_account_name, account_key=adls_account_key) with fsspec_handle.open() as f: train_df = pandas.read_csv(f)
训练模型并生成 mlflow 项目。
# Train model and generate mlflow artifacts import os import shutil import mlflow import json from mlflow.utils import model_utils import numpy as np import pandas as pd from sklearn.linear_model import LinearRegression class LinearRegressionModel(): _ARGS_FILENAME = 'args.json' FEATURES_KEY = 'features' TARGETS_KEY = 'targets' TARGETS_PRED_KEY = 'targets_pred' def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1): self.fit_intercept = fit_intercept self.nb_input_features = nb_input_features self.nb_output_features = nb_output_features def get_args(self): args = { 'nb_input_features': self.nb_input_features, 'nb_output_features': self.nb_output_features, 'fit_intercept': self.fit_intercept } return args def create_model(self): self.model = LinearRegression(fit_intercept=self.fit_intercept) def train(self, dataset): features = np.stack([sample for sample in iter( dataset[LinearRegressionModel.FEATURES_KEY])], axis=0) targets = np.stack([sample for sample in iter( dataset[LinearRegressionModel.TARGETS_KEY])], axis=0) self.model.fit(features, targets) def predict(self, dataset): features = np.stack([sample for sample in iter( dataset[LinearRegressionModel.FEATURES_KEY])], axis=0) targets_pred = self.model.predict(features) return targets_pred def save(self, path): if os.path.exists(path): shutil.rmtree(path) # save the sklearn model with mlflow mlflow.sklearn.save_model(self.model, path) # save args self._save_args(path) def _save_args(self, path): args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME) with open(args_filename, 'w') as f: args = self.get_args() json.dump(args, f) def train(train_df, output_model_path): print(f"Start to train LinearRegressionModel.") # Initialize input dataset dataset = train_df.to_numpy() datasets = {} datasets['targets'] = dataset[:, -1] datasets['features'] = dataset[:, :9] # Initialize model class obj model_class = LinearRegressionModel(fit_intercept=10) with mlflow.start_run(nested=True) as run: model_class.create_model() model_class.train(datasets) model_class.save(output_model_path) print(model_class.predict(datasets)) train(train_df, './artifacts/output')
将模型 MLFLOW 项目存储在 ADLS 中,或注册到 AML 中。
# Store model MLFLOW artifacts in ADLS STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>' protocol, _ = split_protocol(STORAGE_PATH) print (protocol) storage_options = { 'account_name': adls_account_name, 'account_key': adls_account_key } fs = fsspec.filesystem(protocol, **storage_options) fs.put( './artifacts/output', STORAGE_PATH, recursive=True, overwrite=True)
# Register model MLFLOW artifacts in AML from azureml.core import Workspace, Model from azureml.core.authentication import ServicePrincipalAuthentication AZURE_TENANT_ID = "xyz" AZURE_CLIENT_ID = "xyz" AZURE_CLIENT_SECRET = "xyz" AML_SUBSCRIPTION_ID = "xyz" AML_RESOURCE_GROUP = "xyz" AML_WORKSPACE_NAME = "xyz" svc_pr = ServicePrincipalAuthentication( tenant_id=AZURE_TENANT_ID, service_principal_id=AZURE_CLIENT_ID, service_principal_password=AZURE_CLIENT_SECRET ) ws = Workspace( workspace_name = AML_WORKSPACE_NAME, subscription_id = AML_SUBSCRIPTION_ID, resource_group = AML_RESOURCE_GROUP, auth=svc_pr ) model = Model.register( model_path="./artifacts/output", model_name="xyz", workspace=ws, )
使用变量设置所需的参数。
# If using ADLS uploaded model import pandas as pd from pyspark.sql import SparkSession from pyspark.sql.functions import col, pandas_udf,udf,lit import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger DATA_FILE = "abfss://xyz@xyz.dfs.core.chinacloudapi.cn/xyz.csv" ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.chinacloudapi.cn/mlflow/sklearn/ e2e_linear_regression/" RETURN_TYPES = "INT" RUNTIME = "mlflow"
# If using AML registered model from pyspark.sql.functions import col, pandas_udf,udf,lit from azureml.core import Workspace from azureml.core.authentication import ServicePrincipalAuthentication import azure.synapse.ml.predict as pcontext import azure.synapse.ml.predict.utils._logger as synapse_predict_logger DATA_FILE = "abfss://xyz@xyz.dfs.core.chinacloudapi.cn/xyz.csv" AML_MODEL_URI_SKLEARN = "aml://xyz" RETURN_TYPES = "INT" RUNTIME = "mlflow"
在 spark 会话中启用 SynapseML PREDICT 功能。
spark.conf.set("spark.synapse.ml.predict.enabled","true")
在 spark 会话中绑定模型。
# If using ADLS uploaded model model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="sklearn_linear_regression", model_uri=ADLS_MODEL_URI_SKLEARN, ).register()
# If using AML registered model model = pcontext.bind_model( return_types=RETURN_TYPES, runtime=RUNTIME, model_alias="sklearn_linear_regression", model_uri=AML_MODEL_URI_SKLEARN, aml_workspace=ws ).register()
从 ADLS 加载测试数据。
# Load data from ADLS df = spark.read \ .format("csv") \ .option("header", "true") \ .csv(DATA_FILE, inferSchema=True) df = df.select(df.columns[:9]) df.createOrReplaceTempView('data') df.show(10)
调用 PREDICT 以生成评分。
# Call PREDICT predictions = spark.sql( """ SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data """ ).show()