apply_changes()
函数使用 DLT 变更数据捕获 (CDC) 功能来处理来自变更数据馈送 (CDF) 的源数据。
重要
必须声明一个要向其应用更改的目标流式处理表。 可以选择为目标表指定架构。 指定 apply_changes()
目标表的架构时,必须包含具有与 __START_AT
字段相同数据类型的 __END_AT
和 sequence_by
列。
若要创建所需的目标表,可以在 DLT Python 接口中使用 create_streaming_table() 函数。
import dlt
dlt.apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
对于 apply_changes
处理,INSERT
和 UPDATE
事件的默认行为是从源更新插入 CDC 事件:更新目标表中与指定的键匹配的所有行,或者当目标表中不存在某个匹配的记录时插入新行。 可以通过DELETE
参数指定apply_as_deletes
事件的处理。
若要了解有关使用更改源进行 CDC 处理的详细信息,请参阅 “应用更改 API:使用 DLT 简化更改数据捕获”。 有关使用apply_changes()
函数的示例,请参阅示例:使用 CDF 源数据处理 SCD 类型 1 和 SCD 类型 2。
参数 |
类型 |
DESCRIPTION |
target |
str |
必填。 要更新的表的名称。 可以在执行 函数之前使用 apply_changes() 函数创建目标表。 |
source |
str |
必填。 包含疾病控制与预防中心(CDC)记录的数据源。 |
keys |
list |
必填。 用于唯一标识源数据中的行的列或列组合。 这用于标识哪些 CDC 事件适用于目标表中的特定记录。 可以指定以下任一项:
- 字符串列表:
["userId", "orderId"] - Spark SQL
col() 函数的列表: [col("userId"), col("orderId"] .
col() 函数的参数不能包含限定符。 例如,可以使用 col(userId) ,但不能使用 col(source.userId) 。
|
sequence_by |
str 、col() 或 struct() |
必填。 用于指定源数据中 CDC 事件的逻辑顺序的列名。 DLT 使用此排序来处理以无序方式到达的更改事件。 指定的列必须是可排序的数据类型。 可以指定以下任一项:
- 字符串:
"sequenceNum" - 一个 Spark SQL
col() 函数:col("sequenceNum") 。
col() 函数的参数不能包含限定符。 例如,可以使用 col(userId) ,但不能使用 col(source.userId) 。 -
struct() 组合多个列来打破关联:struct("timestamp_col", "id_col") ,它首先按第一个结构字段排序,如果存在关联,则按第二个字段排序,依此类推。
|
ignore_null_updates |
bool |
允许引入包含目标列子集的更新。 当 CDC 事件匹配现有行并且ignore_null_updates 为True 时,具有null 的列会在目标中保留其现有值。 这也适用于值为 null 的嵌套列。 当ignore_null_updates 为False 时,会使用null 值覆盖现有值。 默认值为 False 。 |
apply_as_deletes |
str 或 expr() |
指定何时应将 CDC 事件视为 DELETE 而不是更新插入。 可以指定以下任一项:
- 字符串:
"Operation = 'DELETE'" - Spark SQL
expr() 函数:expr("Operation = 'DELETE'")
为了处理无序数据,已删除的行将暂时保留为基础增量表中的逻辑删除,并在元存储中创建一个用于筛选出这些逻辑删除的视图。 可以使用表属性配置 pipelines.cdc.tombstoneGCThresholdInSeconds 保留间隔。 |
apply_as_truncates |
str 或 expr() |
指定何时应将 CDC 事件视为完整表 TRUNCATE 。 可以指定以下任一项:
- 字符串:
"Operation = 'TRUNCATE'" - Spark SQL
expr() 函数:expr("Operation = 'TRUNCATE'")
由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。 仅 SCD 类型 1 支持 apply_as_truncates 参数。 SCD 类型 2 不支持截断操作。 |
column_list 或 except_column_list |
list |
要包含在目标表中的列子集。 使用 column_list 指定要包含的列的完整列表。 使用 except_column_list 指定要排除的列。 可以将任一值声明为字符串列表或 Spark SQL col() 函数:
column_list = ["userId", "name", "city"] column_list = [col("userId"), col("name"), col("city")] except_column_list = ["operation", "sequenceNum"] except_column_list = [col("operation"), col("sequenceNum")
col() 函数的参数不能包含限定符。 例如,可以使用 col(userId) ,但不能使用 col(source.userId) 。 当没有 column_list 或 except_column_list 参数传递给函数时,默认设置是包含目标表中的所有列。 |
stored_as_scd_type |
str 或 int |
将记录存储为 SCD 类型 1 还是 SCD 类型 2。 对于 SCD 类型 1,将其设置为 1 ;对于 SCD 类型 2,将其设置为 2 。 默认值为 SCD 类型 1。 |
track_history_column_list 或 track_history_except_column_list |
list |
要在目标表中跟踪其历史记录的输出列子集。 使用 track_history_column_list 指定要跟踪的列的完整列表。 用于 track_history_except_column_list 指定要从跟踪中排除的列。 可以将任一值声明为字符串列表或 Spark SQL col() 函数:
track_history_column_list = ["userId", "name", "city"] track_history_column_list = [col("userId"), col("name"), col("city")] track_history_except_column_list = ["operation", "sequenceNum"] track_history_except_column_list = [col("operation"), col("sequenceNum")
col() 函数的参数不能包含限定符。 例如,可以使用 col(userId) ,但不能使用 col(source.userId) 。 当没有 track_history_column_list 或 track_history_except_column_list 参数传递给函数时,默认设置是包含目标表中的所有列。 |