共用方式為

使用 Apache Spark 更改源

适用范围: NoSQL

Azure Cosmos DB Spark 连接器提供了使用 Apache Spark 大规模处理更改源的强大方法。 连接器使用下面的 Java SDK,并实现 一个拉取模型 ,该模型以透明方式在 Spark 执行器之间分布处理,使其成为大规模数据处理方案的最佳选择。

Spark 连接器的工作原理

适用于 Azure Cosmos DB 的 Spark 连接器基于 Azure Cosmos DB Java SDK 构建,并实现用于读取更改源的拉取模型方法。 主要特征包括:

  • Java SDK 基础:使用下面的可靠 Azure Cosmos DB Java SDK 进行可靠的更改源处理
  • 拉取模型实现:遵循 更改源拉取模型 模式,让你控制处理速度
  • 分布式处理:自动跨多个 Spark 执行程序分发更改源处理,以便进行并行处理
  • 透明缩放:连接器无需手动干预即可处理分区和负载分布

唯一检查点功能

使用 Spark 连接器进行更改源处理的主要优点之一是其内置的检查点机制。 此功能提供:

  • 自动恢复:大规模处理更改源时用于恢复的现用机制
  • 容错:在发生故障时,能够从最后一个检查点恢复处理
  • 状态管理:在 Spark 会话和群集重启之间保持处理状态
  • 可伸缩性:支持跨分布式 Spark 环境的检查点

此检查点功能对于 Spark 连接器是唯一的,在直接使用 SDK 时不可用,因此对于需要高可用性和可靠性的生产方案尤其有用。

警告

如果检查点位置中存在现有书签,则 spark.cosmos.changeFeed.startFrom 忽略配置。 从检查点恢复时,连接器将从上次处理的位置(而不是指定的起点)继续。

何时使用 Spark 进行更改源处理

请考虑在这些情况下使用 Spark 连接器进行更改源处理:

  • 大规模数据处理:需要处理大量超过单台计算机功能的更改源数据
  • 复杂转换:当更改源处理涉及复杂的数据转换、聚合或与其他数据集联接时
  • 分布式分析:在分布式环境中对更改源数据执行实时或准实时分析时
  • 与数据管道集成:更改源处理是已使用 Spark 的大型 ETL/ELT 管道的一部分
  • 容错要求:需要可靠的检查点和生产工作负荷的恢复机制
  • 多容器处理:需要同时处理来自多个容器的更改源

对于更简单的方案,或者需要对单个文档处理进行精细控制时,请考虑直接将 更改源处理器拉取模型 与 SDK 配合使用。

代码示例

以下示例演示如何使用 Spark 连接器从更改源中读取数据。 有关更全面的示例,请参阅完整的示例笔记本:

# Configure change feed reading
changeFeedConfig = {
    "spark.cosmos.accountEndpoint": "https://<account-name>.documents.azure.cn:443/",
    "spark.cosmos.accountKey": "<account-key>",
    "spark.cosmos.database": "<database-name>",
    "spark.cosmos.container": "<container-name>",
    # Start from beginning, now, or specific timestamp (ignored if checkpoints exist)
    "spark.cosmos.changeFeed.startFrom": "Beginning",  # "Now" or "2020-02-10T14:15:03"
    "spark.cosmos.changeFeed.mode": "LatestVersion",  # or "AllVersionsAndDeletes"
    # Control batch size - if not set, all available data processed in first batch
    "spark.cosmos.changeFeed.itemCountPerTriggerHint": "50000",
    "spark.cosmos.read.partitioning.strategy": "Restrictive"
}

# Read change feed as a streaming DataFrame
changeFeedDF = spark \
    .readStream \
    .format("cosmos.oltp.changeFeed") \
    .options(**changeFeedConfig) \
    .load()

# Configure output settings with checkpointing
outputConfig = {
    "spark.cosmos.accountEndpoint": "https://<target-account>.documents.azure.cn:443/",
    "spark.cosmos.accountKey": "<target-account-key>",
    "spark.cosmos.database": "<target-database>",
    "spark.cosmos.container": "<target-container>",
    "spark.cosmos.write.strategy": "ItemOverwrite"
}

# Process and write the change feed data with checkpointing
query = changeFeedDF \
    .selectExpr("*") \
    .writeStream \
    .format("cosmos.oltp") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/changefeed-checkpoint") \
    .options(**outputConfig) \
    .start()

# Wait for the streaming query to finish
query.awaitTermination()

密钥配置选项

在 Spark 中使用更改源时,这些配置选项尤为重要:

  • spark.cosmos.changeFeed.startFrom:控制开始读取更改源的位置
    • "Beginning" - 从更改源的开头开始
    • "Now" - 从当前时间开始
    • "2020-02-10T14:15:03" - 从特定时间戳开始(ISO 8601 格式)
    • 注意:如果检查点位置中存在现有书签,则会忽略此设置
  • spark.cosmos.changeFeed.mode:指定更改源模式
    • "LatestVersion" - 仅处理最新版本的已更改文档
    • "AllVersionsAndDeletes" - 处理所有版本的更改,包括删除
  • spark.cosmos.changeFeed.itemCountPerTriggerHint:控制批处理大小
    • 每个微批/触发器的更改源读取的最大项数
    • 示例: "50000"
    • 重要提示:如果未设置,更改源中的所有可用数据将在第一个微批处理中进行处理
  • checkpointLocation:指定在何处存储容错和恢复的检查点信息
  • spark.cosmos.read.partitioning.strategy:控制跨 Spark 执行程序对数据进行分区的方式

后续步骤