教程:使用无服务器 Apache Spark 池中的 PREDICT 为机器学习模型评分

了解如何在 Azure Synapse Analytics 中使用无服务器 Apache Spark 池的 PREDICT 功能进行评分预测。 可以使用已注册到 Azure 机器学习 (AML) 中或 Synapse 工作区的默认 Azure Data Lake Storage (ADLS) 中的已训练模型。

借助 Synapse PySpark 笔记本中的 PREDICT,可以使用 SQL 语言、用户定义的函数 (UDF) 或转换器为机器学习模型评分。 使用 PREDICT,可以在 Synapse 外部训练现有的机器学习模型并在 Azure Data Lake Storage Gen2 或 Azure 机器学习中将其注册,以便在 Azure Synapse Analytics 的安全边界内为历史数据评分。 PREDICT 函数采用模型和数据作为输入。 此功能消除了将重要数据移出 Synapse 进行评分的步骤。 目标是使模型使用者能够轻松地在 Synapse 中推理机器学习模型,并与使用正确框架执行任务的模型生成者无缝协作。

本教程介绍以下操作:

  • 使用已在 Synapse 外部训练并已在 Azure 机器学习或 Azure Data Lake Storage Gen2 中注册的机器学习模型为无服务器 Apache Spark 池中的数据预测评分。

如果没有 Azure 订阅,可在开始前创建一个试用帐户

先决条件

  • Azure Synapse Analytics 工作区,其中 Azure Data Lake Storage Gen2 存储帐户配置为默认存储。 你需要成为所使用的 Data Lake Storage Gen2 文件系统的存储 Blob 数据参与者。
  • Azure Synapse Analytics 工作区中的无服务器 Apache Spark 池。 有关详细信息,请参阅在 Azure Synapse 中创建 Spark 池
  • 若要在 Azure 机器学习中训练或注册模型,需要使用 Azure 机器学习工作区。 有关详细信息,请参阅使用门户或 Python SDK 管理 Azure 机器学习工作区
  • 如果你的模型已在 Azure 机器学习中注册,则需要使用一个链接服务。 在 Azure Synapse Analytics 中,链接服务定义了该服务的连接信息。 在本教程中,你将添加 Azure Synapse Analytics 和 Azure 机器学习链接服务。 有关详细信息,请参阅在 Synapse 中创建新的 Azure 机器学习链接服务
  • PREDICT 功能要求有一个已注册到 Azure 机器学习中或者已上传到 Azure Data Lake Storage Gen2 的已训练模型。

注意

  • Azure Synapse Analytics 中的 Spark3 无服务器 Apache Spark 池支持 PREDICT 功能。 建议使用版本 Python 3.8 来创建和训练模型。
  • PREDICT 支持 MLflow 格式的大多数机器学习模型包:此预览版支持 TensorFlow、ONNX、PyTorch、SkLearn 和 pyfunc 。
  • PREDICT 支持 AML 和 ADLS 模型源。 此处的 ADLS 帐户是指默认的 Synapse 工作区 ADLS 帐户。

登录到 Azure 门户

登录到 Azure 门户

对 MLFLOW 打包模型使用 PREDICT

在按照这些步骤使用 PREDICT 之前,请确保满足所有先决条件。

  1. 导入库:导入以下库,以在 spark 会话中使用 PREDICT。

    #Import libraries
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
  2. 使用变量设置参数:需要使用输入变量来设置 Synapse ADLS 数据路径和模型 URI。 还需要定义运行时(“mlflow”)和模型输出返回数据类型。 请注意,PySpark 支持的所有数据类型也可以通过 PREDICT 来支持。

    注意

    在运行此脚本之前,请使用 ADLS Gen2 数据文件的 URI 以及模型输出返回数据类型和模型文件的 ADLS/AML URI 更新此脚本。

    #Set input data path
    DATA_FILE = "abfss://<filesystemname>@<account name>.dfs.core.chinacloudapi.cn/<file path>"
    
    #Set model URI
        #Set AML URI, if trained model is registered in AML
           AML_MODEL_URI = "<aml model uri>" #In URI ":x" signifies model version in AML. You can   choose which model version you want to run. If ":x" is not provided then by default   latest version will be picked.
    
        #Set ADLS URI, if trained model is uploaded in ADLS
           ADLS_MODEL_URI = "abfss://<filesystemname>@<account name>.dfs.core.chinacloudapi.cn/<model   mlflow folder path>"
    
    #Define model return type
    RETURN_TYPES = "<data_type>" # for ex: int, float etc. PySpark data types are supported
    
    #Define model runtime. This supports only mlflow
    RUNTIME = "mlflow"
    
  3. 对 AML 工作区进行身份验证的方法:如果模型存储在 Synapse 工作区的默认 ADLS 帐户中,则你不需要完成任何其他身份验证设置。 如果模型已在 Azure 机器学习中注册,则你可以选择以下两种受支持的身份验证方法之一。

    注意

    在运行此脚本之前,更新其中的租户、客户端、订阅、资源组、AML 工作区和链接服务详细信息。

    • (建议)通过链接服务:可以使用链接服务对 AML 工作区进行身份验证。 链接服务可以使用“服务主体”或 Synapse 工作区的“托管服务标识 (MSI)”进行身份验证。 “服务主体”或“托管服务标识 (MSI)”必须对 AML 工作区拥有“参与者”访问权限。

      #AML workspace authentication using linked service
      from notebookutils.mssparkutils import azureML
      ws = azureML.getWorkspace("<linked_service_name>") #   "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked   service supports MSI and service principal both
      
    • 通过服务主体:尽管不建议这样做,但你可以直接使用服务主体客户端 ID 和机密对 AML 工作区进行身份验证。 提供服务主体密码会直接造成一些安全风险,因此我们建议尽可能地使用链接服务。 服务主体必须对 AML 工作区拥有“参与者”访问权限。

      #AML workspace authentication using service principal
      AZURE_TENANT_ID = "<tenant_id>"
      AZURE_CLIENT_ID = "<client_id>"
      AZURE_CLIENT_SECRET = "<client_secret>"
      
      AML_SUBSCRIPTION_ID = "<subscription_id>"
      AML_RESOURCE_GROUP = "<resource_group_name>"
      AML_WORKSPACE_NAME = "<aml_workspace_name>"
      
      svc_pr = ServicePrincipalAuthentication( 
           tenant_id=AZURE_TENANT_ID,
           service_principal_id=AZURE_CLIENT_ID,
           service_principal_password=AZURE_CLIENT_SECRET,
      cloud='AzureChinaCloud'
      )
      
      ws = Workspace(
           workspace_name = AML_WORKSPACE_NAME,
           subscription_id = AML_SUBSCRIPTION_ID,
           resource_group = AML_RESOURCE_GROUP,
           auth=svc_pr
      )
      
  4. 在 spark 会话中启用 PREDICT:将 spark 配置 spark.synapse.ml.predict.enabled 设置为 true 以启用库。

    #Enable SynapseML predict
    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  5. 在 spark 会话中绑定模型:将模型绑定到所需的输入,以便可以在 spark 会话中引用该模型。 以外,请定义别名,以便可以在 PREDICT 调用中使用同一别名。

    注意

    在运行此脚本之前,更新其中的模型别名和模型 URI。

    #Bind model within Spark session
     model = pcontext.bind_model(
      return_types=RETURN_TYPES, 
      runtime=RUNTIME, 
      model_alias="<random_alias_name>", #This alias will be used in PREDICT call to refer  this   model
      model_uri=ADLS_MODEL_URI, #In case of AML, it will be AML_MODEL_URI
      aml_workspace=ws #This is only for AML. In case of ADLS, this parameter can be removed
      ).register()
    
  6. 从 ADLS 读取数据:从 ADLS 读取数据。 创建 spark 数据帧,并基于数据帧创建视图。

    注意

    在运行此脚本之前,更新其中的视图名称。

    #Read data from ADLS
    df = spark.read \
     .format("csv") \
     .option("header", "true") \
     .csv(DATA_FILE,
         inferSchema=True)
    df.createOrReplaceTempView('<view_name>')
    
  7. 使用 PREDICT 生成评分:可通过以下三种方式调用 PREDICT:使用 Spark SQL API、使用用户定义的函数 (UDF) 和使用转换器 API。 下面是示例。

    注意

    在运行此脚本之前,更新其中的模型别名、视图名称和逗号分隔的模型输入列名称。 逗号分隔的模型输入列与训练模型时使用的列相同。

    #Call PREDICT using Spark SQL API
    
    predictions = spark.sql(
                   """
                       SELECT PREDICT('<random_alias_name>',
                                 <comma_separated_model_input_column_name>) AS predict 
                       FROM <view_name>
                   """
               ).show()
    
    #Call PREDICT using user defined function (UDF)
    
    df = df[<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    df.withColumn("PREDICT",model.udf(lit("<random_alias_name>"),*df.columns)).show()
    
    #Call PREDICT using Transformer API
    
    columns = [<comma_separated_model_input_column_name>] # for ex. df["empid","empname"]
    
    tranformer = model.create_transformer().setInputCols(columns).setOutputCol("PREDICT")
    
    tranformer.transform(df).show()
    

使用 PREDICT 的 Sklearn 示例

  1. 导入库并从 ADLS 读取训练数据集。

    # Import libraries and read training dataset from ADLS
    
    import fsspec
    import pandas
    from fsspec.core import split_protocol
    
    adls_account_name = 'xyz' #Provide exact ADLS account name
    adls_account_key = 'xyz' #Provide exact ADLS account key
    
    fsspec_handle = fsspec.open('abfs[s]://<container>/<path-to-file>',      account_name=adls_account_name, account_key=adls_account_key)
    
    with fsspec_handle.open() as f:
        train_df = pandas.read_csv(f)
    
  2. 训练模型并生成 mlflow 项目。

    # Train model and generate mlflow artifacts
    
    import os
    import shutil
    import mlflow
    import json
    from mlflow.utils import model_utils
    import numpy as np
    import pandas as pd
    from sklearn.linear_model import LinearRegression
    
    
    class LinearRegressionModel():
      _ARGS_FILENAME = 'args.json'
      FEATURES_KEY = 'features'
      TARGETS_KEY = 'targets'
      TARGETS_PRED_KEY = 'targets_pred'
    
      def __init__(self, fit_intercept, nb_input_features=9, nb_output_features=1):
        self.fit_intercept = fit_intercept
        self.nb_input_features = nb_input_features
        self.nb_output_features = nb_output_features
    
      def get_args(self):
        args = {
            'nb_input_features': self.nb_input_features,
            'nb_output_features': self.nb_output_features,
            'fit_intercept': self.fit_intercept
        }
        return args
    
      def create_model(self):
        self.model = LinearRegression(fit_intercept=self.fit_intercept)
    
      def train(self, dataset):
    
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
    
        targets = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.TARGETS_KEY])], axis=0)
    
    
        self.model.fit(features, targets)
    
      def predict(self, dataset):
        features = np.stack([sample for sample in iter(
            dataset[LinearRegressionModel.FEATURES_KEY])], axis=0)
        targets_pred = self.model.predict(features)
        return targets_pred
    
      def save(self, path):
        if os.path.exists(path):
          shutil.rmtree(path)
    
        # save the sklearn model with mlflow
        mlflow.sklearn.save_model(self.model, path)
    
        # save args
        self._save_args(path)
    
      def _save_args(self, path):
        args_filename = os.path.join(path, LinearRegressionModel._ARGS_FILENAME)
        with open(args_filename, 'w') as f:
          args = self.get_args()
          json.dump(args, f)
    
    
    def train(train_df, output_model_path):
      print(f"Start to train LinearRegressionModel.")
    
      # Initialize input dataset
      dataset = train_df.to_numpy()
      datasets = {}
      datasets['targets'] = dataset[:, -1]
      datasets['features'] = dataset[:, :9]
    
      # Initialize model class obj
      model_class = LinearRegressionModel(fit_intercept=10)
      with mlflow.start_run(nested=True) as run:
        model_class.create_model()
        model_class.train(datasets)
        model_class.save(output_model_path)
        print(model_class.predict(datasets))
    
    
    train(train_df, './artifacts/output')
    
  3. 将模型 MLFLOW 项目存储在 ADLS 中,或注册到 AML 中。

    # Store model MLFLOW artifacts in ADLS
    
    STORAGE_PATH = 'abfs[s]://<container>/<path-to-store-folder>'
    
    protocol, _ = split_protocol(STORAGE_PATH)
    print (protocol)
    
    storage_options = {
        'account_name': adls_account_name,
        'account_key': adls_account_key
    }
    fs = fsspec.filesystem(protocol, **storage_options)
    fs.put(
        './artifacts/output',
        STORAGE_PATH, 
        recursive=True, overwrite=True)
    
    # Register model MLFLOW artifacts in AML
    
    from azureml.core import Workspace, Model
    from azureml.core.authentication import ServicePrincipalAuthentication
    from notebookutils.mssparkutils import azureML
    
    AZURE_TENANT_ID = "xyz"
    AZURE_CLIENT_ID = "xyz"
    AZURE_CLIENT_SECRET = "xyz"
    
    AML_SUBSCRIPTION_ID = "xyz"
    AML_RESOURCE_GROUP = "xyz"
    AML_WORKSPACE_NAME = "xyz"
    
    #AML workspace authentication using linked service
    ws = azureML.getWorkspace("<linked_service_name>") #   "<linked_service_name>" is the linked service name, not AML workspace name. Also, linked   service supports MSI and service principal both
    
    model = Model.register(
        model_path="./artifacts/output",
        model_name="xyz",
        workspace=ws,
    )
    
  4. 使用变量设置所需的参数。

    # If using ADLS uploaded model
    
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.chinacloudapi.cn/xyz.csv"
    ADLS_MODEL_URI_SKLEARN = "abfss://xyz@xyz.dfs.core.chinacloudapi.cn/mlflow/sklearn/     e2e_linear_regression/"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
    # If using AML registered model
    
    from pyspark.sql.functions import col, pandas_udf,udf,lit
    from azureml.core import Workspace
    from azureml.core.authentication import ServicePrincipalAuthentication
    import azure.synapse.ml.predict as pcontext
    import azure.synapse.ml.predict.utils._logger as synapse_predict_logger
    
    DATA_FILE = "abfss://xyz@xyz.dfs.core.chinacloudapi.cn/xyz.csv"
    AML_MODEL_URI_SKLEARN = "aml://xyz"
    RETURN_TYPES = "INT"
    RUNTIME = "mlflow"
    
  5. 在 spark 会话中启用 SynapseML PREDICT 功能。

    spark.conf.set("spark.synapse.ml.predict.enabled","true")
    
  6. 在 spark 会话中绑定模型。

    # If using ADLS uploaded model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=ADLS_MODEL_URI_SKLEARN,
     ).register()
    
    # If using AML registered model
    
    model = pcontext.bind_model(
     return_types=RETURN_TYPES, 
     runtime=RUNTIME, 
     model_alias="sklearn_linear_regression",
     model_uri=AML_MODEL_URI_SKLEARN,
     aml_workspace=ws
     ).register()
    
  7. 从 ADLS 加载测试数据。

    # Load data from ADLS
    
    df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .csv(DATA_FILE,
            inferSchema=True)
    df = df.select(df.columns[:9])
    df.createOrReplaceTempView('data')
    df.show(10)
    
  8. 调用 PREDICT 以生成评分。

    # Call PREDICT
    
    predictions = spark.sql(
                      """
                          SELECT PREDICT('sklearn_linear_regression', *) AS predict FROM data
                      """
                  ).show()
    

后续步骤