适用范围: NoSQL
Azure Cosmos DB Spark 连接器提供了使用 Apache Spark 大规模处理更改源的强大方法。 连接器使用下面的 Java SDK,并实现 一个拉取模型 ,该模型以透明方式在 Spark 执行器之间分布处理,使其成为大规模数据处理方案的最佳选择。
Spark 连接器的工作原理
适用于 Azure Cosmos DB 的 Spark 连接器基于 Azure Cosmos DB Java SDK 构建,并实现用于读取更改源的拉取模型方法。 主要特征包括:
- Java SDK 基础:使用下面的可靠 Azure Cosmos DB Java SDK 进行可靠的更改源处理
- 拉取模型实现:遵循 更改源拉取模型 模式,让你控制处理速度
- 分布式处理:自动跨多个 Spark 执行程序分发更改源处理,以便进行并行处理
- 透明缩放:连接器无需手动干预即可处理分区和负载分布
唯一检查点功能
使用 Spark 连接器进行更改源处理的主要优点之一是其内置的检查点机制。 此功能提供:
- 自动恢复:大规模处理更改源时用于恢复的现用机制
- 容错:在发生故障时,能够从最后一个检查点恢复处理
- 状态管理:在 Spark 会话和群集重启之间保持处理状态
- 可伸缩性:支持跨分布式 Spark 环境的检查点
此检查点功能对于 Spark 连接器是唯一的,在直接使用 SDK 时不可用,因此对于需要高可用性和可靠性的生产方案尤其有用。
警告
如果检查点位置中存在现有书签,则 spark.cosmos.changeFeed.startFrom
忽略配置。 从检查点恢复时,连接器将从上次处理的位置(而不是指定的起点)继续。
何时使用 Spark 进行更改源处理
请考虑在这些情况下使用 Spark 连接器进行更改源处理:
- 大规模数据处理:需要处理大量超过单台计算机功能的更改源数据
- 复杂转换:当更改源处理涉及复杂的数据转换、聚合或与其他数据集联接时
- 分布式分析:在分布式环境中对更改源数据执行实时或准实时分析时
- 与数据管道集成:更改源处理是已使用 Spark 的大型 ETL/ELT 管道的一部分
- 容错要求:需要可靠的检查点和生产工作负荷的恢复机制
- 多容器处理:需要同时处理来自多个容器的更改源
对于更简单的方案,或者需要对单个文档处理进行精细控制时,请考虑直接将 更改源处理器 或 拉取模型 与 SDK 配合使用。
代码示例
以下示例演示如何使用 Spark 连接器从更改源中读取数据。 有关更全面的示例,请参阅完整的示例笔记本:
- Python 结构化流式处理示例 - NYC 出租车数据处理与更改源
- Scala 容器迁移示例 - 使用更改源进行实时容器迁移
# Configure change feed reading
changeFeedConfig = {
"spark.cosmos.accountEndpoint": "https://<account-name>.documents.azure.cn:443/",
"spark.cosmos.accountKey": "<account-key>",
"spark.cosmos.database": "<database-name>",
"spark.cosmos.container": "<container-name>",
# Start from beginning, now, or specific timestamp (ignored if checkpoints exist)
"spark.cosmos.changeFeed.startFrom": "Beginning", # "Now" or "2020-02-10T14:15:03"
"spark.cosmos.changeFeed.mode": "LatestVersion", # or "AllVersionsAndDeletes"
# Control batch size - if not set, all available data processed in first batch
"spark.cosmos.changeFeed.itemCountPerTriggerHint": "50000",
"spark.cosmos.read.partitioning.strategy": "Restrictive"
}
# Read change feed as a streaming DataFrame
changeFeedDF = spark \
.readStream \
.format("cosmos.oltp.changeFeed") \
.options(**changeFeedConfig) \
.load()
# Configure output settings with checkpointing
outputConfig = {
"spark.cosmos.accountEndpoint": "https://<target-account>.documents.azure.cn:443/",
"spark.cosmos.accountKey": "<target-account-key>",
"spark.cosmos.database": "<target-database>",
"spark.cosmos.container": "<target-container>",
"spark.cosmos.write.strategy": "ItemOverwrite"
}
# Process and write the change feed data with checkpointing
query = changeFeedDF \
.selectExpr("*") \
.writeStream \
.format("cosmos.oltp") \
.outputMode("append") \
.option("checkpointLocation", "/tmp/changefeed-checkpoint") \
.options(**outputConfig) \
.start()
# Wait for the streaming query to finish
query.awaitTermination()
密钥配置选项
在 Spark 中使用更改源时,这些配置选项尤为重要:
-
spark.cosmos.changeFeed.startFrom:控制开始读取更改源的位置
-
"Beginning"
- 从更改源的开头开始 -
"Now"
- 从当前时间开始 -
"2020-02-10T14:15:03"
- 从特定时间戳开始(ISO 8601 格式) - 注意:如果检查点位置中存在现有书签,则会忽略此设置
-
-
spark.cosmos.changeFeed.mode:指定更改源模式
-
"LatestVersion"
- 仅处理最新版本的已更改文档 -
"AllVersionsAndDeletes"
- 处理所有版本的更改,包括删除
-
-
spark.cosmos.changeFeed.itemCountPerTriggerHint:控制批处理大小
- 每个微批/触发器的更改源读取的最大项数
- 示例:
"50000"
- 重要提示:如果未设置,更改源中的所有可用数据将在第一个微批处理中进行处理
- checkpointLocation:指定在何处存储容错和恢复的检查点信息
- spark.cosmos.read.partitioning.strategy:控制跨 Spark 执行程序对数据进行分区的方式
后续步骤
- 详细了解 更改源设计模式
- 浏览 更改源拉取模型
- 了解单台计算机方案的更改源处理器
- 查看 Spark 连接器文档 以获取其他配置选项
- 查看不同处理方案的更改源模式