将数据从 Azure Cosmos DB 引入到 Azure 数据资源管理器

Azure 数据资源管理器支持使用更改源Azure Cosmos DB for NoSql引入数据。 Cosmos DB 更改源数据连接是一个引入管道,用于侦听 Cosmos DB 更改源并将数据引入到数据资源管理器表中。 更改源侦听新文档和更新的文档,但不记录删除。 有关 Azure 数据资源管理器中数据引入的常规信息,请参阅 Azure 数据资源管理器数据引入概述

每个数据连接都侦听特定的 Cosmos DB 容器,并将数据引入到指定的表中(多个连接可以在一个表中引入数据)。 该引入方法支持流式引入(如果已启用)和排队引入。

本文介绍如何设置 Cosmos DB 更改源数据连接,以便使用系统托管标识将数据引入到 Azure 数据资源管理器中。 在开始之前,请查看注意事项

使用以下步骤设置连接器:

步骤 1:选择 Azure 数据资源管理器表并配置其表映射

步骤 2:创建 Cosmos DB 数据连接

步骤 3:测试数据连接

先决条件

步骤 1:选择 Azure 数据资源管理器表并配置其表映射

在创建数据连接之前,请创建一个表,你将在其中存储引入的数据并应用与源 Cosmos DB 容器中的架构匹配的映射。 如果你的方案需要的不仅仅是简单的字段映射,则可以使用更新策略来转换和映射从更改源中引入的数据

下面显示了 Cosmos DB 容器中某个项的示例架构:

{
    "id": "17313a67-362b-494f-b948-e2a8e95e237e",
    "name": "Cousteau",
    "_rid": "pL0MAJ0Plo0CAAAAAAAAAA==",
    "_self": "dbs/pL0MAA==/colls/pL0MAJ0Plo0=/docs/pL0MAJ0Plo0CAAAAAAAAAA==/",
    "_etag": "\"000037fc-0000-0700-0000-626a44110000\"",
    "_attachments": "attachments/",
    "_ts": 1651131409
}

使用以下步骤创建表并应用表映射:

  1. 在 Azure 数据资源管理器 Web UI 的左侧导航菜单中选择“查询”,然后选择要在其中创建表的数据库。

  2. 运行以下命令来创建一个名为 TestTable 的表。

    .create table TestTable(Id:string, Name:string, _ts:long, _timestamp:datetime)
    
  3. 运行以下命令来创建表映射。

    该命令将 Cosmos DB JSON 文档中的自定义属性映射到 TestTable 表中的列,如下所示:

    Cosmos DB 属性 表列 转换
    id ID
    name 名称
    _ts _ts
    _ts _timestamp 使用 DateTimeFromUnixSeconds 可将 _ts(UNIX 秒转换为 _timestamp (datetime)

    注意

    建议使用以下时间戳列:

    • _ts:使用此列将数据与 Cosmos DB 协调。
    • _timestamp:使用此列在 Kusto 查询中运行高效的时间筛选器。 有关详细信息,请参阅查询最佳做法
    .create table TestTable ingestion json mapping "DocumentMapping"
    ```
    [
        {"column":"Id","path":"$.id"},
        {"column":"Name","path":"$.name"},
        {"column":"_ts","path":"$._ts"},
        {"column":"_timestamp","path":"$._ts", "transform":"DateTimeFromUnixSeconds"}
    ]
    ```
    

使用更新策略转换和映射数据

如果你的方案需要的不仅仅是简单的字段映射,则可以使用更新策略来转换和映射从更改源中引入的数据。

更新策略是在数据引入到表时转换数据的一种方式。 它们以 Kusto 查询语言编写,在引入管道上运行。 它们可用于转换从 Cosmos DB 更改源中引入的数据,例如在以下场景中:

  • 你的文档包含数组,如果使用 mv-expand 运算符将它们转换为多行,则这些数组将更易于查询。
  • 你想要筛选出文档。 例如,可以使用 where 运算符按类型筛选出文档。
  • 你的复杂逻辑无法在表映射中表示。

有关如何创建和管理更新策略的信息,请参阅更新策略概述

步骤 2:创建 Cosmos DB 数据连接

可使用以下方法创建数据连接器:

  1. 在Azure 门户中,转到群集概述页,然后选择“入门”选项卡。

  2. 在“数据引入”磁贴上,选择“创建数据连接”>“Cosmos DB”。

    Screenshot of the Getting started tab, showing the Create Cosmos DB data connection option.

  3. 在 Cosmos DB 的“创建数据连接”窗格中,使用下表中的信息填写表单:

    Screenshot of the data connection pane, showing the form fields with values.

    字段 说明
    数据库名称 选择要将数据引入到的 Azure 数据资源管理器数据库。
    数据连接名称 指定数据连接的名称。
    订阅 选择包含 Cosmos DB NoSQL 帐户的订阅。
    Cosmos DB 帐户 选择要从中引入数据的 Cosmos DB 帐户。
    SQL 数据库 选择要从中引入数据的 Cosmos DB 数据库。
    SQL 容器 选择要从中引入数据的 Cosmos DB 容器。
    表名称 指定要将数据引入到的 Azure 数据资源管理器表名称
    映射名称 (可选)指定要用于数据连接的映射名称
  4. (可选)在“高级设置”部分下执行以下操作:

    1. 指定“事件检索开始日期”。 这是连接器开始引入数据的时间。 如果未指定时间,连接器将从你创建数据连接的时间开始引入数据。 建议的日期格式是 ISO 8601 UTC 标准,按以下方式指定:yyyy-MM-ddTHH:mm:ss.fffffffZ

    2. 选择“用户分配”,然后选择标识。 默认情况下,连接使用系统分配的托管标识。 如有必要,可以使用用户分配的标识。

      Screenshot of the data connection pane, showing the Advance settings.

  5. 选择“创建”以创建数据连接。

步骤 3:测试数据连接

  1. 在 Cosmos DB 容器中,插入以下文档:

    {
        "name":"Cousteau"
    }
    
  2. 在 Azure 数据资源管理器 Web UI 中,运行以下查询:

    TestTable
    

    结果集应如下图所示:

    Screenshot of the results pane, showing the ingested document.

注意

Azure 数据资源管理器具有用于排队数据引入的聚合(批处理)策略,旨在优化引入过程。 默认批处理策略配置为在批满足以下条件之一时封装该批:最大延迟时间为 5 分钟、总大小为 1 GB 或 1000 个 blob。 因此,你可能会遇到延迟。 有关详细信息,请参阅批处理策略。 若要降低延迟,请将表配置为支持流式处理。 请参阅流式处理策略

注意事项

以下注意事项适用于 Cosmos DB 更改源:

  • 更改源不会公开“删除”事件。

    Cosmos DB 更改源仅包括新文档和更新的文档。 如果需要了解有关已删除的文档的信息,可以配置源以使用软标记将 Cosmos DB 文档标记为已删除。 这会添加一个属性来更新用于指示文档是否已被删除的事件。 然后,可以在查询中使用 where 运算符来筛选出它们。

    例如,如果将 deleted 属性映射到名为“IsDeleted”的表列,则可以使用以下查询筛选出已删除的文档:

    TestTable
    | where not(IsDeleted)
    
  • 更改源仅公开文档的最新更新。

    若要了解第二个注意事项的影响,请查看以下方案:

    Cosmos DB 容器包含文档 A 和 B。下表显示了对名为 foo 的属性的更改:

    文档 ID 属性 foo 事件 文档时间戳 (_ts)
    A 红色 创建 10
    B 蓝色 创建 20
    A 橙色 更新 30
    A 粉色 更新 40
    B 紫罗兰色 更新 50
    A 胭脂红色 更新 50
    B 霓虹蓝色 更新 70

    数据连接器定期轮询(通常每隔几秒钟轮询一次)更改源 API。 每个轮询都包含两次调用之间容器中发生的更改(但每个文档只有最新版的更改)。

    为了说明此问题,请考虑带有时间戳 15、35、55 和 75 的一系列 API 调用,如下表所示:

    API 调用时间戳 文档 ID 属性 foo 文档时间戳 (_ts)
    15 A 红色 10
    35 B 蓝色 20
    35 A 橙色 30
    55 B 紫罗兰色 50
    55 A 胭脂红色 60
    75 B 霓虹蓝色 70

    将 API 结果与 Cosmos DB 文档中所做的一系列更改进行比较时,你会发现它们不匹配。 文档 A 的更新事件(已在更改表中的时间戳 40 所在一行突出显示)未显示在 API 调用结果中。

    为了了解未显示该事件的原因,我们将检查 API 调用在时间戳 35 和 55 之间对文档 A 所做的更改。 在这两次调用之间,文档 A 更改了两次,如下所示:

    文档 ID 属性 foo 事件 文档时间戳 (_ts)
    A 粉色 更新 40
    A 胭脂红色 更新 50

    在时间戳 55 时进行 API 调用的时候,更改源 API 返回文档的最新版本。 在本例中,最新版本的文档 A 是在时间戳 50 时的更新,即属性 foo 从“粉红色”到“胭脂红色”的更新。

    由于这种情况,数据连接器可能会错过一些中间文档更改。 例如,如果数据连接服务关闭几分钟,或者文档更改频率高于 API 轮询频率,则可能会错过某些事件。 但是,每个文档的最新状态均会被捕获。

  • 不支持删除并重新创建 Cosmos DB 容器

    Azure 数据资源管理器跟踪更改源的方式是检查它在源中的“位置”。 这是通过在容器的每个物理分区上使用延续令牌来完成的。 删除/重新创建容器时,这些延续令牌无效且不会重置:你必须删除并重新创建数据连接。

估算成本

使用 Cosmos DB 数据连接对 Cosmos DB 容器的请求单位 (RU) 使用量影响有多大?

连接器在容器的每个物理分区上调用 Cosmos DB 更改源 API,每秒最多调用一次。 以下成本与这些调用相关:

成本 说明
固定成本 固定成本大约为每秒每个物理分区 2 RU。
可变成本 可变成本大约为用于写入文档的 RU 的 2%,不过,具体比例因场景而异。 例如,如果将 100 个文档写入 Cosmos DB 容器,则写入这些文档的成本为 1,000 RU。 使用连接器读取这些文档的相应成本大约为写入文档的成本的 2%,即大约为 20 RU。