Azure Cosmos DB 分析存储中的变更数据捕获入门

适用对象: NoSQL MongoDB

使用 Azure Cosmos DB 分析存储中的变更数据捕获 (CDC) 作为 Azure 数据工厂Azure Synapse Analytics 的源,以捕获对数据所做的特定更改。

先决条件

启用分析存储

首先,在帐户级别启用 Azure Synapse Link,然后为适合工作负载的容器启用分析存储。

  1. 启用 Azure Synapse Link:为 Azure Cosmos DB 帐户启用 Azure Synapse Link

  2. 为容器启用分析存储:

    选项 指南
    为特定的新容器启用 为新容器启用 Azure Synapse Link
    为特定的现有容器启用 为现有容器启用 Azure Synapse Link

使用数据流创建目标 Azure 资源

分析存储的变更数据捕获功能可通过 Azure 数据工厂Azure Synapse Analytics 的数据流功能获得。 对于本指南,请使用 Azure 数据工厂。

重要

也可以使用 Azure Synapse Analytics。 首先,创建 Azure Synapse 工作区(如果还没有)。 在新创建的工作区中,依次选择“开发”选项卡、“添加新资源”和“数据流”。

  1. 创建 Azure 数据工厂(如果还没有)。

    提示

    如果可能,请在 Azure Cosmos DB 帐户所在的同一区域中创建数据工厂。

  2. 启动新创建的数据工厂。

  3. 在数据工厂中,选择“数据流”选项卡,然后选择“新建数据流”。

  4. 为新创建的数据流指定唯一名称。 在此示例中,数据流命名为 cosmoscdc

    Screnshot of a new data flow with the name cosmoscdc.

配置分析存储容器的源设置

现在,创建并配置一个源以从 Azure Cosmos DB 帐户的分析存储中传输数据。

  1. 选择“添加源”。

    Screenshot of the add source menu option.

  2. 在“输出流名称”字段中,输入 cosmos

    Screenshot of naming the newly created source cosmos.

  3. 在“源类型”部分,选择“内联”。

    Screenshot of selecting the inline source type.

  4. 在“数据集”字段中,选择“Azure - Azure Cosmos DB for NoSQL”。

    Screenshot of selecting Azure Cosmos DB for NoSQL as the dataset type.

  5. 为名为 cosmoslinkedservice 的帐户创建新的链接服务。 在“新建链接服务”弹出对话框中,选择现有的 Azure Cosmos DB for NoSQL 帐户,然后选择“确定”。 在此示例中,我们选择名为 msdocs-cosmos-source 的现存 Azure Cosmos DB for NoSQL 帐户和名为 cosmicworks 的数据库。

    Screenshot of the New linked service dialog with an Azure Cosmos DB account selected.

  6. 为存储类型选择“分析”。

    Screenshot of the analytical option selected for a linked service.

  7. 选择“源选项”选项卡。

  8. 在“源选项”中,选择目标容器并启用“数据流调试”。 在此示例中,容器命名为 products

    Screenshot of a source container selected named products.

  9. 选择“数据流调试”。 在“启用数据流调试”弹出对话框中,保留默认选项,然后选择“确定”。

    Screenshot of the toggle option to enable data flow debug.

  10. 源选项”选项卡还包含你可能希望启用的其他选项。 下表描述了这些选项:

选项 说明
捕获中间更新 如果要捕获项目的更改历史记录,包括变更数据捕获读取之间的中间更改,请启用此选项。
捕获删除 启用此选项可捕获用户删除的记录并将其应用于接收器。 不能在 Azure 数据资源管理器和 Azure Cosmos DB 接收器上应用删除。
捕获事务存储 TTL 启用此选项可捕获 Azure Cosmos DB 事务存储 TTL(生存时间)删除的记录,并将其应用于接收器。 不能在 Azure 数据资源管理器和 Azure Cosmos DB 接收器上应用 TTL 删除。
批大小(以字节为单位) 此设置实际上是千兆字节 (GB)。 如果要对变更数据捕获源进行批处理,请指定大小(以 GB 为单位)
额外配置 额外的 Azure Cosmos DB 分析存储配置及其值。 (例如:spark.cosmos.allowWhiteSpaceInFieldNames -> true

使用源选项

选中任何 Capture intermediate updatesCapture DeltesCapture Transactional store TTLs 选项时,CDC 进程将在接收器中创建 __usr_opType 字段并填入以下值:

说明 选项
1 UPDATE 捕获中间更新
2 INSERT 没有插入选项,其默认处于打开状态
3 USER_DELETE 捕获删除
4 TTL_DELETE 捕获事务存储 TTL

如果必须将 TTL 删除的记录与用户或应用程序删除的文档区分开来,则要同时选中 Capture intermediate updatesCapture Transactional store TTLs 选项。 然后,必须根据业务需求调整 CDC 进程、应用程序或查询以使用 __usr_opType

提示

如果需要下游使用者在选中“捕获中间更新”选项的情况下还原更新顺序,则可以将系统时间戳 _ts 字段用作排序字段。

创建和配置用于更新和删除操作的接收器设置

首先,创建一个简单的 Azure Blob 存储接收器,然后将该接收器配置为仅筛选特定操作的数据。

  1. 创建 Azure Blob 存储帐户和容器(如果还没有)。 在接下来的示例中,我们将使用名为 msdocsblobstorage 的帐户和名为 output 的容器。

    提示

    如果可能,请在 Azure Cosmos DB 帐户所在的同一区域中创建存储帐户。

  2. 返回 Azure 数据工厂,为从 cosmos 源捕获的变更数据创建新的接收器。

    Screenshot of adding a new sink that's connected to the existing source.

  3. 为接收器提供唯一名称。 在此示例中,接收器命名为 storage

    Screenshot of naming the newly created sink storage.

  4. 在“接收器类型”部分,选择“内联”。 在“数据集”字段中,选择“增量”。

    Screenshot of selecting and Inline Delta dataset type for the sink.

  5. 使用名为 storagelinkedserviceAzure Blob 存储为帐户创建新的链接服务。 在“新建链接服务”弹出对话框中,选择现有的 Azure Blob 存储帐户,然后选择“确定”。 在此示例中,我们选择名为 msdocsblobstorage 的现存 Azure Blob 存储帐户。

    Screenshot of the service type options for a new Delta linked service.

    Screenshot of the New linked service dialog with an Azure Blob Storage account selected.

  6. 选择“设置”选项卡。

  7. 在“设置”中,将“文件夹路径”设置为 Blob 容器的名称。 在此示例中,容器的名称为 output

    Screenshot of the blob container named output set as the sink target.

  8. 找到“更新方法”部分,并将选择更改为仅允许“删除”和“更新”操作。 此外,使用字段 {_rid} 作为唯一标识符,将“键列”指定为“列的列表”。

    Screenshot of update methods and key columns being specified for the sink.

  9. 选择“验证”以确保未出现任何错误或遗漏。 然后,选择“发布”以发布数据流。

    Screenshot of the option to validate and then publish the current data flow.

计划变更数据捕获执行

发布数据流后,可以添加新管道来移动和转换数据。

  1. 创建新管道。 为管道指定唯一名称。 在此示例中,管道名称为 cosmoscdcpipeline

    Screenshot of the new pipeline option within the resources section.

  2. 在“活动”部分中,展开“移动和转换”选项,然后选择“数据流”。

    Screenshot of the data flow activity option within the activities section.

  3. 为数据流活动指定唯一名称。 在此示例中,活动命名为 cosmoscdcactivity

  4. 在“设置”选项卡中,选择之前在本指南中创建的名为 cosmoscdc 的数据流。 然后,根据数据量和工作负载所需的延迟选择计算大小。

    Screenshot of the configuration settings for both the data flow and compute size for the activity.

    提示

    对于大于 100 GB 的增量数据大小,建议使用核心数为 32(+16 个驱动程序核心)的自定义大小。

  5. 选择“添加触发器”。 计划此管道,使其按对工作负载有意义的节奏执行。 在此示例中,管道配置为每五分钟执行一次。

    Screenshot of the add trigger button for a new pipeline.

    Screenshot of a trigger configuration based on a schedule, starting in the year 2023, that runs every five minutes.

    注意

    执行变更数据捕获的最小定期时段为一分钟。

  6. 选择“验证”以确保未出现任何错误或遗漏。 然后,选择“发布”以发布管道。

  7. 使用 Azure Cosmos DB 分析存储变更数据捕获,观察作为数据流输出置于 Azure Blob 存储容器中的数据。

    Screnshot of the output files from the pipeline in the Azure Blob Storage container.

    注意

    初始群集启动时间最长可能需要三分钟。 若要在后续的变更数据捕获执行中避免群集启动时间,请配置数据流群集的生存时间值。 有关集成运行时和 TTL 的详细信息,请参阅 Azure 数据工厂中的集成运行时

并发作业

源选项中的批大小,或者接收器引入更改流的速度较慢的情况,可能会导致同时执行多个作业。 若要避免这种情况,请在“管道”设置中将“并发”选项设置为 1,以确保在当前执行完成之前不会触发新的执行。

后续步骤