Azure Stream Analytics output to Azure Cosmos DB

Azure Stream Analytics can target Azure Cosmos DB for JSON output, enabling data archiving and low-latency queries on unstructured JSON data. This document covers some best practices for implementing this configuration.

If you're unfamiliar with Azure Cosmos DB, see the Azure Cosmos DB documentation to get started.

Note

At this time, Stream Analytics supports connections to Azure Cosmos DB only through the SQL API. Other Azure Cosmos DB APIs are not yet supported. If you point Stream Analytics to Azure Cosmos DB accounts created with other APIs, the data might not be stored correctly.

Basics of Azure Cosmos DB as an output target

The Azure Cosmos DB output in Stream Analytics enables writing your stream processing results as JSON output into your Azure Cosmos DB containers.

Stream Analytics doesn't create containers in your database. Instead, it requires you to create them up front. You can then control the billing costs of Azure Cosmos DB containers. You can also tune the performance, consistency, and capacity of your containers directly by using the Azure Cosmos DB APIs.

Note

You must add 0.0.0.0 to the list of allowed IPs in your Azure Cosmos DB firewall.

The following sections detail some of the container options for Azure Cosmos DB.

Tuning consistency, availability, and latency

To match your application requirements, Azure Cosmos DB allows you to fine-tune the database and containers and make trade-offs between consistency, availability, latency, and throughput.

Depending on the level of read consistency that your scenario needs, weighed against read and write latency, you can choose a consistency level on your database account. You can improve throughput by scaling up the Request Units (RUs) on the container.

Also, by default, Azure Cosmos DB enables synchronous indexing on each CRUD operation to your container. This is another useful option for controlling write/read performance in Azure Cosmos DB.
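Indexing behavior is governed by the container's indexing policy, which you manage directly through Azure Cosmos DB rather than through Stream Analytics. As a minimal sketch (property names follow the Cosmos DB indexing policy schema), a policy that keeps consistent, synchronous indexing while excluding one system property looks like this:

```json
{
  "indexingMode": "consistent",
  "automatic": true,
  "includedPaths": [ { "path": "/*" } ],
  "excludedPaths": [ { "path": "/\"_etag\"/?" } ]
}
```

Setting `indexingMode` to `none` trades query flexibility for faster writes.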

For more information, review the Change your database and query consistency levels article.

Upserts from Stream Analytics

Stream Analytics integration with Azure Cosmos DB allows you to insert or update records in your container based on a given Document ID column. This is also called an upsert.

Stream Analytics uses an optimistic upsert approach. Updates happen only when an insert fails with a document ID conflict.

With compatibility level 1.0, Stream Analytics performs this update as a PATCH operation, so it enables partial updates to the document. Stream Analytics adds new properties or replaces an existing property incrementally. However, changes in the values of array properties in your JSON document result in overwriting the entire array. That is, the array isn't merged.
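As an illustration of that merge behavior (the documents below are hypothetical), note how scalar properties are merged but the `readings` array is replaced wholesale rather than combined:

```json
{
  "existing": { "id": "1", "temperature": 70, "readings": [1, 2] },
  "incoming": { "id": "1", "humidity": 40, "readings": [3] },
  "result":   { "id": "1", "temperature": 70, "humidity": 40, "readings": [3] }
}
```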

With 1.2, upsert behavior is modified to insert or replace the document. The later section about compatibility level 1.2 further describes this behavior.

If the incoming JSON document has an existing ID field, that field is automatically used as the Document ID column in Azure Cosmos DB. Any subsequent writes are handled accordingly, leading to one of these situations:

  • Unique IDs lead to an insert.
  • Duplicate IDs with Document ID set to ID lead to an upsert.
  • Duplicate IDs with Document ID not set lead to an error after the first document.

If you want to save all documents, including the ones that have a duplicate ID, rename the ID field in your query (by using the AS keyword). Let Azure Cosmos DB create the ID field, or replace the ID with another column's value (by using the AS keyword or the Document ID setting).
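For example, this sketch renames the input's ID column so that every event is stored as a new document (EventId and EntryTime are hypothetical input columns; Azure Cosmos DB then generates the document id itself):

```sql
SELECT
    EventId AS OriginalEventId,  -- frees the "id" name; Azure Cosmos DB assigns a new document id
    TollBoothId,
    EntryTime
INTO CosmosDBOutput
FROM Input1
```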

Data partitioning in Azure Cosmos DB

Azure Cosmos DB automatically scales partitions based on your workload, so we recommend unlimited containers as the approach for partitioning your data. When Stream Analytics writes to unlimited containers, it uses as many parallel writers as the previous query step or input partitioning scheme allows.

Note

Azure Stream Analytics supports only unlimited containers with partition keys at the top level. For example, /region is supported. Nested partition keys (for example, /region/name) are not supported.

Depending on your choice of partition key, you might receive this warning:

CosmosDB Output contains multiple rows and just one row per partition key. If the output latency is higher than expected, consider choosing a partition key that contains at least several hundred records per partition key.

It's important to choose a partition key property that has a large number of distinct values and that lets you distribute your workload evenly across those values. As a natural artifact of partitioning, requests that involve the same partition key are limited by the maximum throughput of a single partition.

The storage size for documents that belong to the same partition key is limited to 20 GB. An ideal partition key is one that appears frequently as a filter in your queries and has sufficient cardinality to ensure that your solution is scalable.

A partition key is also the boundary for transactions in stored procedures and triggers for Azure Cosmos DB. Choose the partition key so that documents that occur together in transactions share the same partition key value. The article Partitioning in Azure Cosmos DB gives more details on choosing a partition key.

For fixed Azure Cosmos DB containers, Stream Analytics allows no way to scale up or out after they're full. They have an upper limit of 10 GB of storage and 10,000 RU/s of throughput. To migrate data from a fixed container to an unlimited container (for example, one with at least 1,000 RU/s and a partition key), use the data migration tool or the change feed library.

The ability to write to multiple fixed containers is being deprecated. We don't recommend it for scaling out your Stream Analytics job.

Improved throughput with compatibility level 1.2

With compatibility level 1.2, Stream Analytics supports native integration to bulk write into Azure Cosmos DB. This integration enables writing effectively to Azure Cosmos DB while maximizing throughput and efficiently handling throttled requests.

The improved writing mechanism is available under a new compatibility level because of a difference in upsert behavior. With levels before 1.2, the upsert behavior is to insert or merge the document. With 1.2, the behavior is modified to insert or replace the document.

With levels before 1.2, Stream Analytics uses a custom stored procedure to bulk upsert documents per partition key into Azure Cosmos DB, where a batch is written as a transaction. Even when a single record hits a transient error (throttling), the whole batch has to be retried. This makes scenarios with even reasonable throttling relatively slow.

The following example shows two identical Stream Analytics jobs reading from the same Azure Event Hubs input. Both jobs are fully partitioned with a passthrough query and write to identical Azure Cosmos DB containers. Metrics on the left are from the job configured with compatibility level 1.0; metrics on the right are from the job configured with 1.2. The Azure Cosmos DB container's partition key is a unique GUID that comes from the input event.

[Figure: Stream Analytics metrics comparison]

The incoming event rate in Event Hubs is two times higher than the Azure Cosmos DB containers (20,000 RUs) are configured to take in, so throttling is expected in Azure Cosmos DB. However, the job with 1.2 consistently writes at a higher throughput (output events per minute) and with a lower average SU% utilization. In your environment, this difference will depend on a few more factors, such as the choice of event format, input event/message size, partition keys, and query.

[Figure: Azure Cosmos DB metrics comparison]

With 1.2, Stream Analytics is more intelligent in utilizing 100 percent of the available throughput in Azure Cosmos DB, with very few resubmissions from throttling or rate limiting. This provides a better experience for other workloads, like queries running on the container at the same time. If you want to see how Stream Analytics scales out with Azure Cosmos DB as a sink for 1,000 to 10,000 messages per second, try this Azure sample project.

Throughput of the Azure Cosmos DB output is identical with 1.0 and 1.1. We strongly recommend that you use compatibility level 1.2 in Stream Analytics with Azure Cosmos DB.

Azure Cosmos DB settings for JSON output

Using Azure Cosmos DB as an output in Stream Analytics generates the following prompt for information.

[Figure: Information fields for an Azure Cosmos DB output stream]

  • Output alias: An alias to refer to this output in your Stream Analytics query.
  • Subscription: The Azure subscription.
  • Account ID: The name or endpoint URI of the Azure Cosmos DB account.
  • Account key: The shared access key for the Azure Cosmos DB account.
  • Database: The Azure Cosmos DB database name.
  • Container name: The container name, such as MyContainer. One container named MyContainer must exist.
  • Document ID: Optional. The column name in output events used as the unique key on which insert or update operations must be based. If you leave it empty, all events are inserted, with no update option.

After you configure the Azure Cosmos DB output, you can use it in the query as the target of an INTO statement. When you use the Azure Cosmos DB output that way, a partition key needs to be set explicitly.

The output record must contain a case-sensitive column named after the partition key in Azure Cosmos DB. To achieve greater parallelization, the statement might require a PARTITION BY clause that uses the same column.

Here's a sample query:

    SELECT TollBoothId, PartitionId
    INTO CosmosDBOutput
    FROM Input1 PARTITION BY PartitionId

Error handling and retries

If a transient failure, service unavailability, or throttling happens while Stream Analytics is sending events to Azure Cosmos DB, Stream Analytics retries indefinitely to finish the operation successfully. But it doesn't attempt retries for the following failures:

  • Unauthorized (HTTP error code 401)
  • NotFound (HTTP error code 404)
  • Forbidden (HTTP error code 403)
  • BadRequest (HTTP error code 400)