使用 Azure Cosmos DB 中的更改源支持

Azure Cosmos DB 是快速灵活的全球复制数据库服务,用于存储大量事务与操作数据,读取和写入时的延迟为个位数的毫秒且可预测。 它非常适合用于 IoT、游戏、零售和操作日志记录应用程序。 这些应用程序中的一种常见设计模式是跟踪对 Azure Cosmos DB 数据所做的更改、更新具体化视图、执行实时分析、将数据存档到冷存储,以及在发生特定事件时根据这些更改触发通知。 使用 Azure Cosmos DB 中的更改源支持,可以针对其中的每种模式构建高效、可缩放的解决方案。

借助更改源支持,Azure Cosmos DB 在 Azure Cosmos DB 集合中按文档修改顺序提供有序的文档列表。 此源可用于侦听对集合中数据所做的修改,以及执行如下操作:

  • 插入或修改文档时触发 API 调用
  • 针对更新执行实时(流)处理
  • 将数据与缓存、搜索引擎或数据仓库同步

Azure Cosmos DB 中发生的更改将持久保存,并能够以异步方式进行处理,以及分发到一个或多个使用者供并行处理。 让我们了解更改源的 API,以及如何使用它们来构建可缩放的实时应用程序。 本文介绍了如何通过 Azure Cosmos DB DocumentDB API 处理空间数据。

使用 Azure Cosmos DB 更改源促成实时分析和事件驱动的计算方案

用例和方案

使用更改源可对具有大量写入操作的大型数据集进行有效处理,这样就不需要查询整个数据集来识别发生了哪些更改。 例如,可以有效地执行以下任务:

  • 使用 Azure Cosmos DB 中存储的数据更新缓存、搜索索引或数据仓库。
  • 实现应用程序级别的数据分层和存档,即,将“热数据”存储在 Azure Cosmos DB 中,将“冷数据”搁置在 Azure Blob 存储中。
  • 使用 Apache Hadoop 实现数据批量分析。
  • 使用 Azure Cosmos DB 在 Azure 上实现 lambda 管道。 Azure Cosmos DB 提供了一种可缩放的数据库解决方案,该解决方案可处理引入和查询,实现 TCO 较低的 lambda 体系结构。
  • 在不造成任何停机的情况下迁移到使用不同分区方案的另一个 Azure Cosmos DB 帐户。

使用 Azure Cosmos DB 构建用于引入和查询的 Lambda 管道:

用于引入和查询的基于 Azure Cosmos DB 的 lambda 管道

可以使用 Azure Cosmos DB 接收和存储来自设备、传感器、基础结构和应用程序的事件数据,然后使用 Apache StormApache Spark 实时处理这些事件。

在 Web 应用和移动应用中,可以跟踪各种事件(例如,对客户配置文件、首选项或位置的更改),以触发特定的操作,例如,使用应用服务向客户的设备发送推送通知。 例如,若要使用 Azure Cosmos DB 来构建游戏,可以使用更改源,根据已完成的游戏的分数实时更新排行榜。

更改源在 Azure Cosmos DB 中的工作原理

Azure Cosmos DB 能够以增量方式读取对 Azure Cosmos DB 集合所做的更新。 此更改源具有以下属性:

  • 更改将在 Azure Cosmos DB 中持久保存,并能够以异步方式进行处理。
  • 对集合中的文档所做的更改立即在更改源中出现。
  • 每次对文档所做的更改只会在更改源中出现一次。 更改日志中仅包含最近对给定文档所做的更改, 而不包含中途的更改。
  • 更改源按照每个分区键值中的修改顺序排序。 无法保证各分区键值中的顺序一致。
  • 更改可从任意时间点同步,也就是说,发生更改的数据没有固定的保留期。
  • 更改以分区键范围区块提供。 多个使用者/服务器可以使用此功能并行处理大型集合中发生的更改。
  • 应用程序可以针对同一个集合同时请求多个更改源。

Azure Cosmos DB 的更改源默认已针对所有帐户启用,不会在帐户中产生任何额外的费用。 可以使用写入区域或任何读取区域中的预配吞吐量从更改源中读取数据,就像在 Azure Cosmos DB 中执行其他任何操作一样。 更改源包括针对集合中的文档所做的插入和更新操作。 可以通过在文档中的删除位置设置“软删除”标志来捕获删除操作。 或者,可以通过 TTL 功能为文档设置有限的到期期限(例如 24 小时),然后使用该属性的值捕获删除操作。 使用此解决方案时,处理更改的时间间隔必须比 TTL 过期期限要短。 更改源适用于文档集合中的每个分区键范围,因此,可以分散到一个或多个使用者供并行处理。

Azure Cosmos DB 更改源的分布式处理

以下部分介绍如何使用 Azure Cosmos DB REST API 和 SDK 访问更改源。 对于 .NET 应用程序,建议使用更改源处理器库处理来自更改源的事件。

使用 REST API 和 SDK

Azure Cosmos DB 为存储和吞吐量提供了称为集合的弹性容器。 集合中的数据已使用分区键进行逻辑分组,以提高可伸缩性与性能。 Azure Cosmos DB 提供了各种 API 来访问这些数据,包括按 ID(读取/获取)、查询和读取源(扫描)进行查找。 可以通过在 DocumentDB ReadDocumentFeed API 中填充两个新请求标头来获取更改源,然后跨多个分区键范围并行处理更改源。

ReadDocumentFeed API

让我们简单了解一下 ReadDocumentFeed 的工作原理。 Azure Cosmos DB 支持通过 ReadDocumentFeed API 读取集合中文档的源。 例如,以下请求返回 serverlogs 集合中的文档页面。

GET https://mydocumentdb.documents.azure.cn/dbs/smalldb/colls/serverlogs HTTP/1.1
x-ms-date: Tue, 22 Nov 2016 17:05:14 GMT
authorization: type%3dmaster%26ver%3d1.0%26sig%3dgo7JEogZDn6ritWhwc5hX%2fNTV4wwM1u9V2Is1H4%2bDRg%3d
Cache-Control: no-cache
x-ms-consistency-level: Strong
User-Agent: Microsoft.Azure.Documents.Client/1.10.27.5
x-ms-version: 2016-07-11
Accept: application/json
Host: mydocumentdb.documents.azure.cn

可以使用 x-ms-max-item-count 限制结果;可以通过使用前一响应中返回的 x-ms-continuation 标头重新提交请求来恢复读取。 在单个客户端中执行时,ReadDocumentFeed 以串行方式循环访问各分区的结果。

串行读取文档源

用户还可以使用某个受支持的 Azure Cosmos DB SDK 检索文档源。 例如,下面的代码片段演示如何在 .NET 中执行 ReadDocumentFeed。

FeedResponse<dynamic> feedResponse = null;
do
{
    feedResponse = await client.ReadDocumentFeedAsync(collection, new FeedOptions { MaxItemCount = -1 });
}
while (feedResponse.ResponseContinuation != null);

ReadDocumentFeed 的分布式执行

对于包含 TB 量级数据的集合,或者在引入大量更新的情况下,从一台客户端计算机以串行方式执行源读取可能不可行。 为了支持这些大数据方案,Azure Cosmos DB 提供了相应的 API,以透明方式在多个客户端读取者/使用者之间分布 ReadDocumentFeed 调用。

分布式读取文档源

为了针对增量更改提供可缩放的处理,Azure Cosmos DB 根据分区键的范围为更改源 API 提供扩展模型支持。

  • 执行 ReadPartitionKeyRanges 调用可以获取集合的分区键范围列表。
  • 对于每个分区键范围,可以执行 ReadDocumentFeed 来读取具有该范围内的分区键的文档。

检索集合的分区键范围

可以通过请求集合中的 pkranges 资源来检索分区键范围。 例如,以下请求检索 serverlogs 集合的分区键范围列表:

GET https://querydemo.documents.azure.cn/dbs/bigdb/colls/serverlogs/pkranges HTTP/1.1
x-ms-date: Tue, 15 Nov 2016 07:26:51 GMT
authorization: type%3dmaster%26ver%3d1.0%26sig%3dEConYmRgDExu6q%2bZ8GjfUGOH0AcOx%2behkancw3LsGQ8%3d
x-ms-consistency-level: Session
x-ms-version: 2016-07-11
Accept: application/json
Host: querydemo.documents.azure.cn

此请求返回以下响应,其中包含有关分区键范围的元数据:

HTTP/1.1 200 Ok
Content-Type: application/json
x-ms-item-count: 25
x-ms-schemaversion: 1.1
Date: Tue, 15 Nov 2016 07:26:51 GMT

{
   "_rid":"qYcAAPEvJBQ=",
   "PartitionKeyRanges":[
      {
         "_rid":"qYcAAPEvJBQCAAAAAAAAUA==",
         "id":"0",
         "_etag":"\"00002800-0000-0000-0000-580ac4ea0000\"",
         "minInclusive":"",
         "maxExclusive":"05C1CFFFFFFFF8",
         "_self":"dbs\/qYcAAA==\/colls\/qYcAAPEvJBQ=\/pkranges\/qYcAAPEvJBQCAAAAAAAAUA==\/",
         "_ts":1477100776
      },
      ...
   ],
   "_count": 25
}

分区键范围属性:每个分区键范围包括下表中的元数据属性:


标头名称 说明
id

分区键范围的 ID。 此 ID 在每个集合中是固定且唯一的。

必须在以下调用中使用此属性才能按分区键范围读取更改。

maxExclusive 分区键范围的最大分区键哈希值。 供内部使用。
minInclusive 分区键范围的最小分区键哈希值。 供内部使用。

可以使用某个受支持的 Azure Cosmos DB SDK 完成此操作。 例如,以下代码片段演示如何在 .NET 中检索分区键范围。

string pkRangesResponseContinuation = null;
List<PartitionKeyRange> partitionKeyRanges = new List<PartitionKeyRange>();

do
{
    FeedResponse<PartitionKeyRange> pkRangesResponse = await client.ReadPartitionKeyRangeFeedAsync(
        collectionUri, 
        new FeedOptions { RequestContinuation = pkRangesResponseContinuation });

    partitionKeyRanges.AddRange(pkRangesResponse);
    pkRangesResponseContinuation = pkRangesResponse.ResponseContinuation;
}
while (pkRangesResponseContinuation != null);

Azure Cosmos DB 支持通过设置可选的 x-ms-documentdb-partitionkeyrangeid 标头按分区键范围检索文档。

执行增量 ReadDocumentFeed

ReadDocumentFeed 支持使用以下方案/任务对 Azure Cosmos DB 集合中的更改进行增量处理:

  • 读取自始至终(从创建集合时开始)对文档所做的全部更改。
  • 读取从当前时间开始对文档所做的全部更改。
  • 通过集合的逻辑版本 (ETag) 读取对文档所做的全部更改。 可以通过增量读取源请求根据返回的 ETag 设置使用者检查点。

这些更改包括文档插入和更新。 若要捕获删除操作,必须在文档中使用“软删除”属性,或使用内置的 TTL 属性在更改源中发出待删除信号。

下表列出了 ReadDocumentFeed 操作的请求和响应标头。

增量 ReadDocumentFeed 的请求标头

标头名称 说明
A-IM 必须设置为“Incremental feed”,否则省略
If-None-Match

无标头:返回从一开始(创建集合时)所做的全部更改

"*":返回最近对集合中数据所做的全部更改

<etag>:如果设置为集合 ETag,则返回从该逻辑时间戳开始所做的全部更改

x-ms-documentdb-partitionkeyrangeid 用于读取数据的分区键范围 ID。

增量 ReadDocumentFeed 的响应标头

标头名称 说明
etag

响应中返回的最后一个文档的逻辑序列号 (LSN)。

在 If-None-Match 中重新提交此值可以恢复增量 ReadDocumentFeed。

以下示例请求通过逻辑版本/ ETag 28535 和分区键范围 16 返回集合中发生的所有增量更改:

GET https://mydocumentdb.documents.azure.cn/dbs/bigdb/colls/bigcoll/docs HTTP/1.1
x-ms-max-item-count: 1
If-None-Match: "28535"
A-IM: Incremental feed
x-ms-documentdb-partitionkeyrangeid: 16
x-ms-date: Tue, 22 Nov 2016 20:43:01 GMT
authorization: type%3dmaster%26ver%3d1.0%26sig%3dzdpL2QQ8TCfiNbW%2fEcT88JHNvWeCgDA8gWeRZ%2btfN5o%3d
x-ms-version: 2016-07-11
Accept: application/json
Host: mydocumentdb.documents.azure.cn

更改已按分区键范围内每个分区键值中的时间排序。 无法保证各分区键值中的顺序一致。 如果结果太多,无法在一个页面中显示,可以使用 If-None-Match 标头(其值等于前一响应中的 etag)重新提交请求来阅读下一页结果。 如果在存储过程或触发器中以事务方式插入或更新了多个文档,这些文档都会在同一个响应页面中返回。

Note

通过更改源,在存储过程或触发器中插入或更新了多个文档的情况下,页面中可能会返回比 x-ms-max-item-count 中指定的数目更多的项。

.NET SDK 提供 CreateDocumentChangeFeedQueryChangeFeedOptions帮助器类,以访问对集合进行的更改。 以下代码片段演示如何在单个客户端中使用.NET SDK 检索从一开始所做的全部更改。

private async Task<Dictionary<string, string>> GetChanges(
    DocumentClient client,
    string collection,
    Dictionary<string, string> checkpoints)
{
    string pkRangesResponseContinuation = null;
    List<PartitionKeyRange> partitionKeyRanges = new List<PartitionKeyRange>();

    do
    {
        FeedResponse<PartitionKeyRange> pkRangesResponse = await client.ReadPartitionKeyRangeFeedAsync(
            collectionUri, 
            new FeedOptions { RequestContinuation = pkRangesResponseContinuation });

        partitionKeyRanges.AddRange(pkRangesResponse);
        pkRangesResponseContinuation = pkRangesResponse.ResponseContinuation;
    }
    while (pkRangesResponseContinuation != null);

    foreach (PartitionKeyRange pkRange in partitionKeyRanges)
    {
        string continuation = null;
        checkpoints.TryGetValue(pkRange.Id, out continuation);

        IDocumentQuery<Document> query = client.CreateDocumentChangeFeedQuery(
            collection,
            new ChangeFeedOptions
            {
                PartitionKeyRangeId = pkRange.Id,
                StartFromBeginning = true,
                RequestContinuation = continuation,
                MaxItemCount = 1
            });

        while (query.HasMoreResults)
        {
            FeedResponse<DeviceReading> readChangesResponse = query.ExecuteNextAsync<DeviceReading>().Result;

            foreach (DeviceReading changedDocument in readChangesResponse)
            {
                Console.WriteLine(changedDocument.Id);
            }

            checkpoints[pkRange.Id] = readChangesResponse.ResponseContinuation;
        }
    }

    return checkpoints;
}

以下代码片段演示如何使用更改源支持和上述函数,在 Azure Cosmos DB 中实时处理更改。 第一个调用返回集合中的所有文档,第二个调用仅返回自上一个检查点后创建的两个文档。

// Returns all documents in the collection.
Dictionary<string, string> checkpoints = await GetChanges(client, collection, new Dictionary<string, string>());

await client.CreateDocumentAsync(collection, new DeviceReading { DeviceId = "xsensr-201", MetricType = "Temperature", Unit = "Celsius", MetricValue = 1000 });
await client.CreateDocumentAsync(collection, new DeviceReading { DeviceId = "xsensr-212", MetricType = "Pressure", Unit = "psi", MetricValue = 1000 });

// Returns only the two documents created above.
checkpoints = await GetChanges(client, collection, checkpoints);

还可以筛选更改源 - 使用客户端逻辑有选择性地处理事件即可。 例如,以下代码片段使用客户端 LINQ 专门处理设备传感器发送的温度更改事件。

FeedResponse<DeviceReading> readChangesResponse = query.ExecuteNextAsync<DeviceReading>().Result;

foreach (DeviceReading changedDocument in 
    readChangesResponse.AsEnumerable().Where(d => d.MetricType == "Temperature" && d.MetricValue > 1000L))
{
    // trigger an action, like call an API
}

更改源处理器库

Azure Cosmos DB 更改源处理器库可用于从多个使用者的更改源分发事件处理。 在 .NET 平台上生成更改源读取器时,应使用此实现。 ChangeFeedProcessorHost 类为事件处理器实现提供线程安全、多进程安全的运行时环境,该环境还能提供检查点和分区租用管理。

若要使用 ChangeFeedProcessorHost 类,可实现 IChangeFeedObserver。 此接口包含三个方法:

  • OpenAsync
  • CloseAsync
  • ProcessEventsAsync

若要开始处理事件,请实例化 ChangeFeedProcessorHost,并为 Azure Cosmos DB 集合提供适当的参数。 然后,调用 RegisterObserverAsync 在运行时注册 IChangeFeedObserver 实现。 此时,主机会尝试使用“贪婪”算法获取 Azure Cosmos DB 集合内每个分区键范围上的租约。 这些租约将持续指定的时限,并且必须续订。 新节点(本例中的工作线程实例)进入联机状态时,它们会保留租约,以后每次尝试获取更多租约时,负载会在节点之间转移。

使用 Azure Cosmos DB 更改源处理器主机

经过一段时间后,就会建立平衡。 通过这种动态功能,可以向使用者应用基于 CPU 的自动缩放,以实现向上扩展和向下缩减。 如果 Azure Cosmos DB 中的更改提供速率超过了使用者可以处理的速率,则可使用使用者的 CPU 增大功能来实现辅助角色实例数的自动缩放。

ChangeFeedProcessorHost 类还使用单独的 Azure Cosmos DB 租约集合实现了检查点机制。 此机制按分区存储偏移量,使每个使用者都能确定前一个使用者的最后一个检查点是什么。 当分区通过租约在节点之间转移时,正是这个同步机制在促进负载转移。

以下是用于将更改打印到控制台的简单更改源处理器主机的代码片段:

    class DocumentFeedObserver : IChangeFeedObserver
    {
        private static int s_totalDocs = 0;
        public Task OpenAsync(ChangeFeedObserverContext context)
        {
            Console.WriteLine("Worker opened, {0}", context.PartitionKeyRangeId);
            return Task.CompletedTask;  // Requires targeting .NET 4.6+.
        }
        public Task CloseAsync(ChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
        {
            Console.WriteLine("Worker closed, {0}", context.PartitionKeyRangeId);
            return Task.CompletedTask;
        }
        public Task ProcessEventsAsync(IReadOnlyList<Document> docs, ChangeFeedObserverContext context)
        {
            Console.WriteLine("Change feed: total {0} doc(s)", Interlocked.Add(ref s_totalDocs, docs.Count));
            return Task.CompletedTask;
        }
    }

以下代码片段演示如何注册新主机以侦听 Azure Cosmos DB 集合的更改。 在这里我们配置单独的集合,以管理跨多个使用者的分区的租约:

    string hostName = Guid.NewGuid().ToString();
    DocumentCollectionInfo documentCollectionLocation = new DocumentCollectionInfo
    {
        Uri = new Uri("https://YOUR_SERVICE.documents.azure.cn:443/"),
        MasterKey = "YOUR_SECRET_KEY==",
        DatabaseName = "db1",
        CollectionName = "documents"
    };

    DocumentCollectionInfo leaseCollectionLocation = new DocumentCollectionInfo
    {
        Uri = new Uri("https://YOUR_SERVICE.documents.azure.cn:443/"),
        MasterKey = "YOUR_SECRET_KEY==",
        DatabaseName = "db1",
        CollectionName = "leases"
    };

    ChangeFeedEventHost host = new ChangeFeedEventHost(hostName, documentCollectionLocation, leaseCollectionLocation);
    await host.RegisterObserverAsync<DocumentFeedObserver>();

本文逐步讲解了 Azure Cosmos DB 的更改源支持,以及如何使用 REST API 和/或 SDK 跟踪对 Azure Cosmos DB 数据所做的更改。

后续步骤