将数据从 Azure Cosmos DB 引入到 Azure 数据资源管理器

Azure 数据资源管理器支持通过使用更改源馈送Azure Cosmos DB for NoSql引入数据。 Cosmos DB 更改源数据连接是一条导入管道,用于侦听 Cosmos DB 的更改源,并将数据加载到数据资源管理器表中。 更改源侦听新文档和更新的文档,但不记录删除。 有关 Azure 数据资源管理器中数据引入的常规信息,请参阅 Azure 数据资源管理器数据引入概述

每个数据连接侦听特定的 Cosmos DB 容器,并将数据引入指定表(多个连接可以引入到单个表中)。 该引入方法支持流式引入(如果已启用)和排队引入。

使用 Cosmos DB 更改源数据连接的两个主要方案:

本文介绍如何设置 Cosmos DB 更改源数据连接,以便使用系统托管标识将数据引入 Azure 数据资源管理器。 在开始之前,请查看注意事项

使用以下步骤设置连接器:

步骤 1:选择 Azure 数据资源管理器表并配置其表映射

步骤 2:创建 Cosmos DB 数据连接

步骤 3:测试数据连接

先决条件

步骤 1:选择 Azure 数据资源管理器表并配置其表映射

在创建数据连接之前,请创建一个表,在其中存储引入的数据并应用与源 Cosmos DB 容器中的架构匹配的映射。 如果你的方案需要的不仅仅是简单的字段映射,则可以使用更新策略来转换和映射从更改源中引入的数据

下面显示了 Cosmos DB 容器中某个项的示例架构:

{
    "id": "17313a67-362b-494f-b948-e2a8e95e237e",
    "name": "Cousteau",
    "_rid": "pL0MAJ0Plo0CAAAAAAAAAA==",
    "_self": "dbs/pL0MAA==/colls/pL0MAJ0Plo0=/docs/pL0MAJ0Plo0CAAAAAAAAAA==/",
    "_etag": "\"000037fc-0000-0700-0000-626a44110000\"",
    "_attachments": "attachments/",
    "_ts": 1651131409
}

使用以下步骤创建表并应用表映射:

  1. 在 Azure 数据资源管理器 Web UI 的左侧导航菜单中选择“查询”,然后选择要在其中创建表的数据库。

  2. 运行以下命令来创建一个名为 TestTable 的表。

    .create table TestTable(Id:string, Name:string, _ts:long, _timestamp:datetime)
    
  3. 运行以下命令来创建表映射。

    该命令将 Cosmos DB JSON 文档中的自定义属性映射到 TestTable 表中的列,如下所示:

    Cosmos DB 属性 表列 转换
    id ID
    name 名称
    _ts _ts
    _ts _timestamp 使用 DateTimeFromUnixSeconds 可将 _tsUNIX 秒转换_timestamp (datetime)

    注意事项

    使用以下时间戳列:

    • _ts:使用此列将数据与 Cosmos DB 协调。
    • _timestamp:使用此列在 Kusto 查询中运行高效的时间筛选器。 有关详细信息,请参阅查询最佳做法
    .create table TestTable ingestion json mapping "DocumentMapping"
    ```
    [
        {"column":"Id","path":"$.id"},
        {"column":"Name","path":"$.name"},
        {"column":"_ts","path":"$._ts"},
        {"column":"_timestamp","path":"$._ts", "transform":"DateTimeFromUnixSeconds"}
    ]
    ```
    

使用更新策略转换和映射数据

如果您的方案需要的不仅仅是字段的简单映射,请使用更新策略来转换和映射从更改源引入的数据。

更新策略 在数据引入到表时转换数据。 使用 Kusto 查询语言编写它们,并在引入管道上运行它们。 使用它们将数据从 Cosmos DB 更改源数据流引入中转换,例如在以下方案中:

  • 您的文档中包含的数组可以通过使用 mv-expand 运算符转换为多行,从而更易于查询。
  • 你想要过滤掉文档。 例如,可以使用运算符按类型 where 筛选出文档。
  • 你的复杂逻辑无法在表映射中表示。

有关如何创建和管理更新策略的信息,请参阅更新策略概述

步骤 2:创建 Cosmos DB 数据连接

使用以下方法创建数据连接器:

  1. 在Azure 门户中,转到群集概述页,然后选择“入门”选项卡。

  2. 数据摄取磁贴上,选择创建数据连接>Cosmos DB

    “入门”选项卡的屏幕截图,其中显示了“创建 Cosmos DB 数据连接”选项。

  3. 在 Cosmos DB “创建数据连接 ”窗格中,用下表中的信息填写表单:

    “数据连接”窗格的屏幕截图,其中显示了包含值的表单字段。

    字段 说明
    数据库名称 选择要将数据引入到的 Azure 数据资源管理器数据库。
    数据连接名称 指定数据连接的名称。
    订阅 选择包含 Cosmos DB NoSQL 帐户的订阅。
    Cosmos DB 帐户 选择要从中引入数据的 Cosmos DB 帐户。
    SQL 数据库 选择要从中引入数据的 Cosmos DB 数据库。
    SQL 容器 选择要从中引入数据的 Cosmos DB 容器。
    表名称 指定要将数据引入到的 Azure 数据资源管理器表名称
    映射名称 (可选)指定要用于数据连接的映射名称
  4. (可选)在 “高级设置” 部分下,输入以下信息:

    1. 指定“事件检索开始日期”。 此值是连接器开始引入数据的时间。 如果未指定时间,连接器将从创建数据连接时开始引入数据。 建议的日期格式是 ISO 8601 UTC 标准,按以下方式指定:yyyy-MM-ddTHH:mm:ss.fffffffZ

    2. 选择用户分配,然后选择身份。 默认情况下,连接使用 系统分配的 托管标识。 如有必要,可以使用用户分配的标识。

      “数据连接”窗格的屏幕截图,其中显示了“高级设置”。

  5. 选择“ 创建 ”以创建数据连接。

步骤 3:测试数据连接

  1. 在 Cosmos DB 容器中,插入以下文档:

    {
        "name":"Cousteau"
    }
    
  2. 在 Azure 数据资源管理器 Web UI 中,运行以下查询:

    TestTable
    

    结果集应如下图所示:

    结果窗格的屏幕截图,其中显示了引入的文档。

注意事项

Azure 数据资源管理器使用聚合(批处理)策略进行排队的数据引入,以优化引入过程。 当批满足以下条件之一时,默认批处理策略会密封一个批:最长延迟时间为 5 分钟、总大小为 1 GB 或 1,000 个 blob。 因此,可能会遇到延迟。 有关详细信息,请参阅批处理策略。 若要降低延迟,请将表配置为支持流式处理。 请参阅流媒体策略

注意事项

以下注意事项适用于 Cosmos DB 更改源:

  • 变更日志不会公开“删除”事件。

    Cosmos DB 更改源仅包括新文档和更新的文档。 如果您需要了解有关已删除文档的信息,您可以配置数据流为使用软标记将 Cosmos DB 文档标记为已删除。 添加了一个属性到更新事件中,以指示文档是否被删除。 然后,可以在查询中使用 where 运算符来筛选出它们。

    例如,如果将 deleted 属性映射到名为“IsDeleted”的表列,则可以使用以下查询筛选出已删除的文档:

    TestTable
    | where not(IsDeleted)
    
  • 更改提要只展示文档的最新更新。

    若要了解第二个注意事项的影响,请查看以下方案:

    Cosmos DB 容器包含文档 A 和 B。下表显示了对名为 foo 的属性的更改:

    文档编号 属性 foo 事件 文档时间戳记 (_ts)
    A 红色 创建 10
    B 蓝色 创建 20
    A 橙色 更新 30
    A 粉色 更新 40
    B 紫罗兰色 更新 50
    A 胭脂红色 更新 50
    B 霓虹蓝色 更新 70

    数据连接器定期轮询更改源 API,通常每隔几秒钟轮询一次。 每个轮询都包含两次调用之间容器中发生的更改(但每个文档只有最新版的更改)。

    为了说明此问题,请考虑带有时间戳 15、35、55 和 75 的一系列 API 调用,如下表所示:

    API 调用时间戳 文档编号 属性 foo 文档时间戳记 (_ts)
    15 A 红色 10
    35 B 蓝色 20
    35 A 橙色 30
    55 B 紫罗兰色 50
    55 A 胭脂红色 60
    75 B 霓虹蓝色 70

    将 API 结果与 Cosmos DB 文档中所做的更改列表进行比较,你会注意到它们不匹配。 文档A的更新事件(在更改表中时间戳为40的一行进行了突出显示)未显示在API调用结果中。

    若要了解事件为何未出现,请查阅时间戳 35 到 55 之间的 API 调用对文档 A 的更改。 在这两次调用之间,文档 A 更改了两次,如下所示:

    文档编号 属性 foo 事件 文档时间戳记 (_ts)
    A 粉色 更新 40
    A 胭脂红色 更新 50

    在时间戳 55 时进行 API 调用的时候,更改源 API 返回文档的最新版本。 在此情况下,文档A的最新版本是在时间戳50时的更新,即将属性foo粉红更新为胭脂红

    由于这种情况,数据连接器可能会错过一些中间文档更改。 例如,如果数据连接服务关闭几分钟,或者文档更改的频率高于 API 轮询频率,则数据连接器可能会错过一些事件。 但是,每个文档的最新状态均会被捕获。

  • 不支持删除和重新创建 Cosmos DB 容器。

    Azure 数据资源管理器跟踪更改源的方式是检查它在源中的“位置”。 此过程在容器的每个物理分区上使用延续令牌。 删除并重新创建容器时,延续令牌将变为无效且不会重置。 在这种情况下,必须删除并重新创建数据连接。

估算成本

使用 Cosmos DB 数据连接对 Cosmos DB 容器 的请求单位(RU) 使用情况有何影响?

连接器在容器的每个物理分区上调用 Cosmos DB 更改源 API,每秒最多调用一次。 以下成本与这些调用相关:

成本 说明
固定成本 固定成本大约是每个物理分区每秒消耗两个 RU。
可变成本 变量成本大约占用来编写文档的 RU 的 2%,但这一数值可能会根据具体情况有所不同。 例如,如果将 100 个文档写入 Cosmos DB 容器,则写入这些文档的成本为 1,000 RU。 使用连接器读取文档的相应成本约为 2% 写入文档的成本约为 20 RU。