教程:使用 Spark OLTP 连接器从 Delta Lake 反向提取、转换和加载(ETL)到 Azure Cosmos DB for NoSQL

在本教程中,将反向 ETL 管道设置为将扩充数据从 Azure Databricks 中的 Delta 表移动到 Azure Cosmos DB for NoSQL。 然后使用 Azure Cosmos DB for NoSQL 的联机事务处理 (OLTP) Spark 连接器来同步数据。

反向 ETL 管道设置的先决条件

使用 Microsoft Entra 配置基于角色的访问控制

Azure 托管标识确保 Azure Cosmos DB for NoSQL 的安全无密码身份验证,而无需手动管理凭据。 在此先决条件步骤中,设置一个用户分配的托管标识,此标识由 Azure Databricks 自动创建,具有对元数据的读取权限和对数据的写入权限,以便访问您的 Azure Cosmos DB for NoSQL 帐户。 此步骤为托管标识配置控制和数据平面的基于角色的访问控制角色。

  1. 登录到 Azure 门户 (https://portal.azure.cn)。

  2. 导航到现有的 Azure Databricks 资源。

  3. “概要” 窗格中,找到并导航到与工作区关联的托管资源组。

  4. 在受管理资源组中,选择使用工作区自动创建的用户分配的托管标识。

  5. “概要”窗格中记录客户端 ID对象(主体)ID 字段的值。 稍后使用此值来分配控制和数据平面角色。

    小窍门

    也可使用 Azure CLI 获取托管标识的主体 ID。 假设托管标识的名称是 dbmanagedidentity,请使用 az resource show 命令获取主体 ID。

    az resource show \
        --resource-group "<name-of-managed-resource-group>" \
        --name "dbmanagedidentity" \
        --resource-type "Microsoft.ManagedIdentity/userAssignedIdentities" \
        --query "{clientId: properties.clientId, principalId: properties.principalId}"
    
  6. 导航到目标 Azure Cosmos DB for NoSQL 帐户。

  7. 在帐户的页面上,选择“访问控制”(IAM)。

  8. “访问控制 ”窗格中,选择 “添加” ,然后选择 “添加角色分配 ”选项,开始将控制平面角色分配给用户分配的托管标识的过程。

  9. 在要分配的角色列表中选择 Cosmos DB 帐户读取者 角色。

  10. 在用于分配对 用户、组或服务主体 的访问权限的部分中,使用 选择成员 选项进行交互。

  11. 在成员对话框中,输入主体 ID 以筛选与 Azure Databricks 关联的用户分配的托管标识。 选择该标识。

  12. 最后,选择“ 查看 + 分配 ”以创建控制平面角色分配。

  13. 使用 az cosmosdb sql role assignment create 命令将“Cosmos DB Built-in Data Contributor”数据平面角色和 / 范围分配给与 Azure Databricks 关联的用户分配的托管标识。

    az cosmosdb sql role assignment create \
        --resource-group "<name-of-resource-group>" \
        --account-name "<name-of-cosmos-nosql-account>" \
        --principal-id "<managed-identity-principal-id>" \
        --role-definition-name "Cosmos DB Built-in Data Contributor" \ --scope "/"
    
  14. 使用 az account show 来获取订阅和租户标识符。 在后续步骤中使用 Spark 连接器使用 Microsoft Entra 身份验证时需要这些值。

    az account show --query '{subscriptionId: id, tenantId: tenantId}'
    

创建 Databricks 笔记本

  1. 导航到现有的 Azure Databricks 资源,然后打开工作区 UI。

  2. 如果还没有群集,请创建新群集。

    重要

    确保群集的运行时版本为15.4或更高,以获得对Spark 3.5.0和Scala 2.12的长期支持。 本指南中的剩余步骤假定这些版本的工具。

  3. 导航到“安装新>>”和“Maven”以安装 Maven 包。

  4. 使用“组 ID”筛选器 com.azure.cosmos.spark 并选择“项目 ID”azure-cosmos-spark_3-5_2-12 的包来搜索 Azure Cosmos DB for NoSQL 的 Spark 连接器。

  5. 导航到工作区>[文件夹]>,然后选择新建>笔记本创建新笔记本。

  6. 将笔记本附加到群集。

在 Azure Databricks 中配置 Spark 连接器

配置 Spark 连接器,以使用 Microsoft Entra 身份验证连接到帐户的容器。 另外,请将连接器配置为仅对 Spark 操作使用有限的吞吐量阈值。 要配置 Spark 连接器,请定义一个包含凭据的配置字典,以便连接到您的帐户。 这些凭据包括:

价值
spark.cosmos.accountEndpoint NoSQL 帐户终结点
spark.cosmos.database 目标数据库的名称
spark.cosmos.container 目标容器的名称
spark.cosmos.auth.type ManagedIdentity
spark.cosmos.auth.aad.clientId 用户分配的托管标识的“客户端 ID”
spark.cosmos.account.subscriptionId 订阅的 ID
spark.cosmos.account.tenantId 关联的 Microsoft Entra 租户的 ID
spark.cosmos.account.resourceGroupName 资源组的名称
spark.cosmos.throughputControl.enabled true
spark.cosmos.throughputControl.name TargetContainerThroughputControl
spark.cosmos.throughputControl.targetThroughputThreshold 0.30
spark.cosmos.throughputControl.globalControl.useDedicatedContainer false
cosmos_config = {
    # General settings
    "spark.cosmos.accountEndpoint": "<endpoint>",
    "spark.cosmos.database": "products",
    "spark.cosmos.container": "recommendations",
    # Entra authentication settings
    "spark.cosmos.auth.type": "ManagedIdentity",
    "spark.cosmos.account.subscriptionId": "<subscriptionId>",
    "spark.cosmos.account.tenantId": "<tenantId>",
    "spark.cosmos.account.resourceGroupName": "<resourceGroupName>",
    # Throughput control settings
    "spark.cosmos.throughputControl.enabled": "true",
    "spark.cosmos.throughputControl.name": "TargetContainerThroughputControl",
    "spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
    "spark.cosmos.throughputControl.globalControl.useDedicatedContainer": "false",
}
val cosmosconfig = Map(
  // General settings
  "spark.cosmos.accountEndpoint" -> "<endpoint>",
  "spark.cosmos.database" -> "products",
  "spark.cosmos.container" -> "recommendations",
  // Entra authentication settings
  "spark.cosmos.auth.type" -> "ManagedIdentity",
  "spark.cosmos.account.subscriptionId" -> "<subscriptionId>",
  "spark.cosmos.account.tenantId" -> "<tenantId>",
  "spark.cosmos.account.resourceGroupName" -> "<resourceGroupName>",
  // Throughput control settings
  "spark.cosmos.throughputControl.enabled" -> "true",
  "spark.cosmos.throughputControl.name" -> "TargetContainerThroughputControl",
  "spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.30",
  "spark.cosmos.throughputControl.globalControl.useDedicatedContainer" -> "false"
)

注释

在此示例中,将命名 products 目标数据库,并命名 recommendations目标容器。

此步骤中指定的吞吐量配置可确保分配给目标容器的请求单位 (RU) 中仅有 30% 可用于 Spark 操作。

将示例产品建议数据引入 Delta 表

创建一个示例 DataFrame,其中包含用户的产品建议信息,并将其写入名为 recommendations_deltaDelta 表。 此步骤模拟你打算同步到 Azure Cosmos DB for NoSQL 的数据湖中的特选转换数据。 写入到 Delta 格式可确保稍后可以启用变更数据捕获 (CDC) 进行增量同步。

from pyspark.sql import SparkSession

# Create sample data and convert it to a DataFrame
df = spark.createDataFrame([
    ("yara-lima", "Full-Finger Gloves", "clothing-gloves", 80),
    ("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 90)
], ["id", "productname", "category", "recommendationscore"])

# Write the DataFrame to a Delta table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
// Create sample data as a sequence and convert it to a DataFrame
val df = Seq(
  ("yara-lima", "Full-Finger Gloves", "clothing-gloves", 12.95),
  ("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 19.99)
).toDF("id", "productname", "category", "recommendationscore") 

// Write the DataFrame to a table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")

将初始数据批量加载到 Azure Cosmos DB for NoSQL

接下来,将 recommendations_delta Delta 表读入 Spark 数据帧,并使用 cosmos.oltp 格式执行对 Azure Cosmos DB for NoSQL 的初始批量写入。 使用 追加 模式添加数据,而无需覆盖目标数据库和容器中的现有内容。 此步骤可确保在 CDC 开始之前帐户中提供所有历史数据。

# Read the Delta table into a DataFrame
df_delta = spark.read.format("delta").table("recommendations_delta")

# Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Read the Delta table into a DataFrame
val df_delta = spark.read.format("delta").table("recommendations_delta")

// Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(cosmosconfig).save()

使用变更数据馈送启用流式处理同步

recommendations_delta 表上启用 Delta Lake 的更改数据馈送(CDF)功能,方法是更改该表的属性。 CDF 允许 Delta Lake 跟踪将来的所有行级插入、更新和删除。 启用此属性对于执行到 Azure Cosmos DB for NoSQL 的增量同步至关重要,因为它公开更改而无需比较快照。

进行历史数据加载后,可以使用 Delta 变更数据馈送 (CDF) 捕获 Delta 表中的更改。 可以实现基于批处理的或基于流式传输的 CDC。

# Enable Change Data Feed (CDF)
spark.sql("""
  ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read the Change Data Capture (CDC) data from the Delta table
cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")

# Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Enable Change Data Feed (CDF)
spark.sql("""
  ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

// Read the Change Data Capture (CDC) data from the Delta table
val cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")

// Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(cosmos_config).save()

使用 NoSQL 查询验证数据

写入 Azure Cosmos DB for NoSQL 后,请使用同一帐户配置将其查询回 Spark 来验证数据。 然后;检查 Delta Lake 中引入的数据、运行验证或与其他数据集联接以进行分析或报告。 Azure Cosmos DB for NoSQL 支持快速索引读取,实现实时查询性能。

# Load DataFrame
df_cosmos = spark.read.format("cosmos.oltp").options(**cosmos_config).load()

# Run query
df_cosmos.select("id", "productname", "category", "recommendationscore").show()
// Load DataFrame
val dfCosmos = spark.read.format("cosmos.oltp").options(cosmosConfig).load()

// Run query
dfCosmos.select("id", "productname", "category", "recommendationscore").show()