使用 Apache Spark 进行交互式数据整理

数据整理是机器学习项目的重要方面。 本文介绍如何通过在 Azure Synapse 支持的无服务器 Apache Spark 计算上运行 Azure 机器学习笔记本来执行交互式数据整理。

本文介绍如何附加和配置无服务器 Spark 计算。 然后,本文介绍如何使用无服务器 Spark 计算从多个源访问和整理数据。

先决条件

  • Azure 订阅中的所有者或角色分配权限。 可以 创建 Azure 帐户
  • 一个 Azure 机器学习工作区。 有关详细信息,请参阅创建工作区资源
  • 上传到工作区默认文件共享中的titanic.csv数据集
  • Azure Data Lake Storage Gen 2 存储帐户。 有关详细信息,请参阅 创建 Azure Data Lake Storage Gen 2 存储帐户
  • 已授予以下 角色分配
    • 对于 Azure 存储帐户访问权限,Azure 存储帐户中的 参与者存储 Blob 数据参与者 角色。
    • 对于访问 Azure 密钥保管库中的机密信息,需要密钥保管库的 机密用户 角色。

有关详细信息,请参见:

在笔记本会话中使用无服务器 Spark 计算

使用无服务器 Spark 计算是访问 Spark 群集以进行交互式数据整理的最简单方法。 附加到 Synapse Spark 池 的完全托管无服务器 Spark 计算直接在 Azure 机器学习笔记本中可用。

若要使用以下任何数据访问和整理源和方法,请通过选择 Azure 机器学习无服务器 Spark,在文件或笔记本页面顶部靠近 >的地方选择 无服务器 Spark 计算 - 可用 来附加 Spark 无服务器计算。 计算可能需要一两分钟才能附加到会话。

配置无服务器 Spark 会话

附加无服务器 Spark 计算后,可以选择通过设置或更改多个值来配置 Spark 会话。 配置 Spark 会话:

  1. 选择文件或笔记本页上左上角的 “配置会话 ”。
  2. “配置会话 ”屏幕上,更改以下任何设置:
    • “计算 ”窗格中:

      • 通过从 节点大小下的下拉菜单中选择不同的大小来更改计算机大小。
      • 选择是否 动态分配执行程序
      • 选择 Spark 会话的执行程序数量。
      • 从下拉菜单中选择其他 执行程序大小 (如果可用)。
    • “设置” 窗格中:

      • 如果可用,请将 Apache Spark 版本 更改为与 3.4 不同的版本。
      • 会话超时 值(以分钟为单位)更改为更高的数字,以帮助防止会话超时。
      • “配置设置”下,添加 属性 名称/值设置以根据需要配置会话。

        提示

        如果使用会话级 Conda 包,则添加值为 spark.hadoop.aml.enable_cachetrue 配置属性可以提高 Spark 会话冷启动的速度。 首次会话冷启动并使用会话级别的Conda包通常需要10到15分钟。 后续会话的冷启动,当配置变量设置为 true 时,通常需要 3 到 5 分钟。

    • Python 包 窗格中:

      • 若要使用 Conda 文件配置会话,请选择 “上传 conda 文件”。 在“选择 conda 文件”旁边,选择“浏览”,然后浏览至计算机并打开相应的 Conda YAML 文件,并上传它。
      • 若要使用自定义环境,请选择 “自定义环境 ”,然后在“ 环境类型”下选择自定义环境。 有关详细信息,请参阅 “管理软件环境”。
  3. 若要应用所有配置,请选择 “应用”。

会话配置更改将保持不变,可供使用附加无服务器 Spark 计算的其他笔记本会话使用。

从 Azure Data Lake Storage 导入和整理数据

若要访问和整理存储在 Azure Data Lake Storage 存储帐户中的数据,请使用具有用户标识直通或基于服务主体访问的协议 URI。 用户标识直通无需添加任何配置。

若要使用任一方法,用户标识或服务主体必须在 Azure Data Lake Storage 帐户中分配参与者存储 Blob 数据参与者角色。

对于用户身份透传,请运行以下数据整理代码示例,使用数据 URI 格式 abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn/<PATH_TO_DATA>pyspark.pandaspyspark.ml.feature.Imputer。 将 <STORAGE_ACCOUNT_NAME> 占位符替换为 Azure Data Lake Storage 帐户的名称,并将 <FILE_SYSTEM_NAME> 该占位符替换为数据容器的名称。

import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

df = pd.read_csv(
    "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn/data/titanic.csv",
    index_col="PassengerId",
)
imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
    "mean"
)  # Replace missing values in Age column with the mean value
df.fillna(
    value={"Cabin": "None"}, inplace=True
)  # Fill Cabin column with value "None" if missing
df.dropna(inplace=True)  # Drop the rows which still have any missing value
df.to_csv(
    "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn/data/wrangled",
    index_col="PassengerId",
)

使用服务主体

若要使用服务主体从 Azure Data Lake Storage 访问和整理数据,请先设置服务主体,如下所示:

  1. 创建服务主体,并为其分配必要的存储 Blob 数据贡献者和 Key Vault Secrets 用户角色

  2. 从应用注册中获取服务主体租户 ID、客户端 ID 和客户端机密值,并为这些值 创建 Azure Key Vault 机密

  3. 通过在会话配置中添加以下属性名称/值对来设置服务主体租户 ID、客户端 ID 和客户端密码。 将 <STORAGE_ACCOUNT_NAME> 替换为您的存储帐户名称,将 <TENANT_ID> 替换为服务主体租户 ID。

    属性名称 价值
    fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn 应用程序(客户端)ID 值
    fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn https://login.partner.microsoftonline.cn/<TENANT_ID>/oauth2/token
    fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn 客户端机密值
  4. 运行以下代码。 get_secret() 代码中的调用依赖于 Key Vault 的名称,以及为服务主体的租户 ID、客户端 ID 和客户端密钥创建的 Key Vault 机密的名称。

    from pyspark.sql import SparkSession
    
    sc = SparkSession.builder.getOrCreate()
    token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
    
    # Set up service principal tenant ID, client ID, and secret from Azure Key Vault
    client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
    tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
    client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
    
    # Set up a service principal that has access to the data
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn", "OAuth"
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn",
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn",
        client_id,
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn",
        client_secret,
    )
    sc._jsc.hadoopConfiguration().set(
        "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn",
        "https://login.partner.microsoftonline.cn/" + tenant_id + "/oauth2/token",
    )
    
  5. 使用数据 URI 格式导入并整理数据,如代码示例所示。 将 <STORAGE_ACCOUNT_NAME> 占位符替换为 Azure Data Lake Storage 帐户的名称,并将 <FILE_SYSTEM_NAME> 该占位符替换为数据容器的名称。

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.chinacloudapi.cn/data/wrangled",
        index_col="PassengerId",
    )
    

从 Azure Blob 存储导入和处理数据

可以使用 存储帐户访问密钥共享访问签名(SAS)令牌访问 Azure Blob 存储数据。 将凭据作为机密存储在 Azure Key Vault 中,并将其设置为 Spark 会话配置中的属性。

  1. 运行以下代码片段之一。 代码 get_secret() 片段中的调用需要密钥保管库的名称以及为 Azure Blob 存储帐户访问密钥或 SAS 令牌创建的机密的名称。

    • 若要配置存储帐户访问密钥,请设置 fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn 属性,如以下代码片段所示:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn", access_key
      )
      
    • 若要配置 SAS 令牌,请设置 fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn 属性,如以下代码片段所示:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn",
          sas_token,
      )
      
  2. 运行以下数据整理代码,其格式 wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn/<PATH_TO_DATA>为数据 URI。

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.chinacloudapi.cn/data/wrangled",
        index_col="PassengerId",
    )
    

从 Azure 机器学习数据存储导入和整理数据

若要从 Azure 机器学习数据存储访问数据,请使用 URI 格式azureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>定义数据存储上数据的路径。

运行以下代码示例,使用数据存储 URI 从 Azure 机器学习数据存储读取和整理 azureml:// 数据,pyspark.pandas以及pyspark.ml.feature.Imputer

import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

df = pd.read_csv(
    "azureml://datastores/<DATASTORE_NAME>/paths/data/titanic.csv",
    index_col="PassengerId",
)
imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
    "mean"
)  # Replace missing values in Age column with the mean value
df.fillna(
    value={"Cabin": "None"}, inplace=True
)  # Fill Cabin column with value "None" if missing
df.dropna(inplace=True)  # Drop the rows which still have any missing value
df.to_csv(
    "azureml://datastores/<DATASTORE_NAME>/paths/data/wrangled",
    index_col="PassengerId",
)

Azure 机器学习数据存储可以使用 Azure 存储帐户访问密钥、SAS 令牌、服务主体凭据或无凭据数据访问来访问数据。 根据数据存储类型和基础 Azure 存储帐户类型选择适当的身份验证机制。

下表总结了用于访问 Azure 机器学习数据存储中的数据的身份验证机制:

存储帐户类型 无凭据数据访问 数据访问机制 角色分配
Azure Blob 访问密钥或 SAS 令牌 不需要角色分配。
Azure Blob 用户身份传递* 用户标识应在 Azure Blob 存储帐户中具有 适当的角色分配
Azure Data Lake Storage 服务主体 服务主体应在 Azure Data Lake Storage 存储帐户中具有 适当的角色分配
Azure Data Lake Storage 用户标识传递 用户标识应在 Azure Data Lake Storage 存储帐户中具有 适当的角色分配

* 仅当未启用 软删除 时,用户标识直通才适用于指向 Azure Blob 存储帐户的无凭据数据存储。

访问默认文件共享上的数据

在 Azure 机器学习工作室中,默认工作区文件共享是笔记本“文件”选项卡下的目录树。 笔记本代码可以直接使用 file:// 协议访问存储在此文件共享中的文件,并使用文件的绝对路径而无需其他配置。 默认文件共享挂载到无服务器 Spark 计算和附加的 Synapse Spark 池。

显示使用文件共享的屏幕截图。

以下代码片段访问和整理存储在默认文件共享上直接位于用户名下的data文件夹中的titanic.csv文件的数据。 将 <USER> 占位符替换为用户名。

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")