在本教程中,将反向 ETL 管道设置为将扩充数据从 Azure Databricks 中的 Delta 表移动到 Azure Cosmos DB for NoSQL。 然后使用 Azure Cosmos DB for NoSQL 的联机事务处理 (OLTP) Spark 连接器来同步数据。
反向 ETL 管道设置的先决条件
- 现有的 Azure Cosmos DB 帐户。
- 如果你有 Azure 订阅,请创建一个新帐户。
- 现有的 Azure Databricks 工作区。
- 如果有 Azure 订阅, 请创建新的工作区。
- 最新版本的 Azure CLI。
使用 Microsoft Entra 配置基于角色的访问控制
Azure 托管标识确保 Azure Cosmos DB for NoSQL 的安全无密码身份验证,而无需手动管理凭据。 在此先决条件步骤中,设置一个用户分配的托管标识,此标识由 Azure Databricks 自动创建,具有对元数据的读取权限和对数据的写入权限,以便访问您的 Azure Cosmos DB for NoSQL 帐户。 此步骤为托管标识配置控制和数据平面的基于角色的访问控制角色。
登录到 Azure 门户 (https://portal.azure.cn)。
导航到现有的 Azure Databricks 资源。
在 “概要” 窗格中,找到并导航到与工作区关联的托管资源组。
在受管理资源组中,选择使用工作区自动创建的用户分配的托管标识。
在“概要”窗格中记录客户端 ID 和对象(主体)ID 字段的值。 稍后使用此值来分配控制和数据平面角色。
小窍门
也可使用 Azure CLI 获取托管标识的主体 ID。 假设托管标识的名称是
dbmanagedidentity
,请使用az resource show
命令获取主体 ID。az resource show \ --resource-group "<name-of-managed-resource-group>" \ --name "dbmanagedidentity" \ --resource-type "Microsoft.ManagedIdentity/userAssignedIdentities" \ --query "{clientId: properties.clientId, principalId: properties.principalId}"
导航到目标 Azure Cosmos DB for NoSQL 帐户。
在帐户的页面上,选择“访问控制”(IAM)。
在 “访问控制 ”窗格中,选择 “添加” ,然后选择 “添加角色分配 ”选项,开始将控制平面角色分配给用户分配的托管标识的过程。
在要分配的角色列表中选择 Cosmos DB 帐户读取者 角色。
在用于分配对 用户、组或服务主体 的访问权限的部分中,使用 选择成员 选项进行交互。
在成员对话框中,输入主体 ID 以筛选与 Azure Databricks 关联的用户分配的托管标识。 选择该标识。
最后,选择“ 查看 + 分配 ”以创建控制平面角色分配。
使用
az cosmosdb sql role assignment create
命令将“Cosmos DB Built-in Data Contributor
”数据平面角色和/
范围分配给与 Azure Databricks 关联的用户分配的托管标识。az cosmosdb sql role assignment create \ --resource-group "<name-of-resource-group>" \ --account-name "<name-of-cosmos-nosql-account>" \ --principal-id "<managed-identity-principal-id>" \ --role-definition-name "Cosmos DB Built-in Data Contributor" \ --scope "/"
使用
az account show
来获取订阅和租户标识符。 在后续步骤中使用 Spark 连接器使用 Microsoft Entra 身份验证时需要这些值。az account show --query '{subscriptionId: id, tenantId: tenantId}'
创建 Databricks 笔记本
导航到现有的 Azure Databricks 资源,然后打开工作区 UI。
如果还没有群集,请创建新群集。
重要
确保群集的运行时版本为15.4或更高,以获得对Spark 3.5.0和Scala 2.12的长期支持。 本指南中的剩余步骤假定这些版本的工具。
导航到“安装新>库>”和“Maven”以安装 Maven 包。
使用“组 ID”筛选器
com.azure.cosmos.spark
并选择“项目 ID”为azure-cosmos-spark_3-5_2-12
的包来搜索 Azure Cosmos DB for NoSQL 的 Spark 连接器。导航到工作区>[文件夹]>,然后选择新建>笔记本创建新笔记本。
将笔记本附加到群集。
在 Azure Databricks 中配置 Spark 连接器
配置 Spark 连接器,以使用 Microsoft Entra 身份验证连接到帐户的容器。 另外,请将连接器配置为仅对 Spark 操作使用有限的吞吐量阈值。 要配置 Spark 连接器,请定义一个包含凭据的配置字典,以便连接到您的帐户。 这些凭据包括:
价值 | |
---|---|
spark.cosmos.accountEndpoint |
NoSQL 帐户终结点 |
spark.cosmos.database |
目标数据库的名称 |
spark.cosmos.container |
目标容器的名称 |
spark.cosmos.auth.type |
ManagedIdentity |
spark.cosmos.auth.aad.clientId |
用户分配的托管标识的“客户端 ID” |
spark.cosmos.account.subscriptionId |
订阅的 ID |
spark.cosmos.account.tenantId |
关联的 Microsoft Entra 租户的 ID |
spark.cosmos.account.resourceGroupName |
资源组的名称 |
spark.cosmos.throughputControl.enabled |
true |
spark.cosmos.throughputControl.name |
TargetContainerThroughputControl |
spark.cosmos.throughputControl.targetThroughputThreshold |
0.30 |
spark.cosmos.throughputControl.globalControl.useDedicatedContainer |
false |
cosmos_config = {
# General settings
"spark.cosmos.accountEndpoint": "<endpoint>",
"spark.cosmos.database": "products",
"spark.cosmos.container": "recommendations",
# Entra authentication settings
"spark.cosmos.auth.type": "ManagedIdentity",
"spark.cosmos.account.subscriptionId": "<subscriptionId>",
"spark.cosmos.account.tenantId": "<tenantId>",
"spark.cosmos.account.resourceGroupName": "<resourceGroupName>",
# Throughput control settings
"spark.cosmos.throughputControl.enabled": "true",
"spark.cosmos.throughputControl.name": "TargetContainerThroughputControl",
"spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
"spark.cosmos.throughputControl.globalControl.useDedicatedContainer": "false",
}
val cosmosconfig = Map(
// General settings
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.database" -> "products",
"spark.cosmos.container" -> "recommendations",
// Entra authentication settings
"spark.cosmos.auth.type" -> "ManagedIdentity",
"spark.cosmos.account.subscriptionId" -> "<subscriptionId>",
"spark.cosmos.account.tenantId" -> "<tenantId>",
"spark.cosmos.account.resourceGroupName" -> "<resourceGroupName>",
// Throughput control settings
"spark.cosmos.throughputControl.enabled" -> "true",
"spark.cosmos.throughputControl.name" -> "TargetContainerThroughputControl",
"spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.30",
"spark.cosmos.throughputControl.globalControl.useDedicatedContainer" -> "false"
)
注释
在此示例中,将命名 products
目标数据库,并命名 recommendations
目标容器。
此步骤中指定的吞吐量配置可确保分配给目标容器的请求单位 (RU) 中仅有 30% 可用于 Spark 操作。
将示例产品建议数据引入 Delta 表
创建一个示例 DataFrame,其中包含用户的产品建议信息,并将其写入名为 recommendations_delta
Delta 表。 此步骤模拟你打算同步到 Azure Cosmos DB for NoSQL 的数据湖中的特选转换数据。 写入到 Delta 格式可确保稍后可以启用变更数据捕获 (CDC) 进行增量同步。
from pyspark.sql import SparkSession
# Create sample data and convert it to a DataFrame
df = spark.createDataFrame([
("yara-lima", "Full-Finger Gloves", "clothing-gloves", 80),
("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 90)
], ["id", "productname", "category", "recommendationscore"])
# Write the DataFrame to a Delta table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
// Create sample data as a sequence and convert it to a DataFrame
val df = Seq(
("yara-lima", "Full-Finger Gloves", "clothing-gloves", 12.95),
("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 19.99)
).toDF("id", "productname", "category", "recommendationscore")
// Write the DataFrame to a table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
将初始数据批量加载到 Azure Cosmos DB for NoSQL
接下来,将 recommendations_delta
Delta 表读入 Spark 数据帧,并使用 cosmos.oltp
格式执行对 Azure Cosmos DB for NoSQL 的初始批量写入。 使用 追加 模式添加数据,而无需覆盖目标数据库和容器中的现有内容。 此步骤可确保在 CDC 开始之前帐户中提供所有历史数据。
# Read the Delta table into a DataFrame
df_delta = spark.read.format("delta").table("recommendations_delta")
# Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Read the Delta table into a DataFrame
val df_delta = spark.read.format("delta").table("recommendations_delta")
// Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(cosmosconfig).save()
使用变更数据馈送启用流式处理同步
在 recommendations_delta
表上启用 Delta Lake 的更改数据馈送(CDF)功能,方法是更改该表的属性。 CDF 允许 Delta Lake 跟踪将来的所有行级插入、更新和删除。 启用此属性对于执行到 Azure Cosmos DB for NoSQL 的增量同步至关重要,因为它公开更改而无需比较快照。
进行历史数据加载后,可以使用 Delta 变更数据馈送 (CDF) 捕获 Delta 表中的更改。 可以实现基于批处理的或基于流式传输的 CDC。
# Enable Change Data Feed (CDF)
spark.sql("""
ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# Read the Change Data Capture (CDC) data from the Delta table
cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")
# Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Enable Change Data Feed (CDF)
spark.sql("""
ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
// Read the Change Data Capture (CDC) data from the Delta table
val cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")
// Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(cosmos_config).save()
使用 NoSQL 查询验证数据
写入 Azure Cosmos DB for NoSQL 后,请使用同一帐户配置将其查询回 Spark 来验证数据。 然后;检查 Delta Lake 中引入的数据、运行验证或与其他数据集联接以进行分析或报告。 Azure Cosmos DB for NoSQL 支持快速索引读取,实现实时查询性能。
# Load DataFrame
df_cosmos = spark.read.format("cosmos.oltp").options(**cosmos_config).load()
# Run query
df_cosmos.select("id", "productname", "category", "recommendationscore").show()
// Load DataFrame
val dfCosmos = spark.read.format("cosmos.oltp").options(cosmosConfig).load()
// Run query
dfCosmos.select("id", "productname", "category", "recommendationscore").show()