Lakeflow 声明性管道通过 AUTO CDC
和 AUTO CDC FROM SNAPSHOT
API 简化了更改数据捕获(CDC)。
注释
AUTO CDC
API 替换 APPLY CHANGES
API,并具有相同的语法。
APPLY CHANGES
API 仍然可用,但 Databricks 建议在其位置使用 AUTO CDC
API。
使用的接口取决于更改数据源:
- 用
AUTO CDC
来处理更改数据馈送(CDF)中的更改。 - 使用
AUTO CDC FROM SNAPSHOT
(公共预览版,仅适用于 Python)处理数据库快照中的更改。
以前,该 MERGE INTO
语句通常用于处理 Azure Databricks 上的 CDC 记录。 但是, MERGE INTO
由于序列外记录或需要复杂的逻辑重新排序记录,可能会生成不正确的结果。
Lakeflow 声明性管道 SQL 和 Python 接口支持该 AUTO CDC
API。 Lakeflow 声明性管道 Python 接口支持该 AUTO CDC FROM SNAPSHOT
API。
AUTO CDC
和AUTO CDC FROM SNAPSHOT
都支持使用 SCD 类型 1 和类型 2 更新表。
- 使用 SCD 类型 1 直接更新记录。 历史不会保留更新后的记录。
- 使用 SCD 类型 2 保留记录的历史,可以选择在所有更新中保留,或者仅在指定列集的更新中保留。
有关语法和其他引用,请参阅 AUTO CDC for Lakeflow 声明性管道 SQL、 适用于 Lakeflow 声明性管道 Python 的 AUTO CDC 和 适用于 Lakeflow 声明性管道 Python 的 AUTO CDC FROM SNAPSHOT。
注释
本文介绍如何在源数据发生更改时,更新 Lakeflow 声明性管道中的表。 若要了解如何记录和查询 Delta 表的行级更改信息,请参阅 在 Azure Databricks 上使用 Delta Lake 更改数据馈送。
要求
若要使用 CDC API,必须将管道配置为使用 Lakeflow 声明性管道 Pro
或 Advanced
版本。
CDC 如何通过 AUTO CDC API 实现?
通过自动处理序列外记录,Lakeflow 声明性管道中的 AUTO CDC API 可确保正确处理 CDC 记录,并不需要开发复杂的逻辑来处理序列外记录。 必须在源数据中指定一个列以对记录进行排序,该列被 Lakeflow 声明性管道解释为正确排序源数据的单调递增表示。 Lakeflow 声明性管道会自动处理无序到达的数据。 对于 SCD 类型 2 的更改,Lakeflow 声明性管道会将相应的序列值分别传播到目标表的__START_AT
列和__END_AT
列。 每个排序值中每个键应有一个不同的更新,不支持 NULL 排序值。
若要使用 AUTO CDC
执行 CDC 处理,请先创建流式表,然后在 SQL 中使用 AUTO CDC ... INTO
语句或在 Python 中使用 create_auto_cdc_flow()
函数来指定更改源、键和顺序以设置变更馈送。 若要创建目标流式处理表,请使用 CREATE OR REFRESH STREAMING TABLE
SQL 中的语句或 create_streaming_table()
Python 中的函数。 请参阅 SCD 类型 1 和类型 2 处理 示例。
有关语法详细信息,请参阅 Lakeflow 声明性管道 SQL 参考 或 Python 参考。
CDC 如何通过 AUTO CDC FROM SNAPSHOT
API 实现?
重要
API AUTO CDC FROM SNAPSHOT
处于 公开预览版。
AUTO CDC FROM SNAPSHOT
是一个声明性 API,它通过比较一系列有序快照来有效地确定源数据的变化,然后运行 CDC 处理快照中记录所需的处理。
AUTO CDC FROM SNAPSHOT
仅 Lakeflow 声明性管道 Python 接口支持。
AUTO CDC FROM SNAPSHOT
支持从多种源类型导入快照。
- 使用定期快照导入从现有表或视图中获取的快照。
AUTO CDC FROM SNAPSHOT
具有一个简单的简化界面,用于支持定期从现有数据库对象引入快照。 每次管道更新时,新快照被引入,并将引入的时间用作快照版本号。 在连续模式下运行管道时,将在每个管道更新中引入多个快照,该周期由包含处理的流的AUTO CDC FROM SNAPSHOT
设置决定。 - 使用历史快照引入来处理包含数据库快照的文件,例如从 Oracle 或 MySQL 数据库或数据仓库生成的快照。
若要使用任意源类型 AUTO CDC FROM SNAPSHOT
进行 CDC 处理,请先创建一个流表,然后在 Python 中使用函数 create_auto_cdc_from_snapshot_flow()
来指定快照、键和实现处理所需的其他参数。 请参阅 定期快照引入 和 历史快照引入 示例。
传递给 API 的快照必须按版本按升序排列。 如果 Lakeflow 声明性管道检测到无序快照,则会引发错误。
有关语法详细信息,请参阅 Lakeflow 的声明式管道Python参考手册。
使用多个列进行排序
可以按多个列进行排序(例如时间戳和 ID 来解决并列),可以使用 STRUCT 将它们结合起来:首先按 STRUCT 的第一个字段排序,如果有并列情况,再考虑第二个字段,依此类推。
SQL 中的示例:
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python 中的示例:
sequence_by = struct("timestamp_col", "id_col")
局限性
用于排序的列必须是可排序数据类型。
示例:使用 CDF 源数据处理 SCD 类型 1 和 SCD 类型 2
以下部分提供了 Lakeflow 声明性管道 SCD 类型 1 和类型 2 查询的示例,这些查询基于更改数据源中的源事件更新目标表:
创建新的用户记录。
删除用户记录。
更新用户记录。 在 SCD 类型 1 示例中,最后一个
UPDATE
操作延迟到达,并且从目标表中删除,演示了如何处理无序事件。
以下示例假定熟悉如何配置和更新 Lakeflow 声明性管道。 请参阅 教程:通过 Lakeflow 声明性管道使用变更数据捕获生成 ETL 管道。
若要运行这些示例,必须首先创建示例数据集。 请参阅 “生成测试数据”。
下面是这些示例的输入记录:
userId | 姓名 | 城市 | 操作 | 序列号 |
---|---|---|---|---|
124 | 劳尔 | 瓦哈卡州 | INSERT | 1 |
123 | 伊莎贝尔 | 蒙特雷 | INSERT | 1 |
125 | 梅赛德斯 | 提 华纳 | INSERT | 2 |
126 | 百合 | 坎昆 | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | 梅赛德斯 | 瓜达拉哈拉 | UPDATE | 6 |
125 | 梅赛德斯 | Mexicali | UPDATE | 5 |
123 | 伊莎贝尔 | 奇瓦瓦州 | UPDATE | 5 |
如果取消注释示例数据中的最后一行,它将插入以下记录,指定应截断记录的位置:
userId | 姓名 | 城市 | 操作 | 序列号 |
---|---|---|---|---|
null | null | null | 截断 | 3 |
注释
以下所有示例都包含指定DELETE
和TRUNCATE
操作的选项,但每个操作都是可选的。
处理 SCD 类型 1 更新
以下示例演示如何处理 SCD 类型 1 更新:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flowname AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
运行 SCD 类型 1 示例后,目标表包含以下记录:
userId | 姓名 | 城市 |
---|---|---|
124 | 劳尔 | 瓦哈卡州 |
125 | 梅赛德斯 | 瓜达拉哈拉 |
126 | 百合 | 坎昆 |
在运行带有附加TRUNCATE
记录的 SCD 类型 1 示例之后,由于在124
进行的126
操作,记录TRUNCATE
和sequenceNum=3
被截断,目标表包含以下记录:
userId | 姓名 | 城市 |
---|---|---|
125 | 梅赛德斯 | 瓜达拉哈拉 |
处理 SCD 类型 2 更新
以下示例演示如何处理 SCD 类型 2 更新:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
运行 SCD 类型 2 示例后,目标表包含以下记录:
userId | 姓名 | 城市 | __START_AT | __END_AT |
---|---|---|---|---|
123 | 伊莎贝尔 | 蒙特雷 | 1 | 5 |
123 | 伊莎贝尔 | 奇瓦瓦州 | 5 | 6 |
124 | 劳尔 | 瓦哈卡州 | 1 | null |
125 | 梅赛德斯 | 提 华纳 | 2 | 5 |
125 | 梅赛德斯 | Mexicali | 5 | 6 |
125 | 梅赛德斯 | 瓜达拉哈拉 | 6 | null |
126 | 百合 | 坎昆 | 2 | null |
SCD 类型 2 查询还可以指定要跟踪的目标表中历史记录的输出列的子集。 对其他列的更改将就地更新,而不是生成新的历史记录。 以下示例演示如何从跟踪中排除 city
列:
以下示例演示如何将跟踪历史记录与 SCD 类型 2 配合使用:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("cdc_data.users")
dp.create_streaming_table("target")
dp.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
在没有其他 TRUNCATE
记录的情况下运行此示例后,目标表包含以下记录:
userId | 姓名 | 城市 | __START_AT | __END_AT |
---|---|---|---|---|
123 | 伊莎贝尔 | 奇瓦瓦州 | 1 | 6 |
124 | 劳尔 | 瓦哈卡州 | 1 | null |
125 | 梅赛德斯 | 瓜达拉哈拉 | 2 | null |
126 | 百合 | 坎昆 | 2 | null |
生成测试数据
下面提供了以下代码来生成示例数据集,以便在本教程中提供的示例查询中使用。 假设你有适当的凭据来创建新架构并创建新表,则可以使用笔记本或 Databricks SQL 运行这些语句。 以下代码 不应 作为 Lakeflow 声明性管道的一部分运行:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
示例:定期快照处理
下面的示例演示了 SCD 类型 2 处理,该处理获取存储在mycatalog.myschema.mytable
位置的表的快照。 处理结果将写入名为 target
的表。
mycatalog.myschema.mytable
在时间戳 2024-01-01 00:00:00 的记录
Key | 价值 |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
时间戳 2024-01-01 12:00:00 的记录
Key | 价值 |
---|---|
2 | b2 |
3 | a3 |
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
处理快照后,目标表包含以下记录:
Key | 价值 | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024年01月01日 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024年01月01日 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | null |
3 | a3 | 2024-01-01 12:00:00 | null |
示例:历史快照处理
以下示例演示 SCD 类型 2 的处理,该处理根据存储在云存储系统中的两个快照的源事件来更新目标表:
快照位置 timestamp
,存储在 /<PATH>/filename1.csv
Key | TrackingColumn | 非跟踪列 |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
快照位置 timestamp + 5
,存储在 /<PATH>/filename2.csv
Key | TrackingColumn | 非跟踪列 |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
以下代码示例演示如何使用以下快照处理 SCD 类型 2 更新:
from pyspark import pipelines as dp
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dp.create_streaming_live_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
处理快照后,目标表包含以下记录:
Key | TrackingColumn | 非跟踪列 | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | null |
3 | a3 | b3 | 2 | null |
4 | a4 | b4_new | 1 | null |
在目标流式处理表中添加、更改或删除数据
如果管道将表发布到 Unity 目录,则可以使用 数据作语言 (DML)语句(包括插入、更新、删除和合并语句)修改由 AUTO CDC ... INTO
语句创建的目标流式处理表。
注释
- 不支持用于修改流式处理表的表架构的 DML 语句。 确保 DML 语句不会尝试修改表架构。
- 只能在使用 Databricks Runtime 13.3 LTS 及更高版本的 Unity Catalog 共享群集或 SQL 仓库中运行用于更新流式表的 DML 语句。
- 由于流式传输要求仅追加数据源,因此如果你的处理需要从包含更改的源流式传输表进行流式传输(例如,通过 DML 语句),请在读取源流式传输表时设置 skipChangeCommits 标志。 设置
skipChangeCommits
后,将会忽略删除或修改源表上记录的事务。 如果你的处理不需要某个流式处理表,则可使用具体化视图(没有“仅追加”限制)作为目标表。
由于 Lakeflow 命令式管道使用指定的 SEQUENCE BY
列并将适当的序列值传播到目标表的 __START_AT
和 __END_AT
列(用于 SCD 类型 2),因此必须确保 DML 语句使用这些列的有效值来保持记录的正确顺序。 请参阅 如何使用 AUTO CDC API 实现 CDC?。
有关对流式处理表使用 DML 语句的详细信息,请参阅 在流式处理表中添加、更改或删除数据。
以下示例插入一个活动记录,其起始序列为 5:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
从 AUTO CDC 目标表读取更改数据馈送
在 Databricks Runtime 15.2 及以上版本中,你可以从作为 AUTO CDC
或 AUTO CDC FROM SNAPSHOT
查询目标的流式处理表中读取更改数据馈送,方式与从其他 Delta 表读取更改数据馈送相同。 从目标流式处理表读取更改数据馈送需要满足以下条件:
- 目标流式处理表必须发布到 Unity Catalog。 请参阅 将 Unity 目录与 Lakeflow 声明性管道配合使用。
- 为了从目标流式处理表中读取变更数据流,您必须使用 Databricks Runtime 15.2 或更高版本。 若要读取其他管道中的更改数据馈送,必须将管道配置为使用 Databricks Runtime 15.2 或更高版本。
从在 Lakeflow 声明性管道中创建的目标流式处理表读取更改数据馈送的方式与从其他 Delta 表读取更改数据馈送的方式相同。 若要详细了解如何使用 Delta 更改数据馈送功能,包括 Python 和 SQL 中的示例,请参阅 在 Azure Databricks 上使用 Delta Lake 更改数据馈送。
注释
更改数据馈送记录包括标识更改事件的类型的 元数据 。 在表中更新记录时,关联更改记录的元数据通常包括 _change_type
值,以及设置为 update_preimage
和 update_postimage
的事件。
但是,如果对目标流表进行更新并包含更改主键值,则这些 _change_type
值会有所不同。 更改包括对主键的更新时,元数据 _change_type
字段将设置为 insert
和 delete
事件。 在对具有 UPDATE
或 MERGE
语句的某个键字段进行手动更新时,或对于 SCD 类型 2 表,当 __start_at
字段更改以反映较早的起始序列值时,主键可能会发生更改。
查询 AUTO CDC
用于确定主键值,而在确定这些主键值的方法上,SCD 类型 1 和 SCD 类型 2 的处理是不同的。
- 对于 SCD Type 1 处理和 Lakeflow 声明式管道 Python 接口,主键是
keys
函数中create_auto_cdc_flow()
参数的值。 对于 Lakeflow 声明性管道的 SQL 接口,主键是由KEYS
语句中的AUTO CDC ... INTO
子句定义的列。 - 对于 SCD 类型 2,主键是
keys
参数或KEYS
子句加上coalesce(__START_AT, __END_AT)
操作的返回值,其中__START_AT
和__END_AT
是目标流式处理表中的相应列。
获取有关 Lakeflow 声明性管道 CDC 查询处理的记录的数据
注释
以下指标仅由 AUTO CDC
查询捕获,而不是由 AUTO CDC FROM SNAPSHOT
查询捕获。
以下指标由 AUTO CDC
查询捕获:
-
num_upserted_rows
:更新期间插入数据集的输出行数。 -
num_deleted_rows
:更新期间从数据集中删除的现有输出行数。
对于 num_output_rows
查询,不会捕获 AUTO CDC
非 CDC 流的指标(输出)。
哪些数据对象用于 Lakeflow 声明性管道 CDC 处理?
注释
- 这些数据结构仅适用于
AUTO CDC
处理,不适用于AUTO CDC FROM SNAPSHOT
处理。 - 仅当目标表发布到 Hive 元存储时,这些数据结构才适用。 如果管道发布到 Unity Catalog,则用户无法访问内部基础表。
在 Hive 元存储中声明目标表时,会创建两个数据结构:
- 一个使用分配给目标表的名称的视图。
- Lakeflow 声明性管道用于管理 CDC 处理的内部支持表。 此表通过追加
__apply_changes_storage_
到目标表名称进行命名。
例如,如果声明了名为 dp_cdc_target
的目标表,则会看到名为 dp_cdc_target
的视图和元存储中命名 __apply_changes_storage_dp_cdc_target
的表。 创建视图可让 Lakeflow 声明性管道过滤处理无序数据所需的额外信息(例如,墓碑和版本)。 若要查看已处理的数据,请查询目标视图。 由于表的 __apply_changes_storage_
架构可能会更改为支持将来的功能或增强功能,因此不应查询表以供生产使用。 如果手动向表添加数据,则假定记录位于其他更改之前,因为缺少版本列。