使用 AUTO CDC 复制外部 RDBMS 表

本页逐步讲解如何使用 Lakeflow 声明性管道中的 AUTO CDC API 将表从外部关系数据库管理系统(RDBMS)复制到 Azure Databricks。 学习内容:

  • 用于设置源的常见模式。
  • 如何通过once流完整复制一次现有数据。
  • 如何使用change流持续处理新更改。

此模式非常适合生成渐变维度(SCD)表或使目标表与外部记录系统保持同步。

在您开始之前

本指南假定你有权从源访问以下数据集:

  • 云存储中源表的完整快照。 此数据集用于初始加载。
  • 连续更改源,填充到同一云存储位置(例如,使用 Debezium、Kafka 或基于日志的 CDC)。 此源是正在进行的 AUTO CDC 进程的输入。

设置源视图

首先,定义两个源视图,以从云存储路径rdbms_orders填充orders_snapshot_path目标表。 这两种视图都是在云存储中基于原始数据构建的流式视图。 使用视图可提供更高的效率,因为无需在过程中使用 AUTO CDC 之前写入数据。

  • 第一个源视图是完整快照(full_orders_snapshot
  • 第二个是连续更改源(rdbms_orders_change_feed)。

本指南中的示例使用云存储作为源,但可以使用流式处理表支持的任何源。

full_orders_snapshot()

此步骤创建一个 Lakeflow 声明性管道视图,用于读取订单数据的初始完整快照。

Python

以下 Python 示例:

  • 与自动加载器一 spark.readStream 起使用(format("cloudFiles")
  • 从由 orders_snapshot_path 定义的目录中读取 JSON 文件
  • includeExistingFiles设置为true以确保已处理路径中的历史数据
  • inferColumnTypes设置为true以自动推断架构
  • 返回包含 .select("\*") 的所有列
@dp.view()
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(orders_snapshot_path)
        .select("*")
    )

SQL

以下 SQL 示例将选项作为字符串键值对的映射传递。 orders_snapshot_path 应可用作 SQL 变量(例如,使用管道参数定义或手动内插)。

CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
  "cloudFiles.includeExistingFiles", "true",
  "cloudFiles.inferColumnTypes", "true"
));

rdbms_orders_change_feed()

此步骤创建第二个 Lakeflow 声明式管道视图,用于读取增量更改数据(例如,从 CDC 日志或变更表中)。 它从 orders_cdc_path 中读取并假定 CDC 样式的 JSON 文件会定期放入此路径。

Python

@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)

SQL

在以下 SQL 示例中,${orders_cdc_path} 是一个变量,可以通过在管道设置中设置值或在代码中显式设置变量来进行替换。

CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));

初始水化(初次流动)

设置源后, AUTO CDC 逻辑会将这两个源合并到目标流式处理表中。 首先,使用一次性 AUTO CDCONCE=TRUE 将 RDBMS 表的完整内容复制到流式处理表中。 这将为目标表准备历史数据,并确保在将来的更新中不重播这些数据。

Python

from pyspark import pipelines as dp

# Step 1: Create the target streaming table

dp.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table

dp.create_auto_cdc_flow(
  flow_name = "initial_load_orders",
  once = True,  # one-time load
  target = "rdbms_orders",
  source = "full_orders_snapshot",  # e.g., ingested from JDBC into bronze
  keys = ["order_id"],
  sequence_by = "timestamp",
  stored_as_scd_type = "1"
)

SQL


-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;

-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

once 流仅运行一次。 忽略在创建管道后添加到 full_orders_snapshot 的新文件。

重要

执行rdbms_orders流式处理表的完全刷新会重新运行once流程。 如果云存储中的初始快照数据已删除,则会导致数据丢失。

连续更改源 (更改流)

初始快照加载后,使用另一个 AUTO CDC 流从 RDBMS 的 CDC 数据流持续吸收更改。 这会使 rdbms_orders 表保持最新状态,并插入、更新和删除。

Python

from pyspark import pipelines as dp

# Step 3: Change Flow — Ingest ongoing CDC stream from source system

dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

SQL

-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

注意事项

回填幂等性 仅当完全刷新目标表时,once流才会重新运行。
多个流 可以使用多个更正流来合并更正、后期到达的数据或备用源,但所有流都必须共享同一架构和密钥。
完全刷新 流式处理表的 rdbms_orders 完全刷新会重新运行 once 流。 如果初始云存储位置删除了初始快照数据,则可能会导致数据丢失。
流执行顺序 流执行的顺序并不重要。 最终结果相同。