本页介绍如何在流式处理检查点失效或损坏时恢复 Lakeflow 声明性管道中的管道。
什么是流式处理检查点?
在 Apache Spark 结构化流式处理中,检查点是用于保存流式处理查询状态的机制。 此状态包括:
- 进度信息:已处理与源的偏移量。
-
中间状态:需要跨微批处理维护的数据进行有状态作(例如聚合)。
mapGroupsWithState
- 元数据:有关流式处理查询执行的信息。
检查点对于确保流式处理应用程序中的容错和数据一致性至关重要:
- 容错:如果流式处理应用程序失败(例如,由于节点故障、应用程序崩溃),检查点允许应用程序从上次成功的检查点状态重启,而不是从头开始重新处理所有数据。 这可以防止数据丢失并确保增量处理。
- 恰好一次处理:对于许多流式处理源、检查点以及幂等接收器,启用一次恰好处理可确保每条记录完全处理一次,即使面对故障,也防止重复或遗漏。
- 状态管理:对于有状态转换,检查点会保留这些作的内部状态,使流式处理查询能够根据累积的历史状态正确继续处理新数据。
Lakeflow 声明性管道中的检查点
Lakeflow 声明性管道构建在结构化流式处理上,并将大部分基础检查点管理抽象化,提供更声明性的方法。 在管道中定义流式处理表时,每个流都会有一个检查点状态,用于写入流式处理表。 这些检查点位置位于管道内部,用户无法访问。
通常不需要管理或了解流式处理表的基础检查点,但以下情况除外:
- 回退和重播:如果要在保留表的当前状态时重新处理特定时间点的数据,则必须重置流式处理表的检查点。
-
从检查点失败或损坏中恢复:如果写入流式处理表的查询由于检查点相关错误而失败,则会导致硬故障,并且查询无法进一步进行。 有三种方法可用于从此类故障中恢复:
- 完整表刷新:这会重置表并擦除现有数据。
- 使用备份和回填进行完整表刷新:在执行完整表刷新和回填旧数据之前,先执行表的备份,但这非常昂贵,应该是最后的手段。
- 重置检查点并继续:如果负担不起丢失现有数据,则必须对受影响的流流执行选择性检查点重置。
示例:由于代码更改而导致管道失败
假设有一个 Lakeflow 声明性管道,该管道处理更改数据馈送以及云存储系统(如 Amazon S3)的初始表快照,以及写入 SCD-1 流式处理表的情况。
管道有两个流式处理流:
-
customers_incremental_flow
:以增量方式读取customer
源表 CDC 源、筛选出重复记录,并将这些记录向上插入到目标表中。 -
customers_snapshot_flow
:一次性读取源表的初始快照customers
,并将记录向上插入目标表。
@dp.temporary_view(name="customers_incremental_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.load(customers_incremental_path)
.dropDuplicates(["customer_id"])
)
@dp.temporary_view(name="customers_snapshot_view")
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(customers_snapshot_path)
.select("*")
)
dp.create_streaming_table("customers")
dp.create_auto_cdc_flow(
flow_name = "customers_incremental_flow",
target = "customers",
source = "customers_incremental_view",
keys = ["customer_id"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
dp.create_auto_cdc_flow(
flow_name = "customers_snapshot_flow",
target = "customers",
source = "customers_snapshot_view",
keys = ["customer_id"],
sequence_by = lit(0),
stored_as_scd_type = 1,
once = True
)
部署此管道后,它将成功运行,并开始处理更改数据馈送和初始快照。
稍后,你意识到查询中的 customers_incremental_view
重复数据删除逻辑是冗余的,并导致性能瓶颈。
dropDuplicates()
删除以提高性能:
@dp.temporary_view(name="customers_raw_view")
def query():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.includeExistingFiles", "true")
.load()
# .dropDuplicates()
)
删除 dropDuplicates()
API 并重新部署管道后,更新失败并出现以下错误:
Streaming stateful operator name does not match with the operator in state metadata.
This is likely to happen when a user adds/removes/changes stateful operators of existing streaming query.
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)];
Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST
此错误表示,由于检查点状态与当前查询定义不匹配,不允许更改,从而阻止管道进一步进行。
检查点相关的故障可能会因各种原因而发生,而不仅仅是删除 dropDuplicates
API。 常见方案包括:
- 在现有流式处理查询中添加或删除有状态运算符(例如引入或删除
dropDuplicates()
或聚合)。 - 在以前检查点的查询中添加、删除或组合流式处理源(例如,将现有流式处理查询与新查询合并,或者从现有联合作中添加/删除源)。
- 修改有状态流式处理作的状态架构(例如更改用于删除重复数据或聚合的列)。
有关受支持和不支持的更改的综合列表,请参阅结构化流式处理查询中的Spark 结构化流式处理指南和更改类型。
恢复选项
有三种恢复策略,具体取决于数据持续性要求和资源约束:
Methods | 复杂性 | 成本 | 可能丢失数据 | 潜在的数据重复 | 需要初始快照 | 完整表重置 |
---|---|---|---|---|---|---|
完整表刷新 | Low | 中等 | 是(如果没有初始快照可用,或者原始文件已在源中删除)。 | 否(对于应用更改目标表)。 | 是的 | 是的 |
备份和回填的完整表刷新 | 中等 | High | 否 | 否 (对于幂等接收器。例如,自动 CDC。) | 否 | 否 |
重置表检查点 | Medium-High(仅追加源的中,提供不可变偏移量)。 | Low | 不(需要仔细考虑)。 | 不 (对于幂等作家。例如,仅将 CDC 自动连接到目标表。 | 否 | 否 |
Medium-High 复杂性取决于流式处理源类型和查询的复杂性。
Recommendations
- 如果不想处理检查点重置的复杂性,则可以重新计算整个表,请使用完整表刷新。 这还会提供一个选项来更改代码。
- 如果不想处理检查点重置的复杂性,则可以对备份和回填使用完整表刷新,并且可以承担备份和回填历史数据的额外成本。
- 如果必须保留表中的现有数据并继续以增量方式处理新数据,请使用重置表检查点。 但是,此方法需要仔细处理检查点重置,以检查表中的现有数据是否不丢失,并且管道可以继续处理新数据。
重置检查点并继续增量
若要重置检查点并继续以增量方式进行处理,请执行以下步骤:
停止管道:确保管道没有正在运行的活动更新。
确定新检查点的起始位置:确定要继续处理的最后一个成功偏移量或时间戳。 这通常是在发生故障之前成功处理的最新偏移量。
在上面的示例中,由于使用自动加载程序读取 JSON 文件,因此可以使用
modifiedAfter
选项来指定新检查点的起始位置。 使用此选项可以设置自动加载程序何时开始处理新文件的时间戳。对于 Kafka 源,可以使用
startingOffsets
此选项指定流式处理查询应开始处理新数据的偏移量。对于 Delta Lake 源,可以使用
startingVersion
此选项指定流式处理查询应从中开始处理新数据的版本。进行代码更改:可以修改流式处理查询以删除
dropDuplicates()
API 或进行其他更改。 此外,请检查是否已将modifiedAfter
选项添加到自动加载程序读取路径。@dp.temporary_view(name="customers_incremental_view") def query(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .option("cloudFiles.includeExistingFiles", "true") .option("modifiedAfter", "2025-04-09T06:15:00") .load(customers_incremental_path) # .dropDuplicates(["customer_id"]) )
注释
提供不正确的
modifiedAfter
时间戳可能会导致数据丢失或重复。 检查时间戳是否已正确设置,以避免再次处理旧数据或缺少新数据。如果查询具有流式联接或流式传输联合,则必须对所有参与的流源应用上述策略。 例如:
cdc_1 = spark.readStream.format("cloudFiles")... cdc_2 = spark.readStream.format("cloudFiles")... cdc_source = cdc_1..union(cdc_2)
标识与要为其重置检查点的流式处理表关联的流名称。 在此示例中,它是
customers_incremental_flow
. 可以在管道代码中或通过检查管道 UI 或管道事件日志来查找流名称。重置检查点:创建 Python 笔记本并将其附加到 Azure Databricks 群集。
需要以下信息才能重置检查点:
- Azure Databricks 工作区 URL
- 管道 ID
- 要为其重置检查点的流名称
import requests import json # Define your Databricks instance and pipeline ID databricks_instance = "<DATABRICKS_URL>" token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get() pipeline_id = "<YOUR_PIPELINE_ID>" flows_to_reset = ["<YOUR_FLOW_NAME>"] # Set up the API endpoint endpoint = f"{databricks_instance}/api/2.0/pipelines/{pipeline_id}/updates" # Set up the request headers headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } # Define the payload payload = { "reset_checkpoint_selection": flows_to_reset } # Make the POST request response = requests.post(endpoint, headers=headers, data=json.dumps(payload)) # Check the response if response.status_code == 200: print("Pipeline update started successfully.") else: print(f"Error: {response.status_code}, {response.text}")
运行管道:管道使用新的检查点从指定的起始位置开始处理新数据,同时保留现有表数据,同时继续增量处理。
最佳做法
- 避免在生产中使用个人预览版功能。
- 在生产环境中进行更改之前测试更改。
- 理想情况下,在较低环境中创建测试管道。 如果无法执行此作,请尝试对测试使用不同的目录和架构。
- 重现错误。
- 应用更改。
- 验证结果并做出决定,no-go。
- 对生产管道进行更改。