从流式处理检查点失败中恢复 Lakeflow 声明性管道

本页介绍如何在流式处理检查点失效或损坏时恢复 Lakeflow 声明性管道中的管道。

什么是流式处理检查点?

在 Apache Spark 结构化流式处理中,检查点是用于保存流式处理查询状态的机制。 此状态包括:

  • 进度信息:已处理与源的偏移量。
  • 中间状态:需要跨微批处理维护的数据进行有状态作(例如聚合)。 mapGroupsWithState
  • 元数据:有关流式处理查询执行的信息。

检查点对于确保流式处理应用程序中的容错和数据一致性至关重要:

  • 容错:如果流式处理应用程序失败(例如,由于节点故障、应用程序崩溃),检查点允许应用程序从上次成功的检查点状态重启,而不是从头开始重新处理所有数据。 这可以防止数据丢失并确保增量处理。
  • 恰好一次处理:对于许多流式处理源、检查点以及幂等接收器,启用一次恰好处理可确保每条记录完全处理一次,即使面对故障,也防止重复或遗漏。
  • 状态管理:对于有状态转换,检查点会保留这些作的内部状态,使流式处理查询能够根据累积的历史状态正确继续处理新数据。

Lakeflow 声明性管道中的检查点

Lakeflow 声明性管道构建在结构化流式处理上,并将大部分基础检查点管理抽象化,提供更声明性的方法。 在管道中定义流式处理表时,每个流都会有一个检查点状态,用于写入流式处理表。 这些检查点位置位于管道内部,用户无法访问。

通常不需要管理或了解流式处理表的基础检查点,但以下情况除外:

  • 回退和重播:如果要在保留表的当前状态时重新处理特定时间点的数据,则必须重置流式处理表的检查点。
  • 从检查点失败或损坏中恢复:如果写入流式处理表的查询由于检查点相关错误而失败,则会导致硬故障,并且查询无法进一步进行。 有三种方法可用于从此类故障中恢复:
    • 完整表刷新:这会重置表并擦除现有数据。
    • 使用备份和回填进行完整表刷新:在执行完整表刷新和回填旧数据之前,先执行表的备份,但这非常昂贵,应该是最后的手段。
    • 重置检查点并继续:如果负担不起丢失现有数据,则必须对受影响的流流执行选择性检查点重置。

示例:由于代码更改而导致管道失败

假设有一个 Lakeflow 声明性管道,该管道处理更改数据馈送以及云存储系统(如 Amazon S3)的初始表快照,以及写入 SCD-1 流式处理表的情况。

管道有两个流式处理流:

  • customers_incremental_flow:以增量方式读取 customer 源表 CDC 源、筛选出重复记录,并将这些记录向上插入到目标表中。
  • customers_snapshot_flow:一次性读取源表的初始快照 customers ,并将记录向上插入目标表。

Lakeflow 声明性管道 CDC 示例

@dp.temporary_view(name="customers_incremental_view")
  def query():
    return (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load(customers_incremental_path)
        .dropDuplicates(["customer_id"])
    )

@dp.temporary_view(name="customers_snapshot_view")
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(customers_snapshot_path)
        .select("*")
    )

dp.create_streaming_table("customers")

dp.create_auto_cdc_flow(
    flow_name = "customers_incremental_flow",
    target = "customers",
    source = "customers_incremental_view",
    keys = ["customer_id"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    apply_as_truncates = expr("operation = 'TRUNCATE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 1
)
dp.create_auto_cdc_flow(
    flow_name = "customers_snapshot_flow",
    target = "customers",
    source = "customers_snapshot_view",
    keys = ["customer_id"],
    sequence_by = lit(0),
    stored_as_scd_type = 1,
    once = True
)

部署此管道后,它将成功运行,并开始处理更改数据馈送和初始快照。

稍后,你意识到查询中的 customers_incremental_view 重复数据删除逻辑是冗余的,并导致性能瓶颈。 dropDuplicates()删除以提高性能:

@dp.temporary_view(name="customers_raw_view")
  def query():
    return (
    spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        .load()
        # .dropDuplicates()
    )

删除 dropDuplicates() API 并重新部署管道后,更新失败并出现以下错误:

Streaming stateful operator name does not match with the operator in state metadata.
This is likely to happen when a user adds/removes/changes stateful operators of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)];
Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST

此错误表示,由于检查点状态与当前查询定义不匹配,不允许更改,从而阻止管道进一步进行。

检查点相关的故障可能会因各种原因而发生,而不仅仅是删除 dropDuplicates API。 常见方案包括:

  • 在现有流式处理查询中添加或删除有状态运算符(例如引入或删除 dropDuplicates() 或聚合)。
  • 在以前检查点的查询中添加、删除或组合流式处理源(例如,将现有流式处理查询与新查询合并,或者从现有联合作中添加/删除源)。
  • 修改有状态流式处理作的状态架构(例如更改用于删除重复数据或聚合的列)。

有关受支持和不支持的更改的综合列表,请参阅结构化流式处理查询中的Spark 结构化流式处理指南和更改类型。

恢复选项

有三种恢复策略,具体取决于数据持续性要求和资源约束:

Methods 复杂性 成本 可能丢失数据 潜在的数据重复 需要初始快照 完整表重置
完整表刷新 Low 中等 是(如果没有初始快照可用,或者原始文件已在源中删除)。 否(对于应用更改目标表)。 是的 是的
备份和回填的完整表刷新 中等 High 否 (对于幂等接收器。例如,自动 CDC。)
重置表检查点 Medium-High(仅追加源的中,提供不可变偏移量)。 Low 不(需要仔细考虑)。 不 (对于幂等作家。例如,仅将 CDC 自动连接到目标表。

Medium-High 复杂性取决于流式处理源类型和查询的复杂性。

Recommendations

  • 如果不想处理检查点重置的复杂性,则可以重新计算整个表,请使用完整表刷新。 这还会提供一个选项来更改代码。
  • 如果不想处理检查点重置的复杂性,则可以对备份和回填使用完整表刷新,并且可以承担备份和回填历史数据的额外成本。
  • 如果必须保留表中的现有数据并继续以增量方式处理新数据,请使用重置表检查点。 但是,此方法需要仔细处理检查点重置,以检查表中的现有数据是否不丢失,并且管道可以继续处理新数据。

重置检查点并继续增量

若要重置检查点并继续以增量方式进行处理,请执行以下步骤:

  1. 停止管道:确保管道没有正在运行的活动更新。

  2. 确定新检查点的起始位置:确定要继续处理的最后一个成功偏移量或时间戳。 这通常是在发生故障之前成功处理的最新偏移量。

    在上面的示例中,由于使用自动加载程序读取 JSON 文件,因此可以使用 modifiedAfter 选项来指定新检查点的起始位置。 使用此选项可以设置自动加载程序何时开始处理新文件的时间戳。

    对于 Kafka 源,可以使用 startingOffsets 此选项指定流式处理查询应开始处理新数据的偏移量。

    对于 Delta Lake 源,可以使用 startingVersion 此选项指定流式处理查询应从中开始处理新数据的版本。

  3. 进行代码更改:可以修改流式处理查询以删除 dropDuplicates() API 或进行其他更改。 此外,请检查是否已将 modifiedAfter 选项添加到自动加载程序读取路径。

    @dp.temporary_view(name="customers_incremental_view")
    def query():
        return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.includeExistingFiles", "true")
            .option("modifiedAfter", "2025-04-09T06:15:00")
            .load(customers_incremental_path)
            # .dropDuplicates(["customer_id"])
        )
    

    注释

    提供不正确的 modifiedAfter 时间戳可能会导致数据丢失或重复。 检查时间戳是否已正确设置,以避免再次处理旧数据或缺少新数据。

    如果查询具有流式联接或流式传输联合,则必须对所有参与的流源应用上述策略。 例如:

    cdc_1 = spark.readStream.format("cloudFiles")...
    cdc_2 = spark.readStream.format("cloudFiles")...
    cdc_source = cdc_1..union(cdc_2)
    
  4. 标识与要为其重置检查点的流式处理表关联的流名称。 在此示例中,它是 customers_incremental_flow. 可以在管道代码中或通过检查管道 UI 或管道事件日志来查找流名称。

  5. 重置检查点:创建 Python 笔记本并将其附加到 Azure Databricks 群集。

    需要以下信息才能重置检查点:

    • Azure Databricks 工作区 URL
    • 管道 ID
    • 要为其重置检查点的流名称
    import requests
    import json
    
    # Define your Databricks instance and pipeline ID
    databricks_instance = "<DATABRICKS_URL>"
    token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
    pipeline_id = "<YOUR_PIPELINE_ID>"
    flows_to_reset = ["<YOUR_FLOW_NAME>"]
    # Set up the API endpoint
    endpoint = f"{databricks_instance}/api/2.0/pipelines/{pipeline_id}/updates"
    
    # Set up the request headers
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    
    # Define the payload
    payload = {
        "reset_checkpoint_selection": flows_to_reset
    }
    
    # Make the POST request
    response = requests.post(endpoint, headers=headers, data=json.dumps(payload))
    
    # Check the response
    if response.status_code == 200:
        print("Pipeline update started successfully.")
    else:
        print(f"Error: {response.status_code}, {response.text}")
    
  6. 运行管道:管道使用新的检查点从指定的起始位置开始处理新数据,同时保留现有表数据,同时继续增量处理。

最佳做法

  • 避免在生产中使用个人预览版功能。
  • 在生产环境中进行更改之前测试更改。
    • 理想情况下,在较低环境中创建测试管道。 如果无法执行此作,请尝试对测试使用不同的目录和架构。
    • 重现错误。
    • 应用更改。
    • 验证结果并做出决定,no-go。
    • 对生产管道进行更改。