使用 Azure Cosmos DB 中的更改源支持

Azure Cosmos DB 是一种快速且灵活的全局复制数据库,适用于 IoT、游戏、零售以及运营日志记录应用程序。 在这些应用程序中,一个常见的设计模式是根据对数据所做的更改来触发其他操作。 这些其他操作可以是下述任何操作:

  • 插入或修改文档时触发 API 通知或调用。
  • 针对 IoT 进行流式处理,或者执行分析。
  • 通过与缓存、搜索引擎或数据仓库同步,或者将数据存档到冷存储,进行更多的数据移动。

使用 Azure Cosmos DB 中的更改源支持,可以针对其中的每种模式构建高效、可缩放的解决方案,如下图所示:

使用 Azure Cosmos DB 更改源促成实时分析和事件驱动的计算方案

Note

为 Azure Cosmos DB 中的所有数据模型和容器提供更改源支持。 但是,更改源是使用 DocumentDB 客户端读取的,并将项目序列化为 JSON 格式。 由于 JSON 格式化,MongoDB 客户端会体验到 BSON 格式文档和 JSON 格式更改源之间的不匹配。

更改源工作原理

Azure Cosmos DB 中的更改源支持的工作原理是:侦听 Azure Cosmos DB 集合中出现的任何更改, 然后会输出一个排序的列表,其中包含已更改的文档,其顺序与修改顺序一样。 所做的更改会持久保存,并能够以异步和增量方式进行处理,而且输出可以分发到一个或多个使用者进行并行处理。

可以通过三种不同的方式读取更改源,如本文后面所述:

  1. 使用 Azure Cosmos DB SDK
  2. 使用 Azure Cosmos DB 更改源处理器库

更改源适用于文档集合中的每个分区键范围,因此,可以将其分散到一个或多个使用者进行并行处理,如下图所示。

Azure Cosmos DB 更改源的分布式处理

其他详细信息:

  • 默认为所有帐户启用更改源。
  • 可以使用写入区域或任何读取区域中的预配吞吐量从更改源中读取数据,就像任何其他 Azure Cosmos DB 操作一样。
  • 更改源包括针对集合中的文档所做的插入和更新操作。 可以通过在文档中的删除位置设置“软删除”标志来捕获删除操作。 或者,可以通过 TTL 功能为文档设置有限的到期期限(例如 24 小时),然后使用该属性的值捕获删除操作。 使用此解决方案时,处理更改的时间间隔必须比 TTL 过期期限要短。
  • 在更改源中,对文档的每个更改都将显示一次,且客户端管理其检查点逻辑。 更改源处理器库提供自动检查点和“至少一次”语义。
  • 更改日志中仅包含最近对给定文档所做的更改, 而不包含中途的更改。
  • 更改源按照每个分区键值中的修改顺序排序。 无法保证各分区键值中的顺序一致。
  • 更改可从任意时间点同步,也就是说,发生更改的数据没有固定的保留期。
  • 更改以分区键范围区块提供。 多个使用者/服务器可以使用此功能并行处理大型集合中发生的更改。
  • 应用程序可以针对同一个集合同时请求多个更改源。

用例和方案

使用更改源可对具有大量写入操作的大型数据集进行有效处理,这样就不需要查询整个数据集来识别发生了哪些更改。

例如,可以使用更改源有效地执行以下任务:

  • 使用 Azure Cosmos DB 中存储的数据更新缓存、搜索索引或数据仓库。
  • 实现应用程序级别的数据分层和存档,即,将“热数据”存储在 Azure Cosmos DB 中,将“冷数据”搁置在 Azure Blob 存储Azure Data Lake Store 中。
  • 使用 Apache Hadoop 实现数据批量分析。
  • 在不造成任何停机的情况下迁移到使用不同分区方案的另一个 Azure Cosmos DB 帐户。
  • 使用 Azure Cosmos DB 在 Azure 上实现 lambda 管道。 Azure Cosmos DB 提供了一种可缩放的数据库解决方案,该解决方案可处理引入和查询,实现 TCO 较低的 lambda 体系结构。
  • 接收和存储设备、传感器、基础结构和应用程序发出的事件数据,并使用 Azure 流分析Apache StormApache Spark 实时处理这些事件。

下图显示了 lambda 管道如何使用更改源支持,这些管道使用 Azure Cosmos DB 进行引入和查询:

用于引入和查询的基于 Azure Cosmos DB 的 lambda 管道

另外,可以在无服务器 Web 应用和移动应用中跟踪各种事件(例如,对客户配置文件、首选项或位置的更改),这些事件会触发特定的操作 例如,若要使用 Azure Cosmos DB 来构建游戏,可以使用更改源,根据已完成的游戏的分数实时更新排行榜。

使用 SDK

用于 Azure Cosmos DB 的 DocumentDB SDK 提供了读取和管理更改源所需的所有功能。 但是,功能越强,责任也越重。 若要管理检查点、处理文档序列号、对分区键进行精细控制,则可使用 SDK。

此部分详细介绍如何使用 DocumentDB SDK 来处理更改源。

  1. 一开始请读取 appconfig 中的以下资源。 更新连接字符串中提供了检索终结点和授权密钥的说明。

    DocumentClient client;
    string DatabaseName = ConfigurationManager.AppSettings["database"];
    string CollectionName = ConfigurationManager.AppSettings["collection"];
    string endpointUrl = ConfigurationManager.AppSettings["endpoint"];
    string authorizationKey = ConfigurationManager.AppSettings["authKey"];
    
  2. 按如下所述创建此客户端:

    using (client = new DocumentClient(new Uri(endpointUrl), authorizationKey,
    new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp }))
    {
    }
    
  3. 获取分区键范围:

    FeedResponse pkRangesResponse = await client.ReadPartitionKeyRangeFeedAsync(
        collectionUri,
        new FeedOptions
            {RequestContinuation = pkRangesResponseContinuation });
    
    partitionKeyRanges.AddRange(pkRangesResponse);
    pkRangesResponseContinuation = pkRangesResponse.ResponseContinuation;
    
  4. 针对每个分区键范围调用 ExecuteNextAsync:

    foreach (PartitionKeyRange pkRange in partitionKeyRanges){
        string continuation = null;
        checkpoints.TryGetValue(pkRange.Id, out continuation);
        IDocumentQuery<Document> query = client.CreateDocumentChangeFeedQuery(
            collectionUri,
            new ChangeFeedOptions
            {
                PartitionKeyRangeId = pkRange.Id,
                StartFromBeginning = true,
                RequestContinuation = continuation,
                MaxItemCount = -1,
                // Set reading time: only show change feed results modified since StartTime
                StartTime = DateTime.Now - TimeSpan.FromSeconds(30)
            });
        while (query.HasMoreResults)
            {
                FeedResponse<dynamic> readChangesResponse = query.ExecuteNextAsync<dynamic>().Result;
    
                foreach (dynamic changedDocument in readChangesResponse)
                    {
                         Console.WriteLine("document: {0}", changedDocument);
                    }
                checkpoints[pkRange.Id] = readChangesResponse.ResponseContinuation;
            }
    }
    

如果有多个读取器,则可使用 ChangeFeedOptions 将读取负载分发到不同的线程或客户端。

使用短短的几行代码即可读取更改源,就这么简单! 可从 GitHub 存储库获取本文中使用的完整代码。

在上面的步骤 4 的代码中,最后一行的 ResponseContinuation 具有文档的最后一个逻辑序列号 (LSN),下次读取该序列号后面的新文档时需要用到。 可以使用 ChangeFeedOption 的 StartTime 扩大获取文档的范围。 因此,如果 ResponseContinuation 为 null,但 StartTime 为以前的时间,则会获取自 StartTime 以来更改过的所有文档。 但是,如果 ResponseContinuation 有一个值,则系统会提供自该 LSN 以来的所有文档。

因此,检查点数组只是保留每个分区的 LSN。 但是,如果不想处理分区、检查点、LSN、启动时间等事项,则可选择较简单的选项,即使用更改源处理器库。

使用更改源处理器库

使用 Azure Cosmos DB 更改源处理器库,可以轻松地跨多个使用者分发事件处理。 此库简化了跨分区和多个并行工作的线程读取更改的过程。

更改源处理器库的主要好处是,不需管理每个分区和继续标记,也不需手动轮询每个集合。

更改源处理器库简化了跨分区和多个并行工作的线程读取更改的过程。 它通过租用机制自动管理跨分区读取更改的过程。 如下图所示,如果启动两个使用更改源处理器库的客户端,这两个客户端会在内部分配工作。 继续增加客户端的数量时,这些客户端仍在内部分配工作。

Azure Cosmos DB 更改源的分布式处理

左客户端首先启动,并开始监视所有分区;接着第二个客户端启动,然后第一个客户端将部分租约转给第二个客户端。 可以看出,这是在不同计算机和客户端之间分发工作的理想方式。

请注意,如果有两个无服务器 Azure 函数在监视同一集合并使用同一租约,则这两个函数可能会获取不同的文档,具体取决于处理器库如何决定分区的处理。

了解更改源处理器库

实现更改源处理器需要四个主要组件:监视集合、租用集合、处理器主机和使用者。

Warning

创建集合会影响定价,因为要保留应用程序的吞吐量才能与 Azure Cosmos DB 进行通信。 有关详细信息,请访问定价页

监视集合:监视集合是生成更改源的数据。 对监视集合的任何插入和更改都会反映在集合的更改源中。

租用集合:租用集合协调处理跨多个辅助角色的更改源。 单独集合用于存储租用,一个分区一个租用。 最好将此租用集合存储在不同的帐户(写入区域更靠近更改源处理器运行位置)。 租用对象有以下属性:

  • 所有者:指定拥有租用的主机
  • 继续:为特殊分区指定更改源中的位置(继续标记)
  • 时间戳:租用最近更新时间;时间戳可用于检查租用是否到期

处理器主机:每个主机根据具有活动租用的主机其他实例数目,确定要处理的分区数目。

  1. 主机启动时,将获取租用,以在所有主机中均衡工作负荷。 主机定期续订租用,使租用保持活动状态。
  2. 主机为每次读取检查其租用的最近继续标记。 为确保并发安全,主机会检查 ETag 的每次租用更新。 此外还支持其他检查点策略。
  3. 关闭后,主机会释放所有租用,但保留继续信息,以便稍后从存储检查点继续读取。

此时,主机数量不能大于分区(租用)数量。

使用者:使用者或辅助角色是执行每个主机启动的更改源处理的线程。 每个处理器主机可以有多个使用者。 每个使用者从分配给其的分区上读取更改源,并就更改和过期的租用通知主机。

若要进一步了解更改源处理器的四个元素是如何协同工作的,请看下图中的一个示例。 监视集合存储文档,并将“city”用作分区键。 可以看到蓝色分区包含“A - E”中的“city”字段的文档。 有两个主机,每个主机有两个从四个并行分区读取的使用者。 箭头显示的是从更改源中特定位置读取的使用者。 在第一个分区中,深蓝色表示更改源上未读取的更改,而浅蓝色表示更改源上已读取的更改。 主机使用租用集合来存储“继续”值,跟踪每个使用者的当前读取位置。

使用 Azure Cosmos DB 更改源处理器主机

使用更改源处理器库

安装更改源处理器 NuGet 包之前,请先安装:

  • Microsoft.Azure.DocumentDB 1.13.1 或更高版本
  • Newtonsoft.Json 9.0.1 或更高版本

然后安装 Microsoft.Azure.DocumentDB.ChangeFeedProcessor Nuget 包并将其作为引用包括进去。

若要实现更改源处理器库,必须执行以下操作:

  1. 实现 DocumentFeedObserver 对象,以便实现 IChangeFeedObserver。

  2. 实现 DocumentFeedObserverFactory,以便实现 IChangeFeedObserverFactory。

  3. 在 DocumentFeedObserverFacory 的 CreateObserver 方法中,实例化在步骤 1 中创建的 ChangeFeedObserver,然后将其返回。

    public IChangeFeedObserver CreateObserver()
    {
              DocumentFeedObserver newObserver = new DocumentFeedObserver(this.client, this.collectionInfo);
              return newObserver;
    }
    
  4. 实例化 DocumentObserverFactory。

  5. 实例化 ChangeFeedEventHost:

    ChangeFeedEventHost host = new ChangeFeedEventHost(
                     hostName,
                     documentCollectionLocation,
                     leaseCollectionLocation,
                     feedOptions,
                     feedHostOptions);
    
  6. 向主机注册 DocumentFeedObserverFactory。

步骤 4 到 6 的代码为:

ChangeFeedOptions feedOptions = new ChangeFeedOptions();
feedOptions.StartFromBeginning = true;

ChangeFeedHostOptions feedHostOptions = new ChangeFeedHostOptions();

// Customizing lease renewal interval to 15 seconds.
// Can customize LeaseRenewInterval, LeaseAcquireInterval, LeaseExpirationInterval, FeedPollDelay
feedHostOptions.LeaseRenewInterval = TimeSpan.FromSeconds(15);

using (DocumentClient destClient = new DocumentClient(destCollInfo.Uri, destCollInfo.MasterKey))
{
        DocumentFeedObserverFactory docObserverFactory = new DocumentFeedObserverFactory(destClient, destCollInfo);
        ChangeFeedEventHost host = new ChangeFeedEventHost(hostName, documentCollectionLocation, leaseCollectionLocation, feedOptions, feedHostOptions);
        await host.RegisterObserverFactoryAsync(docObserverFactory);
        await host.UnregisterObserversAsync();
}

就这么简单。 经过这些步骤以后,文档将介绍 DocumentFeedObserver ProcessChangesAsync 方法。

后续步骤

若要详细了解如何使用更改源处理器库,请参阅以下资源:

若要详细了解如何通过 SDK 使用更改源,请参阅以下资源: