使用
本页逐步讲解如何使用 Lakeflow 声明性管道中的 AUTO CDC API 将表从外部关系数据库管理系统(RDBMS)复制到 Azure Databricks。 学习内容:
- 用于设置源的常见模式。
- 如何通过
once流完整复制一次现有数据。 - 如何使用
change流持续处理新更改。
此模式非常适合生成渐变维度(SCD)表或使目标表与外部记录系统保持同步。
在您开始之前
本指南假定你有权从源访问以下数据集:
- 云存储中源表的完整快照。 此数据集用于初始加载。
- 连续更改源,填充到同一云存储位置(例如,使用 Debezium、Kafka 或基于日志的 CDC)。 此源是正在进行的
AUTO CDC进程的输入。
设置源视图
首先,定义两个源视图,以从云存储路径rdbms_orders填充orders_snapshot_path目标表。 这两种视图都是在云存储中基于原始数据构建的流式视图。 使用视图可提供更高的效率,因为无需在过程中使用 AUTO CDC 之前写入数据。
- 第一个源视图是完整快照(
full_orders_snapshot) - 第二个是连续更改源(
rdbms_orders_change_feed)。
本指南中的示例使用云存储作为源,但可以使用流式处理表支持的任何源。
full_orders_snapshot()
此步骤创建一个 Lakeflow 声明性管道视图,用于读取订单数据的初始完整快照。
Python
以下 Python 示例:
- 与自动加载器一
spark.readStream起使用(format("cloudFiles")) - 从由
orders_snapshot_path定义的目录中读取 JSON 文件 - 将
includeExistingFiles设置为true以确保已处理路径中的历史数据 - 将
inferColumnTypes设置为true以自动推断架构 - 返回包含
.select("\*")的所有列
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
SQL
以下 SQL 示例将选项作为字符串键值对的映射传递。
orders_snapshot_path 应可用作 SQL 变量(例如,使用管道参数定义或手动内插)。
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
此步骤创建第二个 Lakeflow 声明式管道视图,用于读取增量更改数据(例如,从 CDC 日志或变更表中)。 它从 orders_cdc_path 中读取并假定 CDC 样式的 JSON 文件会定期放入此路径。
Python
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
SQL
在以下 SQL 示例中,${orders_cdc_path} 是一个变量,可以通过在管道设置中设置值或在代码中显式设置变量来进行替换。
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
初始水化(初次流动)
设置源后, AUTO CDC 逻辑会将这两个源合并到目标流式处理表中。 首先,使用一次性 AUTO CDC 流 ONCE=TRUE 将 RDBMS 表的完整内容复制到流式处理表中。 这将为目标表准备历史数据,并确保在将来的更新中不重播这些数据。
Python
from pyspark import pipelines as dp
# Step 1: Create the target streaming table
dp.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
once 流仅运行一次。 忽略在创建管道后添加到 full_orders_snapshot 的新文件。
重要
执行rdbms_orders流式处理表的完全刷新会重新运行once流程。 如果云存储中的初始快照数据已删除,则会导致数据丢失。
连续更改源 (更改流)
初始快照加载后,使用另一个 AUTO CDC 流从 RDBMS 的 CDC 数据流持续吸收更改。 这会使 rdbms_orders 表保持最新状态,并插入、更新和删除。
Python
from pyspark import pipelines as dp
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
注意事项
| 回填幂等性 | 仅当完全刷新目标表时,once流才会重新运行。 |
|---|---|
| 多个流 | 可以使用多个更正流来合并更正、后期到达的数据或备用源,但所有流都必须共享同一架构和密钥。 |
| 完全刷新 | 流式处理表的 rdbms_orders 完全刷新会重新运行 once 流。 如果初始云存储位置删除了初始快照数据,则可能会导致数据丢失。 |
| 流执行顺序 | 流执行的顺序并不重要。 最终结果相同。 |