重要
此功能处于公共预览阶段。
该create_auto_cdc_from_snapshot_flow
函数创建一个流,该流使用 Lakeflow 声明式管道更改数据捕获(CDC)功能来处理来自数据库快照的源数据。 请参阅 CDC 如何使用 API 实现AUTO CDC FROM SNAPSHOT
?
注释
此函数替换上一个函数 apply_changes_from_snapshot()
。 这两个函数具有相同的签名。 Databricks 建议更新以使用新名称。
语法
import dlt
dlt.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
注释
对于AUTO CDC FROM SNAPSHOT
处理,默认行为是在目标中不存在具有相同键的匹配记录时插入新行。 如果匹配记录确实存在,则只有当行中的任何值都已更改时,才会更新该记录。 删除目标中存在键但源中不再存在键的行。
若要了解有关使用快照处理 CDC 的详细信息,请参阅 AUTO CDC API:使用 Lakeflow 声明性管道简化变更数据捕获。 有关使用create_auto_cdc_from_snapshot_flow()
函数的示例,请参阅定期快照引入和历史快照引入示例。
参数
参数 | 类型 | DESCRIPTION |
---|---|---|
target |
str |
必填。 要更新的表的名称。 可以在执行 函数之前使用 create_auto_cdc_from_snapshot_flow() 函数创建目标表。 |
source |
str 或 lambda function |
必填。 要定期拍摄快照的表或视图的名称或返回要处理的快照 DataFrame 和快照版本的 Python lambda 函数。 请参阅 “实现 source 参数”。 |
keys |
list |
必填。 用于唯一标识源数据中的行的列或列组合。 这用于标识哪些 CDC 事件适用于目标表中的特定记录。 可以指定以下任一项:
|
stored_as_scd_type |
str 或 int |
将记录存储为 SCD 类型 1 还是 SCD 类型 2。 对于 SCD 类型 1,将其设置为 1 ;对于 SCD 类型 2,将其设置为 2 。 默认值为 SCD 类型 1。 |
track_history_column_list 或 track_history_except_column_list |
list |
要在目标表中跟踪其历史记录的输出列子集。 使用 track_history_column_list 指定要跟踪的列的完整列表。 用于 track_history_except_column_list 指定要从跟踪中排除的列。 可以将任一值声明为字符串列表或 Spark SQL col() 函数:
col() 函数的参数不能包含限定符。 例如,可以使用 col(userId) ,但不能使用 col(source.userId) 。 当没有 track_history_column_list 或 track_history_except_column_list 参数传递给函数时,默认设置是包含目标表中的所有列。 |
实现 source
参数
create_auto_cdc_from_snapshot_flow()
函数包括source
参数。 对于处理历史快照,source
参数应为 Python lambda 函数,该函数将两个值返回到create_auto_cdc_from_snapshot_flow()
函数:包含要处理的快照数据的 Python DataFrame 和快照版本。
以下是 lambda 函数的签名:
lambda Any => Optional[(DataFrame, Any)]
- lambda 函数的参数是最近处理的快照版本。
- lambda 函数的返回值是
None
或两个值的元组:元组的第一个值是包含要处理的快照的 DataFrame。 元组的第二个值是表示快照逻辑顺序的快照版本。
实现和调用 lambda 函数的示例:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)
每当包含 create_auto_cdc_from_snapshot_flow()
函数的管道被触发时,Lakeflow 声明性流水线运行时都会执行以下步骤:
运行
next_snapshot_and_version
函数以加载下一个快照 DataFrame 和相应的快照版本。如果未返回 DataFrame,则运行会终止,管道更新会标记为已完成。
检测新快照中的更改,并以增量方式将其应用于目标表。
返回到步骤 #1 以加载下一个快照及其版本。