将数据从 Azure Cosmos DB 引入到 Azure 数据资源管理器
Azure 数据资源管理器支持使用更改源从 Azure Cosmos DB for NoSql引入数据。 Cosmos DB 更改源数据连接是一个引入管道,用于侦听 Cosmos DB 更改源并将数据引入到数据资源管理器表中。 更改源侦听新文档和更新的文档,但不记录删除。 有关 Azure 数据资源管理器中数据引入的常规信息,请参阅 Azure 数据资源管理器数据引入概述。
每个数据连接都侦听特定的 Cosmos DB 容器,并将数据引入到指定的表中(多个连接可以在一个表中引入数据)。 该引入方法支持流式引入(如果已启用)和排队引入。
本文介绍如何设置 Cosmos DB 更改源数据连接,以便使用系统托管标识将数据引入到 Azure 数据资源管理器中。 在开始之前,请查看注意事项。
使用以下步骤设置连接器:
步骤 1:选择 Azure 数据资源管理器表并配置其表映射
步骤 2:创建 Cosmos DB 数据连接
步骤 3:测试数据连接
先决条件
在创建数据连接之前,请创建一个表,你将在其中存储引入的数据并应用与源 Cosmos DB 容器中的架构匹配的映射。 如果你的方案需要的不仅仅是简单的字段映射,则可以使用更新策略来转换和映射从更改源中引入的数据。
下面显示了 Cosmos DB 容器中某个项的示例架构:
{
"id": "17313a67-362b-494f-b948-e2a8e95e237e",
"name": "Cousteau",
"_rid": "pL0MAJ0Plo0CAAAAAAAAAA==",
"_self": "dbs/pL0MAA==/colls/pL0MAJ0Plo0=/docs/pL0MAJ0Plo0CAAAAAAAAAA==/",
"_etag": "\"000037fc-0000-0700-0000-626a44110000\"",
"_attachments": "attachments/",
"_ts": 1651131409
}
使用以下步骤创建表并应用表映射:
在 Azure 数据资源管理器 Web UI 的左侧导航菜单中选择“查询”,然后选择要在其中创建表的数据库。
运行以下命令来创建一个名为 TestTable 的表。
.create table TestTable(Id:string, Name:string, _ts:long, _timestamp:datetime)
运行以下命令来创建表映射。
该命令将 Cosmos DB JSON 文档中的自定义属性映射到 TestTable 表中的列,如下所示:
Cosmos DB 属性 |
表列 |
转换 |
id |
ID |
无 |
name |
名称 |
无 |
_ts |
_ts |
无 |
_ts |
_timestamp |
使用 DateTimeFromUnixSeconds 可将 _ts(UNIX 秒)转换为 _timestamp (datetime ) |
注意
建议使用以下时间戳列:
- _ts:使用此列将数据与 Cosmos DB 协调。
- _timestamp:使用此列在 Kusto 查询中运行高效的时间筛选器。 有关详细信息,请参阅查询最佳做法。
.create table TestTable ingestion json mapping "DocumentMapping"
```
[
{"column":"Id","path":"$.id"},
{"column":"Name","path":"$.name"},
{"column":"_ts","path":"$._ts"},
{"column":"_timestamp","path":"$._ts", "transform":"DateTimeFromUnixSeconds"}
]
```
如果你的方案需要的不仅仅是简单的字段映射,则可以使用更新策略来转换和映射从更改源中引入的数据。
更新策略是在数据引入到表时转换数据的一种方式。 它们以 Kusto 查询语言编写,在引入管道上运行。 它们可用于转换从 Cosmos DB 更改源中引入的数据,例如在以下场景中:
- 你的文档包含数组,如果使用
mv-expand
运算符将它们转换为多行,则这些数组将更易于查询。
- 你想要筛选出文档。 例如,可以使用
where
运算符按类型筛选出文档。
- 你的复杂逻辑无法在表映射中表示。
有关如何创建和管理更新策略的信息,请参阅更新策略概述。
步骤 2:创建 Cosmos DB 数据连接
可使用以下方法创建数据连接器:
在Azure 门户中,转到群集概述页,然后选择“入门”选项卡。
在“数据引入”磁贴上,选择“创建数据连接”>“Cosmos DB”。
在 Cosmos DB 的“创建数据连接”窗格中,使用下表中的信息填写表单:
字段 |
说明 |
数据库名称 |
选择要将数据引入到的 Azure 数据资源管理器数据库。 |
数据连接名称 |
指定数据连接的名称。 |
订阅 |
选择包含 Cosmos DB NoSQL 帐户的订阅。 |
Cosmos DB 帐户 |
选择要从中引入数据的 Cosmos DB 帐户。 |
SQL 数据库 |
选择要从中引入数据的 Cosmos DB 数据库。 |
SQL 容器 |
选择要从中引入数据的 Cosmos DB 容器。 |
表名称 |
指定要将数据引入到的 Azure 数据资源管理器表名称。 |
映射名称 |
(可选)指定要用于数据连接的映射名称。 |
(可选)在“高级设置”部分下执行以下操作:
指定“事件检索开始日期”。 这是连接器开始引入数据的时间。 如果未指定时间,连接器将从你创建数据连接的时间开始引入数据。 建议的日期格式是 ISO 8601 UTC 标准,按以下方式指定:yyyy-MM-ddTHH:mm:ss.fffffffZ
。
选择“用户分配”,然后选择标识。 默认情况下,连接使用系统分配的托管标识。 如有必要,可以使用用户分配的标识。
选择“创建”以创建数据连接。
使用以下示例 ARM 模板作为创建自己的数据连接模板的基础,然后在 Azure 门户中部署它。
若要配置 Cosmos DB 连接,请执行以下操作:
配置系统托管标识以用于 Cosmos DB 连接身份验证。
- 在 Azure 数据资源管理器 Web UI 的左侧导航菜单中选择“查询”,然后选择用于数据连接的群集或数据库。
授予数据连接访问 Cosmos DB 帐户的权限。 向数据连接提供对 Cosmos DB 的访问权限将使其能够访问并检索数据库中的数据。 你将需要群集的主体 ID,该 ID 可在 Azure 门户中找到。 有关详细信息,请参阅为群集配置托管标识。
使用以下选项之一授予对 Cosmos DB 帐户的访问权限:
使用 Azure CLI 授予访问权限:运行以下 CLI 命令(请使用下表中的信息将占位符替换为适当的值):
az cosmosdb sql role assignment create --account-name <CosmosDbAccountName> --resource-group <CosmosDbResourceGroup> --role-definition-id 00000000-0000-0000-0000-000000000001 --principal-id <ClusterPrincipalId> --scope "/"
az role assignment create --role fbdf93bf-df7d-467e-a4d2-9458aa1360c8 --assignee <ClusterPrincipalId> --scope <CosmosDBAccountResourceId>
占位符 |
说明 |
<CosmosDBAccountName> |
Cosmos DB 帐户的名称。 |
<CosmosDBResourceGroup> |
包含 Cosmos DB 帐户的资源组的名称。 |
<CosmosDBAccountResourceId> |
Cosmos DB 帐户的 Azure 资源 ID(以 subscriptions/ 开头)。 |
<ClusterPrincipalId> |
分配给群集的托管标识的主体 ID。 可在 Azure 门户中找到群集的主体 ID。 有关详细信息,请参阅为群集配置托管标识。 |
使用 ARM 模板授予访问权限:在 Cosmos DB 帐户资源组中部署以下模板:
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"clusterPrincipalId": {
"type": "string",
"metadata": { "description": "The principle ID of your cluster." }
},
"cosmosDbAccount": {
"type": "string",
"metadata": { "description": "The name of your Cosmos DB account." }
},
"cosmosDbAccountResourceId": {
"type": "string",
"metadata": { "description": "The resource ID of your Cosmos DB account." }
}
},
"variables": {
"cosmosDataReader": "00000000-0000-0000-0000-000000000001",
"dataRoleDefinitionId": "[format('/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.DocumentDB/databaseAccounts/{2}/sqlRoleDefinitions/{3}', subscription().subscriptionId, resourceGroup().name, parameters('cosmosDbAccount'), variables('cosmosDataReader'))]",
"roleAssignmentId": "[guid(parameters('cosmosDbAccountResourceId'), parameters('clusterPrincipalId'))]",
"rbacRoleDefinitionId": "[format('/subscriptions/{0}/providers/Microsoft.Authorization/roleDefinitions/{1}', subscription().subscriptionId, 'fbdf93bf-df7d-467e-a4d2-9458aa1360c8')]"
},
"resources": [
{
"type": "Microsoft.DocumentDB/databaseAccounts/sqlRoleAssignments",
"apiVersion": "2022-08-15",
"name": "[concat(parameters('cosmosDbAccount'), '/', guid(parameters('clusterPrincipalId'), parameters('cosmosDbAccount')))]",
"properties": {
"principalId": "[parameters('clusterPrincipalId')]",
"roleDefinitionId": "[variables('dataRoleDefinitionId')]",
"scope": "[resourceId('Microsoft.DocumentDB/databaseAccounts', parameters('cosmosDbAccount'))]"
}
},
{
"type": "Microsoft.Authorization/roleAssignments",
"apiVersion": "2022-04-01",
"name": "[variables('roleAssignmentId')]",
"scope": "[format('Microsoft.DocumentDb/databaseAccounts/{0}', parameters('cosmosDbAccount'))]",
"properties": {
"description": "Giving RBAC reader on Cosmos DB",
"principalId": "[parameters('clusterPrincipalId')]",
"principalType": "ServicePrincipal",
"roleDefinitionId": "[variables('rbacRoleDefinitionId')]"
}
}
]
}
部署以下 ARM 模板以创建 Cosmos DB 数据连接。 将占位符替换为适当的值。
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"kustoClusterName": {
"type": "string",
"metadata": { "description": "Kusto Cluster name" }
},
"kustoDbName": {
"type": "string",
"metadata": { "description": "Kusto Database name" }
},
"kustoConnectionName": {
"type": "string",
"metadata": { "description": "Kusto Database connection name" }
},
"kustoLocation": {
"type": "string",
"metadata": { "description": "Location (Azure Region) of the Kusto cluster" }
},
"kustoTable": {
"type": "string",
"metadata": { "description": "Kusto Table name where to ingest data" }
},
"kustoMappingRuleName": {
"type": "string",
"defaultValue": "",
"metadata": { "description": "Mapping name of the Kusto Table (if omitted, default mapping is applied)" }
},
"managedIdentityResourceId": {
"type": "string",
"metadata": { "description": "ARM resource ID of the managed identity (cluster resource ID for system or user identity)" }
},
"cosmosDbAccountResourceId": {
"type": "string",
"metadata": { "description": "ARM resource ID of Cosoms DB account" }
},
"cosmosDbDatabase": {
"type": "string",
"metadata": { "description": "Cosmos DB Database name" }
},
"cosmosDbContainer": {
"type": "string",
"metadata": { "description": "Cosmos DB container name" }
},
"retrievalStartDate": {
"type": "string",
"defaultValue": "",
"metadata": { "description": "Date-time at which to start the data retrieval; default: 'now' if not provided. Recommended format: yyyy-MM-ddTHH:mm:ss.fffffffZ" }
}
},
"variables": { },
"resources": [{
"type": "Microsoft.Kusto/Clusters/Databases/DataConnections",
"apiVersion": "2022-11-11",
"name": "[concat(parameters('kustoClusterName'), '/', parameters('kustoDbName'), '/', parameters('kustoConnectionName'))]",
"location": "[parameters('kustoLocation')]",
"kind": "CosmosDb",
"properties": {
"tableName": "[parameters('kustoTable')]",
"mappingRuleName": "[parameters('kustoMappingRuleName')]",
"managedIdentityResourceId": "[parameters('managedIdentityResourceId')]",
"cosmosDbAccountResourceId": "[parameters('cosmosDbAccountResourceId')]",
"cosmosDbDatabase": "[parameters('cosmosDbDatabase')]",
"cosmosDbContainer": "[parameters('cosmosDbContainer')]",
"retrievalStartDate": "[parameters('retrievalStartDate')]"
}
}]
}
步骤 3:测试数据连接
在 Cosmos DB 容器中,插入以下文档:
{
"name":"Cousteau"
}
在 Azure 数据资源管理器 Web UI 中,运行以下查询:
TestTable
结果集应如下图所示:
注意
Azure 数据资源管理器具有用于排队数据引入的聚合(批处理)策略,旨在优化引入过程。 默认批处理策略配置为在批满足以下条件之一时封装该批:最大延迟时间为 5 分钟、总大小为 1 GB 或 1000 个 blob。 因此,你可能会遇到延迟。 有关详细信息,请参阅批处理策略。 若要降低延迟,请将表配置为支持流式处理。 请参阅流式处理策略。
注意事项
以下注意事项适用于 Cosmos DB 更改源:
更改源不会公开“删除”事件。
Cosmos DB 更改源仅包括新文档和更新的文档。 如果需要了解有关已删除的文档的信息,可以配置源以使用软标记将 Cosmos DB 文档标记为已删除。 这会添加一个属性来更新用于指示文档是否已被删除的事件。 然后,可以在查询中使用 where
运算符来筛选出它们。
例如,如果将 deleted 属性映射到名为“IsDeleted”的表列,则可以使用以下查询筛选出已删除的文档:
TestTable
| where not(IsDeleted)
更改源仅公开文档的最新更新。
若要了解第二个注意事项的影响,请查看以下方案:
Cosmos DB 容器包含文档 A 和 B。下表显示了对名为 foo 的属性的更改:
文档 ID |
属性 foo |
事件 |
文档时间戳 (_ts) |
A |
红色 |
创建 |
10 |
B |
蓝色 |
创建 |
20 |
A |
橙色 |
更新 |
30 |
A |
粉色 |
更新 |
40 |
B |
紫罗兰色 |
更新 |
50 |
A |
胭脂红色 |
更新 |
50 |
B |
霓虹蓝色 |
更新 |
70 |
数据连接器定期轮询(通常每隔几秒钟轮询一次)更改源 API。 每个轮询都包含两次调用之间容器中发生的更改(但每个文档只有最新版的更改)。
为了说明此问题,请考虑带有时间戳 15、35、55 和 75 的一系列 API 调用,如下表所示:
API 调用时间戳 |
文档 ID |
属性 foo |
文档时间戳 (_ts) |
15 |
A |
红色 |
10 |
35 |
B |
蓝色 |
20 |
35 |
A |
橙色 |
30 |
55 |
B |
紫罗兰色 |
50 |
55 |
A |
胭脂红色 |
60 |
75 |
B |
霓虹蓝色 |
70 |
将 API 结果与 Cosmos DB 文档中所做的一系列更改进行比较时,你会发现它们不匹配。 文档 A 的更新事件(已在更改表中的时间戳 40 所在一行突出显示)未显示在 API 调用结果中。
为了了解未显示该事件的原因,我们将检查 API 调用在时间戳 35 和 55 之间对文档 A 所做的更改。 在这两次调用之间,文档 A 更改了两次,如下所示:
文档 ID |
属性 foo |
事件 |
文档时间戳 (_ts) |
A |
粉色 |
更新 |
40 |
A |
胭脂红色 |
更新 |
50 |
在时间戳 55 时进行 API 调用的时候,更改源 API 返回文档的最新版本。 在本例中,最新版本的文档 A 是在时间戳 50 时的更新,即属性 foo 从“粉红色”到“胭脂红色”的更新。
由于这种情况,数据连接器可能会错过一些中间文档更改。 例如,如果数据连接服务关闭几分钟,或者文档更改频率高于 API 轮询频率,则可能会错过某些事件。 但是,每个文档的最新状态均会被捕获。
不支持删除并重新创建 Cosmos DB 容器
Azure 数据资源管理器跟踪更改源的方式是检查它在源中的“位置”。 这是通过在容器的每个物理分区上使用延续令牌来完成的。 删除/重新创建容器时,这些延续令牌无效且不会重置:你必须删除并重新创建数据连接。
估算成本
使用 Cosmos DB 数据连接对 Cosmos DB 容器的请求单位 (RU) 使用量影响有多大?
连接器在容器的每个物理分区上调用 Cosmos DB 更改源 API,每秒最多调用一次。 以下成本与这些调用相关:
成本 |
说明 |
固定成本 |
固定成本大约为每秒每个物理分区 2 RU。 |
可变成本 |
可变成本大约为用于写入文档的 RU 的 2%,不过,具体比例因场景而异。 例如,如果将 100 个文档写入 Cosmos DB 容器,则写入这些文档的成本为 1,000 RU。 使用连接器读取这些文档的相应成本大约为写入文档的成本的 2%,即大约为 20 RU。 |
相关内容