Compartir a través de

在 Azure 机器学习中使用 Apache Spark 进行交互式数据整理

数据整理已经成为机器学习项目最重要的方面之一。 Azure 机器学习与 Azure Synapse Analytics 集成,提供对 Apache Spark 池(由 Azure Synapse 支持)的访问,以便使用 Azure 机器学习笔记本进行交互式数据整理。

本文介绍如何使用以下方式处理数据整理

  • 无服务器 Spark 计算
  • 附加的 Synapse Spark 池

先决条件

在开始数据整理任务之前,请了解存储机密的过程

  • Azure Blob 存储帐户访问密钥
  • 共享访问签名 (SAS) 令牌
  • Azure Data Lake Storage (ADLS) Gen 2 服务主体信息

在 Azure 密钥保管库中。 还需要了解如何在 Azure 存储帐户中处理角色分配。 本文档中的以下部分介绍了这些概念。 然后,我们详细了解如何使用 Azure 机器学习笔记本中的 Spark 池进行交互式数据整理。

提示

若要了解 Azure 存储帐户角色分配配置,或者如果使用用户标识直通来访问存储帐户中的数据,请访问在 Azure 存储帐户中添加角色分配以了解详细信息。

使用 Apache Spark 进行交互式数据整理

为了使用 Azure 机器学习笔记本中的 Apache Spark 进行交互式数据整理,Azure 机器学习提供了无服务器 Spark 计算和附加的 Synapse Spark 池。 无服务器 Spark 计算不需要在 Azure Synapse 工作区中创建资源。 相反,在 Azure 机器学习笔记本中可以直接使用完全托管的无服务器 Spark 计算。 要访问 Azure 机器学习中的 Spark 群集,最简单的方法是使用无服务器 Spark 计算。

Azure 机器学习笔记本中的无服务器 Spark 计算

默认情况下,Azure 机器学习笔记本中提供了无服务器 Spark 计算。 若要在笔记本中访问它,请从“计算”选择菜单的“Azure 机器学习无服务器 Spark”下选择“无服务器 Spark 计算”。

笔记本 UI 还为无服务器 Spark 计算提供了 Spark 会话配置选项。 配置 Spark 会话:

  1. 选择屏幕顶部的“配置会话”。
  2. 从下拉菜单中选择“Apache Spark 版本”。

    重要

    适用于 Apache Spark 的 Azure Synapse 运行时:公告

    • 适用于 Apache Spark 3.2 的 Azure Synapse 运行时:
      • EOLA 公告日期:2023 年 7 月 8 日
      • 支持结束日期:2024 年 7 月 8 日。 在此日期之后,将会禁用运行时。
    • Apache Spark 3.3:
      • EOLA 公告日期:2024 年 7 月 12 日
      • 支持终止日期:2025 年 3 月 31 日。 在此日期之后,将会禁用运行时。
    • 为了获取持续支持和最佳性能,建议迁移到 Apache Sark 3.4
  3. 从下拉菜单中选择“实例类型”。 当前支持以下类型:
    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. 输入 Spark 会话超时值(以分钟为单位)。
  5. 选择是否要动态分配执行程序
  6. 选择 Spark 会话的执行程序数量。
  7. 从下拉菜单中选择“执行程序大小”。
  8. 从下拉菜单中选择“驱动程序大小”。
  9. 要使用 Conda 文件配置 Spark 会话,请选中“上传 conda 文件”复选框。 然后,选择“浏览”,并选择具有所需 Spark 会话配置的 Conda 文件。
  10. 添加“配置设置”属性,在“属性”和“值”文本框中输入值,然后选择“添加”。
  11. 选择“应用”。
  12. 在“配置新会话?”弹出窗口中,选择“停止会话”

会话配置更改将被保存,并可用于使用无服务器 Spark 计算启动的另一个笔记本会话。

提示

如果使用会话级 Conda 包,并将配置变量 spark.hadoop.aml.enable_cache 设置为 true,则可以改善 Spark 会话冷启动时间。 会话首次启动时,具有会话级别 Conda 包的会话冷启动通常需要 10 到 15 分钟。 但是,配置变量设置为 true 时的后续会话冷启动通常需要 3 到 5 分钟。

从 Azure Data Lake Storage (ADLS) Gen 2 导入和整理数据

可以使用 abfss:// 数据 URI 访问和处理存储在 Azure Data Lake Storage (ADLS) Gen 2 存储帐户中的数据。 为此,必须遵循以下两种数据访问机制之一:

  • 用户标识传递
  • 基于服务主体的数据访问

提示

要使用无服务器 Spark 计算进行数据整理、对 Azure Data Lake Storage (ADLS) Gen 2 存储帐户中的数据进行用户标识直通访问,需要的配置步骤最少。

若要使用用户标识传递开始交互式数据整理,请执行以下命令:

  • 验证用户标识在 Azure Data Lake Storage (ADLS) Gen 2 存储帐户中是否具有“参与者”和“存储 Blob 数据参与者”角色分配

  • 要使用无服务器 Spark 计算,请在“计算”选择菜单中,选择“Azure 机器学习无服务器 Spark”下的“无服务器 Spark 计算”。

  • 要使用附加的 Synapse Spark 池,请从“计算”选择菜单中选择“Synapse Spark 池”下附加的 Synapse Spark 池。

  • 这个 Titanic 数据整理代码示例显示了 abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn/<PATH_TO_DATA> 格式的数据 URI 与 pyspark.pandaspyspark.ml.feature.Imputer 的搭配使用。

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn/data/wrangled",
        index_col="PassengerId",
    )
    

    注意

    此 Python 代码示例使用 pyspark.pandas。 只有 Spark 运行时版本 3.2 或更高版本才支持此功能。

若要通过服务主体按照访问权限来整理数据,请执行以下操作:

  1. 验证服务主体在 Azure Data Lake Storage (ADLS) Gen 2 存储帐户中是否具有“参与者”和“存储 Blob 数据参与者”角色分配

  2. 为服务主体租户 ID、客户端 ID 和客户端机密值创建 Azure 密钥保管库机密

  3. 在“计算”选择菜单中,选择“Azure 机器学习无服务器 Spark”下的“无服务器 Spark 计算”。 还可从“计算”选择菜单中选择“Synapse Spark 池”下附加的 Synapse Spark 池。

  4. 在配置中设置服务主体租户 ID、客户端 ID 和客户端密码值,并执行以下代码示例。

    • 代码中的 get_secret() 调用取决于 Azure 密钥保管库的名称,以及为服务主体租户 ID、客户端 ID 和客户端密码创建的 Azure 密钥保管库机密的名称。 在配置中设置这些相应的属性名称/值:

      • 客户端 ID 属性:fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn
      • 客户端机密属性:fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn
      • 租户 ID 属性:fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn
      • 租户 ID 值:https://login.partner.microsoftonline.cn/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn",
          "https://login.partner.microsoftonline.cn/" + tenant_id + "/oauth2/token",
      )
      
  5. 使用 Titanic 数据,使用 abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn/<PATH_TO_DATA> 格式的数据 URI 导入和整理数据,如代码示例所示。

从 Azure Blob 存储导入和处理数据

可以使用存储帐户访问密钥或共享访问签名 (SAS) 令牌访问 Azure Blob 存储数据。 应将这些凭据作为机密存储在 Azure 密钥保管库中,并在会话配置中将其设置为属性。

若要开始交互式数据整理,请执行以下操作:

  1. 在左侧Azure 机器学习工作室面板中,选择“笔记本”。

  2. 在“计算”选择菜单中,选择“Azure 机器学习无服务器 Spark”下的“无服务器 Spark 计算”。 还可从“计算”选择菜单中选择“Synapse Spark 池”下附加的 Synapse Spark 池。

  3. 若要配置存储帐户访问密钥或共享访问签名 (SAS) 令牌以便在 Azure 机器学习笔记本中访问数据,请执行以下操作:

    • 对于访问密钥,请设置 fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn 属性,如以下代码片段所示:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn", access_key
      )
      
    • 对于 SAS 令牌,请设置 fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn 属性,如以下代码片段所示:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn",
          sas_token,
      )
      

      注意

      上述代码片段中的 get_secret() 调用需要 Azure Key Vault 的名称,以及为 Azure Blob 存储帐户访问密钥或 SAS 令牌创建的机密的名称。

  4. 在同一笔记本中执行数据整理代码。 将数据 URI 的格式设置为 wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn/<PATH_TO_DATA>,类似于此代码片段所示:

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn/data/wrangled",
        index_col="PassengerId",
    )
    

    注意

    此 Python 代码示例使用 pyspark.pandas。 只有 Spark 运行时版本 3.2 或更高版本才支持此功能。

从 Azure 机器学习数据存储导入和整理数据

要访问 Azure 机器学习数据存储中的数据,请使用 URI 格式 azureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA> 定义数据存储上的数据路径。 若要在笔记本会话中以交互方式处理 Azure 机器学习数据存储中的数据,请执行以下操作:

  1. 在“计算”选择菜单中,选择“Azure 机器学习无服务器 Spark”下的“无服务器 Spark 计算”,或者从“计算”选择菜单中选择“Synapse Spark 池”下附加的 Synapse Spark 池。

  2. 此代码示例显示如何使用 azureml:// 数据存储 URI pyspark.pandaspyspark.ml.feature.Imputer 从 Azure 机器学习数据存储中读取和处理 Titanic 数据。

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    注意

    此 Python 代码示例使用 pyspark.pandas。 只有 Spark 运行时版本 3.2 或更高版本才支持此功能。

Azure 机器学习数据存储可以使用 Azure 存储帐户凭据访问数据

  • 访问密钥
  • SAS 令牌
  • 服务主体 (service principal)

或使用无凭据的数据访问。 根据数据存储类型和基础 Azure 存储帐户类型,选择适当的身份验证机制来确保数据访问。 下表总结了用于访问 Azure 机器学习数据存储中的数据的身份验证机制:

存储帐户类型 无凭据数据访问 数据访问机制 角色分配
Azure Blob 访问密钥或 SAS 令牌 不需要角色分配
Azure Blob 用户标识传递* 用户标识应在 Azure Blob 存储帐户中具有适当的角色分配
Azure Data Lake Storage (ADLS) Gen 2 服务主体 服务主体应在 Azure Data Lake Storage (ADLS) Gen 2 存储帐户中具有适当的角色分配
Azure Data Lake Storage (ADLS) Gen 2 用户标识传递 用户标识应在 Azure Data Lake Storage (ADLS) Gen 2 存储帐户中具有适当的角色分配

只有在未启用软删除的情况下,* 用户标识直通才适用于指向 Azure Blob 存储帐户的无凭据数据存储。

访问默认文件共享上的数据

默认文件共享挂载到无服务器 Spark 计算和附加的 Synapse Spark 池。

显示使用文件共享的屏幕截图。

在 Azure 机器学习工作室中,默认文件共享中的文件显示在“文件”选项卡下的目录树中。笔记本代码可以使用 file:// 协议以及文件的绝对路径直接访问此文件共享中存储的文件,而无需进行更多配置。 此代码片段演示如何访问存储在默认文件共享上的文件:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

注意

此 Python 代码示例使用 pyspark.pandas。 只有 Spark 运行时版本 3.2 或更高版本才支持此功能。

后续步骤