什么是变更数据捕获 (CDC)?

更改数据捕获(CDC)是一种数据集成模式,用于捕获对源系统中数据所做的更改,例如插入、更新和删除。 这些更改(表示为列表)通常称为 CDC 数据流。 如果利用 CDC 传输流进行操作,而不是读取整个源数据集,就可以更快地处理数据。 事务数据库(如 SQL Server、MySQL 和 Oracle)会生成 CDC 数据流。 Delta 表生成自己的 CDC 源,称为变更数据馈送(CDF)。

下图显示,当源表中包含员工数据的行更新时,它会在只包含更改的 CDC 数据流中生成新的行集。 CDC 数据流的每一行通常包含其他元数据,包括操作(例如 UPDATE)以及一个列,可以用于确定 CDC 数据流中每一行的顺序,以便有效处理无序更新。 例如,sequenceNum 下图所示列用于确定 CDC 数据流中的行顺序:

更改数据捕获概述。

处理更改数据馈送:仅保留最新数据与保留历史数据版本

更改的数据馈送的处理称为缓慢变化维度(SCD)。 处理 CDC 数据流时,可以选择:

  • 是否仅保留最新数据(即覆盖现有数据) ? 这称为 SCD 类型 1
  • 或者,是否保留数据更改的历史记录? 这称为 SCD 类型 2

SCD 类型 1 处理涉及在发生更改时用新数据覆盖旧数据。 这意味着不会保留任何更改历史记录。 只有最新版本的数据可用。 这是一种简单的方法,通常在更改历史记录不重要(例如更正错误或更新客户电子邮件地址等非关键字段)时使用。

更改数据捕获 SCD 类型 1 概述。

SCD 类型 2 处理通过创建其他记录来捕获不同版本的数据,从而维护数据更改的历史记录。 每个版本的数据都带有时间戳或标记有元数据,允许用户在发生更改时进行跟踪。 当跟踪数据演变(如跟踪客户地址随时间变化进行分析)非常重要时,这非常有用。

更改数据捕获 SCD 类型 2 概述。

使用 Lakeflow 声明性管道进行 SCD 类型 1 和 2 处理的示例

本部分中的示例演示如何使用 SCD 类型 1 和类型 2。

步骤 1:准备示例数据

在此示例中,你将生成一个示例 CDC 数据馈送。 首先,创建笔记本并将以下代码粘贴到其中。 将代码块开头的变量更新为有权创建表和视图的目录和架构。

此代码创建一个新的 Delta 表,其中包含多个更改记录。 架构如下所示:

  • id - 此员工的唯一标识符的整数
  • name - 字符串,员工名称
  • role - 字符串,员工角色
  • country - 员工工作的字符串、国家/地区代码
  • operation- 更改类型(例如,INSERTUPDATEDELETE
  • sequenceNum - 整数,标识源数据中 CDC 事件的逻辑顺序。 Lakeflow 声明性管道使用这种排序来处理按无序顺序到达的变更事件。
# update these to the catalog and schema where you have permissions
# to create tables and views.

catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"

def write_employees_cdf_to_delta():
 data = [
   (1, "Alex", "chef", "FR", "INSERT", 1),
   (2, "Jessica", "owner", "US", "INSERT", 2),
   (3, "Mikhail", "security", "UK", "INSERT", 3),
   (4, "Gary", "cleaner", "UK", "INSERT", 4),
   (5, "Chris", "owner", "NL", "INSERT", 6),
   # out of order update, this should be dropped from SCD Type 1
   (5, "Chris", "manager", "NL", "UPDATE", 5),
   (6, "Pat", "mechanic", "NL", "DELETE", 8),
   (6, "Pat", "mechanic", "NL", "INSERT", 7)
 ]
 columns = ["id", "name", "role", "country", "operation", "sequenceNum"]
 df = spark.createDataFrame(data, columns)
 df.write.format("delta").mode("overwrite").saveAsTable(f"{catalog}.{schema}.{employees_cdf_table}")

write_employees_cdf_to_delta()

可以使用以下 SQL 命令预览此数据:

SELECT *
FROM mycatalog.myschema.employees_cdf

步骤 2:使用 SCD 类型 1 仅保留最新数据

建议在 Lakeflow 声明性管道中使用 AUTO CDC API 来处理 SCD 类型 1 表中的更改数据馈送。

  1. 创建新的笔记本。

  2. 将以下代码粘贴到其中。

  3. 创建并连接到管道

employees_cdf 函数将读取我们前面刚刚创建的表作为流,因为 create_auto_cdc_flow API 将用于更改数据捕获处理,因此需要将更改流作为输入。 使用修饰器 @dp.temporary_view 包装它,因为不想将此流具体化到表中。

然后,您用 dp.create_target_table 创建一个流式处理表,其中包含处理此更改数据馈送结果。

最后,你使用 dp.create_auto_cdc_flow 处理变更数据馈送。 让我们看看每个参数:

  • target - 您之前定义的目标流式表。
  • source - 对之前定义的更改记录流的视图。
  • keys - 标识更改源中的唯一行。 由于你将id用作唯一标识符,因此只需提供id作为识别列。
  • sequence_by - 指定源数据中 CDC 事件的逻辑顺序的列名称。 需要这个排序来处理顺序不一致到达的变更事件。 将 sequenceNum 作为排序列。
  • apply_as_deletes - 由于示例数据包含删除操作,因此使用 apply_as_deletes 指示应将 CDC 事件视为 DELETE 而不是更新或插入。
  • except_column_list - 包含不希望包含在目标表中的列的列表。 在此示例中,你将使用此参数排除 sequenceNumoperation
  • stored_as_scd_type - 指示要使用的 SCD 类型。
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr, lit, when
from pyspark.sql.types import StringType, ArrayType

catalog = "mycatalog"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table_current = "employees_current"
employees_table_historical = "employees_historical"

@dp.temporary_view
def employees_cdf():
 return spark.readStream.format("delta").table(f"{catalog}.{schema}.{employees_cdf_table}")

dp.create_target_table(f"{catalog}.{schema}.{employees_table_current}")

dp.create_auto_cdc_flow(
 target=f"{catalog}.{schema}.{employees_table_current}",
 source=employees_cdf_table,
 keys=["id"],
 sequence_by=col("sequenceNum"),
 apply_as_deletes=expr("operation = 'DELETE'"),
 except_column_list = ["operation", "sequenceNum"],
 stored_as_scd_type = 1
)

单击“ 开始”运行此管道。

然后,在 SQL 编辑器中运行以下查询,验证是否已正确处理更改记录:

SELECT *
FROM mycatalog.myschema.employees_current

注释

员工 Chris 的无序更新已正确删除,因为他们的角色仍设置为“所有者”而不是经理。

更改数据捕获 SCD 类型 1 示例。

步骤 3:使用 SCD 类型 2 来保留历史数据

在此示例中,将创建第二个目标表,该 employees_historical表包含对员工记录的更改的完整历史记录。

将此代码添加到管道。 此处的唯一区别是 stored_as_scd_type 设置为 2 而不是 1。

dp.create_target_table(f"{catalog}.{schema}.{employees_table_historical}")

dp.create_auto_cdc_flow(
 target=f"{catalog}.{schema}.{employees_table_historical}",
 source=employees_cdf_table,
 keys=["id"],
 sequence_by=col("sequenceNum"),
 apply_as_deletes=expr("operation = 'DELETE'"),
 except_column_list = ["operation", "sequenceNum"],
 stored_as_scd_type = 2
)

单击“ 开始”运行此管道。

然后,在 SQL 编辑器中运行以下查询,验证是否已正确处理更改记录:

SELECT *
FROM mycatalog.myschema.employees_historical

你将看到对员工的所有更改,包括那些已被删除的员工,例如 Pat。

更改数据捕获 SCD 类型 2 示例。

步骤 4:清理资源

完成后,请按照以下步骤清理资源:

  1. 删除流水线:

    注释

    删除管道时,它会自动删除 employeesemployees_historical 表。

    1. 单击 “作业与管道”,然后查找要删除的管道名称。
    2. 单击 “溢出”图标。 在管道名称所在的同一行中,然后单击“ 删除”。
  2. 删除笔记本。

  3. 请删除包含更改数据流的表:

    1. 单击“ 新建 > 查询”。
    2. 粘贴并运行以下 SQL 代码,根据需要调整目录和架构:
DROP TABLE mycatalog.myschema.employees_cdf

使用 MERGE INTOforeachBatch 用于更改数据捕获的缺点

Databricks 提供了一个 MERGE INTO SQL 命令,可用于 foreachBatch API 将行向上插入 Delta 表。 本部分探讨如何将此方法用于简单的用例,但此方法在应用于实际方案时变得越来越复杂和脆弱。

在这个示例中,你将使用与前面示例中相同的更改数据馈送。

简单的实现与MERGE INTOforeachBatch

创建笔记本并将以下代码复制到其中。 根据需要更改catalogschemaemployees_table变量。 应将 catalogschema 变量设置为 Unity Catalog 中可以创建表的位置。

运行笔记本时,它会执行以下事情:

  • create_table中创建目标表。 与自动处理此步骤不同的 create_auto_cdc_flow是,必须指定架构。
  • 以流的形式读取更改数据馈送。 使用upsertToDelta方法处理每个微分块,该方法运行MERGE INTO命令。
catalog = "jobs"
schema = "myschema"
employees_cdf_table = "employees_cdf"
employees_table = "employees_merge"

def upsertToDelta(microBatchDF, batchId):
 microBatchDF.createOrReplaceTempView("updates")
 microBatchDF.sparkSession.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET *
   WHEN NOT MATCHED THEN INSERT *
 """)

def create_table():
 spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
  spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
   (id INT, name STRING, age INT, country STRING)
 """)

create_table()

cdcData = spark.readStream.table(f"{catalog}.{schema}.{employees_cdf_table}")

cdcData.writeStream \
.foreachBatch(upsertToDelta) \
.outputMode("append") \
.start()

若要查看结果,请运行以下 SQL 查询:

SELECT *
FROM mycatalog.myschema.employees_merge

遗憾的是,结果不正确,如下所示:

更改数据捕获 MERGE INTO 示例。

同一微批处理中相同密钥的多个更新

第一个问题是,代码不会处理同一微包中同一键的多个更新。 例如,您可以使用 INSERT 来插入员工 Chris,然后更新他的角色,将其从所有者变更为经理。 结果会导致一行,但结果却显示有两行。

当微包中有多个对同一密钥的更新时,哪个更改会获胜?

在同一微批处理示例中,更改数据捕获对同一键的多次更新。

逻辑变得更加复杂。 下面的代码示例通过sequenceNum检索最新的行,并仅将该数据合并到目标表中,如下所示:

  • 按主键分组。 id
  • 获取该键的批处理中具有最大值 sequenceNum 的行的所有列。
  • 将行展开回去。

按如下所示更新upsertToDelta方法,然后运行代码。

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET *
   WHEN NOT MATCHED THEN INSERT *
 """)

查询目标表时,可以看到名为 Chris 的员工具有正确的角色,但仍存在其他问题要解决,因为你仍然在目标表中显示已删除的记录。

更改数据捕获同一微包示例结果中同一键的多个更新。

跨微批处理的无序更新

本节探讨跨微批处理之间无序更新的问题。 下图说明了这个问题:如果 Chris 的行在第一个微批中有一个 UPDATE 操作,随后在后续微批中有一个 INSERT 操作,该怎么办? 代码无法正确处理。

当多个微批中对同一键进行无序更新时,哪个更改会生效?

更改数据捕获的无序更新跨微批处理的示例。

若要解决此问题,请展开代码以在每个行中存储版本,如下所示:

  • sequenceNum存储上次更新行的时间。
  • 对于每个新行,请检查时间戳是否大于存储的时间戳,然后应用以下逻辑:
    • 如果更大,请使用目标中的新数据。
    • 否则,请将数据保留在源中。

首先,更新 createTable 方法以存储 sequenceNum,因为你将使用它对每行进行版本化:

def create_table():
 spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{employees_table}")
  spark.sql(f"""
   CREATE TABLE IF NOT EXISTS {catalog}.{schema}.{employees_table}
   (id INT, name STRING, age INT, country STRING, sequenceNum INT)
 """)

接下来,更新 upsertToDelta 以处理行版本。 UPDATE SET子句的MERGE INTO需要单独处理每一列。

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

处理删除

遗憾的是,代码仍存在问题。 它不处理 DELETE 操作,因为员工 Pat 仍然在目标表中,这证明了这一点。

假设删除到达同一个微包。 若要处理它们,请在更改数据记录指示删除时再次更新 upsertToDelta 该方法以删除该行,如下所示:

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

处理删除后无序到达的更新

遗憾的是,上述代码仍然不太正确,因为它未能处理 DELETE 在 microbatches 间被无序的 UPDATE 跟随的情况。

更改数据捕获处理删除示例后已无序到达的更新。

处理这一情况的算法需要记录删除操作,以便能够处理后续发生的无序更新。 为此,请按以下步骤操作:

  • 不要立即删除行,而是通过时间戳或 sequenceNum 进行软删除。 软删除的行将被标记为删除
  • 将所有用户重定向到过滤掉标记为删除的记录的视图。
  • 生成一个清理作业,该作业会随时间推移删除墓碑。

使用以下代码:

def upsertToDelta(microBatchDF, batchId):
 microBatchDF = microBatchDF.groupBy("id").agg(
   max_by(struct("*"), "sequenceNum").alias("row")
 ).select("row.*").createOrReplaceTempView("updates")

 spark.sql(f"""
   MERGE INTO {catalog}.{schema}.{employees_table} t
   USING updates s
   ON s.id = t.id
   WHEN MATCHED AND s.operation = 'DELETE' THEN UPDATE SET DELETED_AT=now()
   WHEN MATCHED THEN UPDATE SET
     name=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.name ELSE t.name END,
     age=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.age ELSE t.age END,
     country=CASE WHEN s.sequenceNum > t.sequenceNum THEN s.country ELSE t.country END
   WHEN NOT MATCHED THEN INSERT *
 """)

用户不能直接使用目标表,因此请创建一个视图,以便他们可以查询:

CREATE VIEW employees_v AS
SELECT * FROM employees_merge
WHERE DELETED_AT = NULL

最后,创建一个定期清理墓碑行的作业:

DELETE FROM employees_merge
WHERE DELETED_AT < now() - INTERVAL 1 DAY