应用更改

apply_changes() 函数使用 DLT 变更数据捕获 (CDC) 功能来处理来自变更数据馈送 (CDF) 的源数据。

重要

必须声明一个要向其应用更改的目标流式处理表。 可以选择为目标表指定架构。 指定 apply_changes() 目标表的架构时,必须包含具有与 __START_AT 字段相同数据类型的 __END_ATsequence_by 列。

若要创建所需的目标表,可以在 DLT Python 接口中使用 create_streaming_table() 函数。

语法

import dlt

dlt.apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

对于 apply_changes 处理,INSERTUPDATE 事件的默认行为是从源更新插入 CDC 事件:更新目标表中与指定的键匹配的所有行,或者当目标表中不存在某个匹配的记录时插入新行。 可以通过DELETE参数指定apply_as_deletes事件的处理。

若要了解有关使用更改源进行 CDC 处理的详细信息,请参阅 “应用更改 API:使用 DLT 简化更改数据捕获”。 有关使用apply_changes()函数的示例,请参阅示例:使用 CDF 源数据处理 SCD 类型 1 和 SCD 类型 2

参数

参数 类型 DESCRIPTION
target str 必填。 要更新的表的名称。 可以在执行 函数之前使用 apply_changes() 函数创建目标表。
source str 必填。 包含疾病控制与预防中心(CDC)记录的数据源。
keys list 必填。 用于唯一标识源数据中的行的列或列组合。 这用于标识哪些 CDC 事件适用于目标表中的特定记录。 可以指定以下任一项:
  • 字符串列表: ["userId", "orderId"]
  • Spark SQL col() 函数的列表: [col("userId"), col("orderId"]. col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)
sequence_by strcol()struct() 必填。 用于指定源数据中 CDC 事件的逻辑顺序的列名。 DLT 使用此排序来处理以无序方式到达的更改事件。 指定的列必须是可排序的数据类型。 可以指定以下任一项:
  • 字符串: "sequenceNum"
  • 一个 Spark SQL col() 函数:col("sequenceNum")col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)
  • struct() 组合多个列来打破关联:struct("timestamp_col", "id_col"),它首先按第一个结构字段排序,如果存在关联,则按第二个字段排序,依此类推。
ignore_null_updates bool 允许引入包含目标列子集的更新。 当 CDC 事件匹配现有行并且ignore_null_updatesTrue时,具有null的列会在目标中保留其现有值。 这也适用于值为 null 的嵌套列。 当ignore_null_updatesFalse时,会使用null值覆盖现有值。
默认值为 False
apply_as_deletes strexpr() 指定何时应将 CDC 事件视为 DELETE 而不是更新插入。 可以指定以下任一项:
  • 字符串: "Operation = 'DELETE'"
  • Spark SQL expr() 函数:expr("Operation = 'DELETE'")

为了处理无序数据,已删除的行将暂时保留为基础增量表中的逻辑删除,并在元存储中创建一个用于筛选出这些逻辑删除的视图。 可以使用表属性配置 pipelines.cdc.tombstoneGCThresholdInSeconds 保留间隔。
apply_as_truncates strexpr() 指定何时应将 CDC 事件视为完整表 TRUNCATE。 可以指定以下任一项:
  • 字符串: "Operation = 'TRUNCATE'"
  • Spark SQL expr() 函数:expr("Operation = 'TRUNCATE'")

由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。 仅 SCD 类型 1 支持 apply_as_truncates 参数。 SCD 类型 2 不支持截断操作。
column_listexcept_column_list list 要包含在目标表中的列子集。 使用 column_list 指定要包含的列的完整列表。 使用 except_column_list 指定要排除的列。 可以将任一值声明为字符串列表或 Spark SQL col() 函数:
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)。 当没有 column_listexcept_column_list 参数传递给函数时,默认设置是包含目标表中的所有列。
stored_as_scd_type strint 将记录存储为 SCD 类型 1 还是 SCD 类型 2。 对于 SCD 类型 1,将其设置为 1;对于 SCD 类型 2,将其设置为 2。 默认值为 SCD 类型 1。
track_history_column_listtrack_history_except_column_list list 要在目标表中跟踪其历史记录的输出列子集。 使用 track_history_column_list 指定要跟踪的列的完整列表。 用于 track_history_except_column_list 指定要从跟踪中排除的列。 可以将任一值声明为字符串列表或 Spark SQL col() 函数:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

col() 函数的参数不能包含限定符。 例如,可以使用 col(userId),但不能使用 col(source.userId)。 当没有 track_history_column_listtrack_history_except_column_list 参数传递给函数时,默认设置是包含目标表中的所有列。