create_auto_cdc_from_snapshot_flow

重要

此功能以 公共预览版提供

create_auto_cdc_from_snapshot_flow 函数创建一个流,该流使用 Lakeflow 声明性管道更改数据捕获(CDC)功能来处理数据库快照中的源数据。 请参阅 CDC 如何使用 API 实现AUTO CDC FROM SNAPSHOT

注释

此函数替换上一个函数 apply_changes_from_snapshot()。 这两个函数具有相同的签名。 Databricks 建议更新以使用新名称。

重要

必须为该操作指定目标流式处理表。

若要创建所需的目标表,可以使用 create_streaming_table() 函数。

Syntax

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

注释

对于 AUTO CDC FROM SNAPSHOT 处理,默认行为是在目标中不存在具有相同键的匹配记录时插入新行。 如果匹配记录存在,则仅当行中的任何值已更改时,才会更新该记录。 目标中存在但源中不再存在键的行将被删除。

若要了解有关使用快照处理 CDC 的详细信息,请参阅 AUTO CDC API:使用 Lakeflow 声明性管道简化变更数据捕获。 有关使用 create_auto_cdc_from_snapshot_flow() 函数的示例,请参阅 定期快照引入历史快照引入 示例。

参数

参数 类型 Description
target str 必填。 要更新的表的名称。 可以在执行 函数之前使用 create_auto_cdc_from_snapshot_flow() 函数创建目标表。
source strlambda function 必填。 要定期快照的表或视图的名称,或返回要处理的快照数据帧及其快照版本的 Python lambda 函数。 请参阅 “实现 source 参数”。
keys list 必填。 用于唯一标识源数据中的行的列或列组合。 这用于标识哪些 CDC 事件适用于目标表中的特定记录。 可以指定以下任一项:
  • 字符串列表: ["userId", "orderId"]
  • Spark SQL col() 函数的列表: [col("userId"), col("orderId"]. col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)
stored_as_scd_type strint 将记录存储为 SCD 类型 1 还是 SCD 类型 2。 对于 SCD 类型 1,将其设置为 1;对于 SCD 类型 2,将其设置为 2。 默认值为 SCD 类型 1。
track_history_column_listtrack_history_except_column_list list 要在目标表中跟踪其历史记录的输出列子集。 使用 track_history_column_list 指定要跟踪的列的完整列表。 用于 track_history_except_column_list 指定要从跟踪中排除的列。 可以将任一值声明为字符串列表或 Spark SQL col() 函数:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)。 当没有 track_history_column_listtrack_history_except_column_list 参数传递给函数时,默认设置是包含目标表中的所有列。

source实现参数

create_auto_cdc_from_snapshot_flow() 函数包括 source 参数。 对于处理历史快照,参数 source 应为 Python lambda 函数,该函数返回两个值 create_auto_cdc_from_snapshot_flow() :一个 Python 数据帧,其中包含要处理的快照数据和快照版本。

以下是 lambda 函数的签名:

lambda Any => Optional[(DataFrame, Any)]
  • lambda 函数的参数是最近处理的快照版本。
  • lambda 函数的返回值是 None 或由两个值组成的元组:元组的第一个值是包含要处理的快照的 DataFrame。 元组的第二个值是用来表示快照的逻辑顺序的快照版本号。

实现和调用 lambda 函数的示例:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

每次触发包含 create_auto_cdc_from_snapshot_flow() 函数的管道时,Lakeflow Declarative Pipelines 运行时环境都会执行以下步骤:

  1. 运行该 next_snapshot_and_version 函数以加载下一个快照数据帧和相应的快照版本。

  2. 如果未返回 DataFrame,则运行将终止,管道更新将标记为已完成。

  3. 检测新快照中的更改,并增量将它们应用于目标表。

  4. 返回到步骤 #1 以加载下一个快照及其版本。