Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Azure Cosmos DB Spark 连接器提供了使用 Apache Spark 大规模处理更改源的强大方法。 连接器使用下面的 Java SDK,并实现 一个拉取模型 ,该模型以透明方式在 Spark 执行器之间分布处理,使其成为大规模数据处理方案的最佳选择。
Spark 连接器的工作原理
适用于 Azure Cosmos DB 的 Spark 连接器基于 Azure Cosmos DB Java SDK 构建,并实现用于读取更改源的拉取模型方法。 主要特征包括:
- Java SDK 基础:使用下面的可靠 Azure Cosmos DB Java SDK 进行可靠的更改源处理
- 拉取模型实现:遵循 更改源拉取模型 模式,让你控制处理速度
- 分布式处理:自动跨多个 Spark 执行程序分发更改源处理,以便进行并行处理
- 透明缩放:连接器无需手动干预即可处理分区和负载分布
唯一检查点功能
使用 Spark 连接器进行更改源处理的主要优点之一是其内置的检查点机制。 此功能提供:
- 自动恢复:大规模处理更改源时用于恢复的现用机制
- 容错:在发生故障时,能够从最后一个检查点恢复处理
- 状态管理:在 Spark 会话和群集重启之间保持处理状态
- 可伸缩性:支持跨分布式 Spark 环境的检查点
此检查点功能对于 Spark 连接器是唯一的,在直接使用 SDK 时不可用,因此对于需要高可用性和可靠性的生产方案尤其有用。
警告
如果检查点位置中存在现有书签,则 spark.cosmos.changeFeed.startFrom 忽略配置。 从检查点恢复时,连接器将从上次处理的位置(而不是指定的起点)继续。
何时使用 Spark 进行更改源处理
请考虑在这些情况下使用 Spark 连接器进行更改源处理:
- 大规模数据处理:需要处理大量超过单台计算机功能的更改源数据
- 复杂转换:当更改源处理涉及复杂的数据转换、聚合或与其他数据集联接时
- 分布式分析:在分布式环境中对更改源数据执行实时或准实时分析时
- 与数据管道集成:更改源处理是已使用 Spark 的大型 ETL/ELT 管道的一部分
- 容错要求:需要可靠的检查点和生产工作负荷的恢复机制
- 多容器处理:需要同时处理来自多个容器的更改源
对于更简单的方案,或者需要对单个文档处理进行精细控制时,请考虑直接将 更改源处理器 或 拉取模型 与 SDK 配合使用。
代码示例
以下示例演示如何使用 Spark 连接器从更改源中读取数据。 有关更全面的示例,请参阅完整的示例笔记本:
- Python 结构化流式处理示例 - NYC 出租车数据处理与更改源
- Scala 容器迁移示例 - 使用更改源进行实时容器迁移
# Configure change feed reading
changeFeedConfig = {
"spark.cosmos.accountEndpoint": "https://<account-name>.documents.azure.cn:443/",
"spark.cosmos.accountKey": "<account-key>",
"spark.cosmos.database": "<database-name>",
"spark.cosmos.container": "<container-name>",
# Start from beginning, now, or specific timestamp (ignored if checkpoints exist)
"spark.cosmos.changeFeed.startFrom": "Beginning", # "Now" or "2020-02-10T14:15:03"
"spark.cosmos.changeFeed.mode": "LatestVersion", # or "AllVersionsAndDeletes"
# Control batch size - if not set, all available data processed in first batch
"spark.cosmos.changeFeed.itemCountPerTriggerHint": "50000",
"spark.cosmos.read.partitioning.strategy": "Restrictive"
}
# Read change feed as a streaming DataFrame
changeFeedDF = spark \
.readStream \
.format("cosmos.oltp.changeFeed") \
.options(**changeFeedConfig) \
.load()
# Configure output settings with checkpointing
outputConfig = {
"spark.cosmos.accountEndpoint": "https://<target-account>.documents.azure.cn:443/",
"spark.cosmos.accountKey": "<target-account-key>",
"spark.cosmos.database": "<target-database>",
"spark.cosmos.container": "<target-container>",
"spark.cosmos.write.strategy": "ItemOverwrite"
}
# Process and write the change feed data with checkpointing
query = changeFeedDF \
.selectExpr("*") \
.writeStream \
.format("cosmos.oltp") \
.outputMode("append") \
.option("checkpointLocation", "/tmp/changefeed-checkpoint") \
.options(**outputConfig) \
.start()
# Wait for the streaming query to finish
query.awaitTermination()
密钥配置选项
在 Spark 中使用更改源时,这些配置选项尤为重要:
-
spark.cosmos.changeFeed.startFrom:控制开始读取更改源的位置
-
"Beginning"- 从更改源的开头开始 -
"Now"- 从当前时间开始 -
"2020-02-10T14:15:03"- 从特定时间戳开始(ISO 8601 格式) - 注意:如果检查点位置中存在现有书签,则会忽略此设置
-
-
spark.cosmos.changeFeed.mode:指定更改源模式
-
"LatestVersion"- 仅处理最新版本的已更改文档 -
"AllVersionsAndDeletes"- 处理所有版本的更改,包括删除
-
-
spark.cosmos.changeFeed.itemCountPerTriggerHint:控制批处理大小
- 每个微批/触发器的更改源读取的最大项数
- 示例:
"50000" - 重要提示:如果未设置,更改源中的所有可用数据将在第一个微批处理中进行处理
- checkpointLocation:指定在何处存储容错和恢复的检查点信息
- spark.cosmos.read.partitioning.strategy:控制跨 Spark 执行程序对数据进行分区的方式
后续步骤
- 详细了解 更改源设计模式
- 浏览 更改源拉取模型
- 了解单台计算机方案的更改源处理器
- 查看 Spark 连接器文档 以获取其他配置选项
- 查看不同处理方案的更改源模式