增量实时表使用APPLY CHANGES
和APPLY CHANGES FROM SNAPSHOT
API 简化了变更数据捕获 (CDC)。 使用的接口取决于更改数据源:
- 使用
APPLY CHANGES
处理更改数据源 (CDF) 的更改。 - 使用
APPLY CHANGES FROM SNAPSHOT
(公共预览版)处理数据库快照中的更改。
以前,通常使用 MERGE INTO
语句处理 Azure Databricks 上的 CDC 记录。 但是,MERGE INTO
可能会由于无序记录而生成不正确的结果,或者需要复杂的逻辑来重新排序记录。
增量实时表 SQL 和 Python 接口支持APPLY CHANGES
API。 增量实时表 Python 接口支持APPLY CHANGES FROM SNAPSHOT
API。
APPLY CHANGES
和APPLY CHANGES FROM SNAPSHOT
都支持使用 SCD 类型 1 和类型 2 更新表:
- 使用 SCD 类型 1 直接更新记录。 不保留更新记录的历史记录。
- 使用 SCD 类型 2 保留有关所有更新或者对指定列集的更新的记录的历史记录。
有关语法和其他参考,请参阅:
备注
本文介绍如何根据源数据中的更改来更新增量实时表管道中的表。 若要了解如何记录和查询 Delta 表的行级更改信息,请参阅在 Azure Databricks 上使用 Delta Lake 更改数据馈送。
若要使用 CDC API,必须将管道配置为使用增量实时表 Pro
或 Advanced
版本。
增量实时表中的 APPLY CHANGES
API 可自动处理无序记录,从而确保正确处理 CDC 记录,而且无需开发复杂的逻辑来处理无序记录。 必须在源数据中指定一列作为记录排序依据,增量实时表将此顺序解释为源数据正确排序的单调递增表示形式。 增量实时表会自动处理不按顺序到达的数据。 对于 SCD 类型 2 的更改,增量实时表会将相应的顺序值传播到目标表的 __START_AT
和 __END_AT
列。 每个顺序值的每个键应该有一个非重复更新,不支持 NULL 顺序值。
要使用APPLY CHANGES
执行 CDC 处理,请先创建流式表,然后使用 SQL 中的APPLY CHANGES INTO
语句或 Python 中的apply_changes()
函数指定更改源的源、键和排序。 若要创建目标流式表,请使用 SQL 中的 CREATE OR REFRESH STREAMING TABLE
语句或 Python 中的 create_streaming_table()
函数。 请参阅SCD 类型 1 和类型 2 处理示例。
有关语法详细信息,请参阅增量实时表SQL 参考或Python 参考。
重要
APPLY CHANGES FROM SNAPSHOT
API 为公共预览版。
APPLY CHANGES FROM SNAPSHOT
是声明性 API,它比较一系列有序快照以有效地确定源数据的更改,然后运行对快照中的记录进行 CDC 处理所需的处理。 仅增量实时表 Python 接口支持APPLY CHANGES FROM SNAPSHOT
。
APPLY CHANGES FROM SNAPSHOT
支持从多个源类型引入快照:
- 使用定期快照引入从现有表或视图中引入快照。
APPLY CHANGES FROM SNAPSHOT
提供了简单的简化界面,用于支持定期从现有数据库对象引入快照。 每次管道更新都会引入新快照,引入时间用作快照版本。 在连续模式下运行管道时,会按照包含 APPLY CHANGES FROM SNAPSHOT 处理的流的触发器间隔设置确定的周期,使用每个管道更新引入多个快照。 - 使用历史快照引入来处理包含数据库快照的文件,例如从 Oracle 或 MySQL 数据库或数据仓库生成的快照。
要使用APPLY CHANGES FROM SNAPSHOT
从任何源类型执行 CDC 处理,请先创建流式表,然后使用 Python 中的apply_changes_from_snapshot()
函数指定实现处理所需的快照、键和其他参数。 请参阅定期快照引入和历史快照引入示例。
传递给 API 的快照必须按版本按升序排列。 如果增量实时表检测到无序快照,则会引发错误。
有关语法详细信息,请参阅增量实时表Python 参考。
用于排序的列必须是可排序数据类型。
以下部分提供了增量实时表 SCD 类型 1 和类型 2 查询的示例,这些查询基于更改数据源中的源事件更新目标表:
- 新建用户记录。
- 删除用户记录。
- 更新用户记录。 在 SCD 类型 1 示例中,最后的
UPDATE
操作延迟到达并从目标表中删除,展示了无序事件的处理。
以下所有示例假设你知道如何配置和更新增量实时表管道。 请参阅教程:运行第一个增量实时表管道。
若要运行这些示例,必须首先创建一个示例数据集。 请参阅生成测试数据。
下面是这些示例的输入记录:
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | 瓦哈卡 | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | Null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | 奇瓦瓦 | UPDATE | 5 |
如果你取消注释示例数据中的最后一行,则会插入以下记录用于指定记录的截断位置:
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
null | Null | null | TRUNCATE | 3 |
备注
以下所有示例都包含用于指定DELETE
和TRUNCATE
操作的选项,但其中的每个选项都是可选的。
以下示例演示了如何处理 SCD 类型 1 更新:
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
运行 SCD 类型 1 示例后,目标表包含以下记录:
userId | name | city |
---|---|---|
124 | Raul | 瓦哈卡 |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
使用附加的 TRUNCATE
记录运行 SCD 类型 1 示例后,由于 sequenceNum=3
处的 TRUNCATE
操作,124
和 126
的记录将被截断,并且目标表包含以下记录:
userId | name | city |
---|---|---|
125 | Mercedes | Guadalajara |
以下示例演示了如何处理 SCD 类型 2 更新:
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
运行 SCD 类型 2 示例后,目标表包含以下记录:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | 奇瓦瓦 | 5 | 6 |
124 | Raul | 瓦哈卡 | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lily | Cancun | 2 | Null |
SCD 类型 2 查询还可以指定一个输出列的子集,以便跟踪目标表中的历史记录。 对其他列的更改都就地更新,而不是生成新的历史记录。 以下示例演示了如何从跟踪中排除 city
列:
以下示例演示如何通过 SCD 类型 2 使用跟踪历史记录:
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
在没有额外 TRUNCATE
记录的情况下运行此示例后,目标表包含以下记录:
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | 奇瓦瓦 | 1 | 6 |
124 | Raul | 瓦哈卡 | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lily | Cancun | 2 | null |
以下代码用于生成可在本教程的示例查询中使用的示例数据集。 假设你拥有新建架构和新表的适当凭据,则可以使用笔记本或 Databricks SQL 运行这些语句。 以下代码不能作为增量实时表管道的一部分运行:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
以下示例演示了 SCD 类型 2 处理,该处理引入存储在mycatalog.myschema.mytable
的表的快照。 处理结果写入名为target
的表。
mycatalog.myschema.mytable
时间戳 2024-01-01 00:00:00 的记录
密钥 | 值 |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
时间戳 2024-01-01 12:00:00 的记录
密钥 | 值 |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
处理快照后,目标表包含以下记录:
密钥 | 值 | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | Null |
3 | a3 | 2024-01-01 12:00:00 | Null |
以下示例演示了 SCD 类型 2 处理,该处理基于存储在云存储系统中的两个快照中的源事件更新目标表:
存储在 /<PATH>/filename1.csv
的 timestamp
上的快照
密钥 | TrackingColumn | NonTrackingColumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
存储在 /<PATH>/filename2.csv
的 timestamp + 5
上的快照
密钥 | TrackingColumn | NonTrackingColumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
以下代码示例演示了如何使用以下快照处理 SCD 类型 2 更新:
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
处理快照后,目标表包含以下记录:
密钥 | TrackingColumn | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | null |
3 | a3 | b3 | 2 | null |
4 | a4 | b4_new | 1 | Null |
如果管道将表发布到 Unity Catalog,则可以使用数据操作语言 (DML) 声明(包括插入、更新、删除与合并声明)来修改由 APPLY CHANGES INTO
声明创建的目标流式表。
备注
- 不支持 DML 声明修改流式表的表架构。 确保 DML 语句不会尝试修改表架构。
- 只能在使用 Databricks Runtime 13.3 LTS 及更高版本的 Unity Catalog 共享群集或 SQL 仓库中运行用于更新流式表的 DML 语句。
- 由于流式传输要求仅追加数据源,因此如果你的处理需要从包含更改的源流式传输表进行流式传输(例如,通过 DML 语句),请在读取源流式传输表时设置 skipChangeCommits 标志。 设置
skipChangeCommits
后,将会忽略删除或修改源表上记录的事务。 如果处理不需要流式表,则可以使用具体化视图(没有仅追加限制)作为目标表。
由于增量实时表使用指定的 SEQUENCE BY
列并将适当的排序值传播到(供 SCD 类型 2 使用的)目标表的 __START_AT
和 __END_AT
列,因此必须确保 DML 声明使用这些列的有效值,确保记录的顺序正确。 请参阅如何使用 APPLY CHANGES API 实现 CDC?。
有关如何使用 DML 声明处理流式表的详细信息,请参阅在流式表中添加、更改或删除数据。
以下示例插入一个起始序列为 5 的活动记录:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
在 Databricks Runtime 15.2 及更高版本中,可以从流式表中读取更改数据馈送,该表是 APPLY CHANGES
或 APPLY CHANGES FROM SNAPSHOT
查询的目标,读取方式与从其他 Delta 表中读取更改数据馈送的方式相同。 从目标流式表中读取更改数据馈送需要满足以下条件:
- 目标流式表必须发布到 Unity Catalog。 请参阅将 Unity Catalog 与 Delta Live Tables 管道配合使用。
- 若要从目标流式表中读取更改数据馈送,必须使用 Databricks Runtime 15.2 或更高版本。 若要在另一个增量实时表管道中读取更改数据馈送,必须将管道配置为使用 Databricks Runtime 15.2 或更高版本。
可以从在 Delta 实时表管道中创建的目标流式表中读取更改数据馈送,读取方式与从其他 Delta 表中读取更改数据馈送的方式相同。 若要详细了解如何使用 Delta 更改数据馈送功能,包括 Python 和 SQL 中的示例,请参阅在 Azure Databricks 上使用 Delta Lake 更改数据馈送。
备注
更改数据馈送记录包括标识更改事件类型的元数据。 在表中更新记录时,关联的更改记录的元数据通常包括设置为 update_preimage
和 update_postimage
事件的 _change_type
值。
但是,如果对包含更改主键值的目标流式表进行更新,则 _change_type
值会有所不同。 当更改包括对主键的更新时,_change_type
元数据字段将设置为 insert
和 delete
事件。 如果对具有 UPDATE
或 MERGE
语句的某个密钥字段进行手动更新,或者对于 SCD 类型 2 表,当 __start_at
字段更改以反映较早的起始序列值时,可能会更改主键。
APPLY CHANGES
查询确定主键值,SCD 类型 1 和 SCD 类型 2 处理分别对应不同的值:
- 对于 SCD 类型 1 处理和增量实时表 Python 接口,主键是
apply_changes()
函数中keys
参数的值。 对于增量实时表 SQL 接口,主键是由APPLY CHANGES INTO
语句中的KEYS
子句定义的列。 - 对于 SCD 类型 2,主键是
keys
参数或KEYS
子句以及coalesce(__START_AT, __END_AT)
操作的返回值,其中__START_AT
和__END_AT
分别是目标流式表中的相应列。
备注
以下指标仅通过 APPLY CHANGES
查询(而不是 APPLY CHANGES FROM SNAPSHOT
查询)捕获。
以下指标由 APPLY CHANGES
查询捕获:
num_upserted_rows
:在更新期间更新插入数据集的输出行数。num_deleted_rows
:在更新期间从数据集中删除的现有输出行数。
对于 apply changes
查询,不会捕获作为非 CDC 流输出的 num_output_rows
指标。
备注
- 以下数据结构仅适用于
APPLY CHANGES
处理,而不适用于APPLY CHANGES FROM SNAPSHOT
处理。 - 仅当目标表发布到 Hive 元存储时,这些数据结构才适用。 如果管道发布到 Unity Catalog,则用户无法访问内部支持表。
在 Hive 元存储中声明目标表时,会创建两个数据结构:
- 一个使用分配给目标表的名称的视图。
- 一个由增量实时表用来管理 CDC 处理的内部支持表。 此表的命名方式是在目标表名称的前面加上
__apply_changes_storage_
。
例如,如果你声明一个名为 dlt_cdc_target
的目标表,则会在元存储中看到一个名为 dlt_cdc_target
的视图和一个名为 __apply_changes_storage_dlt_cdc_target
的表。 创建视图后,增量实时表可以筛选出处理无序数据所需的额外信息(例如逻辑删除和版本)。 若要查看处理后的数据,请查询目标视图。 由于表 __apply_changes_storage_
的架构可能会更改以支持将来的功能或增强功能,因此不应查询表以供生产使用。 如果手动在表中添加数据,则认为记录出现在其他更改之前,因为缺少版本列。