使用 Azure Cosmos DB 中的更改源支持

Azure Cosmos DB 是一种快速且灵活的多区域复制数据库,适用于 IoT、游戏、零售以及运营日志记录应用程序。 在这些应用程序中,一个常见的设计模式是根据对数据所做的更改来触发其他操作。 这些其他操作可以是下述任何操作:

  • 插入或修改文档时触发 API 通知或调用。
  • 针对 IoT 进行流式处理,或者执行分析。
  • 通过与缓存、搜索引擎或数据仓库同步,或者将数据存档到冷存储,进行更多的数据移动。

使用 Azure Cosmos DB 中的更改源支持,可以针对其中的每种模式构建高效、可缩放的解决方案,如下图所示:

使用 Azure Cosmos DB 更改源促成实时分析和事件驱动的计算方案

Note

为 Azure Cosmos DB 中的所有数据模型和容器提供更改源支持。 但是,更改源是使用 SQL 客户端读取的,并将项目序列化为 JSON 格式。 由于 JSON 格式化,MongoDB 客户端会体验到 BSON 格式文档和 JSON 格式更改源之间的不匹配。

更改源工作原理

Azure Cosmos DB 中的更改源支持的工作原理是:侦听 Azure Cosmos DB 集合中出现的任何更改, 然后会输出一个排序的列表,其中包含已更改的文档,其顺序与修改顺序一样。 所做的更改会持久保存,并能够以异步和增量方式进行处理,而且输出可以分发到一个或多个使用者进行并行处理。

可以通过三种不同的方式读取更改源,如本文后面所述:

更改源适用于文档集合中的每个分区键范围,因此,可以将其分散到一个或多个使用者进行并行处理,如下图所示。

Azure Cosmos DB 更改源的分布式处理

其他详细信息:

  • 默认为所有帐户启用更改源。
  • 可以使用写入区域或任何读取区域中的预配吞吐量从更改源中读取数据,就像任何其他 Azure Cosmos DB 操作一样。
  • 更改源包括针对集合中的文档所做的插入和更新操作。 可以通过在文档中的删除位置设置“软删除”标志来捕获删除操作。 或者,可以通过 TTL 功能为文档设置有限的到期期限(例如 24 小时),然后使用该属性的值捕获删除操作。 使用此解决方案时,处理更改的时间间隔必须比 TTL 过期期限要短。
  • 在更改源中,对文档的每个更改都将显示一次,且客户端管理其检查点逻辑。 更改源处理器库提供自动检查点和“至少一次”语义。
  • 更改日志中仅包含最近对给定文档所做的更改, 而不包含中途的更改。
  • 更改源按照每个分区键值中的修改顺序排序。 无法保证各分区键值中的顺序一致。
  • 更改可从任意时间点同步,也就是说,发生更改的数据没有固定的保留期。
  • 更改以分区键范围区块提供。 多个使用者/服务器可以使用此功能并行处理大型集合中发生的更改。
  • 应用程序可以针对同一个集合同时请求多个更改源。
  • 可以使用 ChangeFeedOptions.StartTime 提供初始起点,例如,为了查找给定时钟时间所对应的继续标记。 ContinuationToken(如果指定)将优先于 StartTime 和 StartFromBeginning 值。 ChangeFeedOptions.StartTime 的精度是 ~5 秒。

用例和方案

使用更改源可对具有大量写入操作的大型数据集进行有效处理,这样就不需要查询整个数据集来识别发生了哪些更改。

例如,可以使用更改源有效地执行以下任务:

  • 使用 Azure Cosmos DB 中存储的数据更新缓存、搜索索引或数据仓库。
  • 实现应用程序级别的数据分层和存档,即,将“热数据”存储在 Azure Cosmos DB 中,将“冷数据”搁置在 Azure Blob 存储中。
  • 在不造成任何停机的情况下迁移到使用不同分区方案的另一个 Azure Cosmos DB 帐户。
  • 使用 Azure Cosmos DB 在 Azure 上实现 lambda 管道。 Azure Cosmos DB 提供了一种可缩放的数据库解决方案,该解决方案可处理引入和查询,实现 TCO 较低的 lambda 体系结构。
  • 接收和存储设备、传感器、基础结构和应用程序发出的事件数据,并使用 Azure 流分析Apache StormApache Spark 实时处理这些事件。

下图显示了 lambda 管道如何使用更改源支持,这些管道使用 Azure Cosmos DB 进行引入和查询:

用于引入和查询的基于 Azure Cosmos DB 的 lambda 管道

使用 Azure Functions

如果使用的是 Azure Functions,则若要连接到 Azure Cosmos DB 更改源,最简单的方法是向 Azure Functions 应用添加 Azure Cosmos DB 触发器。 在 Azure Functions 应用中创建 Azure Cosmos DB 触发器时,请选择要连接到的 Azure Cosmos DB 集合。然后,每当出现集合更改时,系统就会触发该函数。

可以通过 Azure Functions 门户、Azure Cosmos DB 门户或编程方式创建触发器。

使用 SDK

用于 Azure Cosmos DB 的 SQL SDK 提供了读取和管理更改源所需的所有功能。 但是,功能越强,责任也越重。 若要管理检查点、处理文档序列号、对分区键进行精细控制,则可使用 SDK。

本部分详细介绍了如何使用 SQL SDK 来处理更改源。

  1. 一开始请读取 appconfig 中的以下资源。 更新连接字符串中提供了检索终结点和授权密钥的说明。

    DocumentClient client;
    string DatabaseName = ConfigurationManager.AppSettings["database"];
    string CollectionName = ConfigurationManager.AppSettings["collection"];
    string endpointUrl = ConfigurationManager.AppSettings["endpoint"];
    string authorizationKey = ConfigurationManager.AppSettings["authKey"];
    
  2. 按如下所述创建此客户端:

    using (client = new DocumentClient(new Uri(endpointUrl), authorizationKey,
    new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp }))
    {
    }
    
  3. 获取分区键范围:

    FeedResponse pkRangesResponse = await client.ReadPartitionKeyRangeFeedAsync(
        collectionUri,
        new FeedOptions
            {RequestContinuation = pkRangesResponseContinuation });
    
    partitionKeyRanges.AddRange(pkRangesResponse);
    pkRangesResponseContinuation = pkRangesResponse.ResponseContinuation;
    
  4. 针对每个分区键范围调用 ExecuteNextAsync:

    foreach (PartitionKeyRange pkRange in partitionKeyRanges){
        string continuation = null;
        checkpoints.TryGetValue(pkRange.Id, out continuation);
        IDocumentQuery<Document> query = client.CreateDocumentChangeFeedQuery(
            collectionUri,
            new ChangeFeedOptions
            {
                PartitionKeyRangeId = pkRange.Id,
                StartFromBeginning = true,
                RequestContinuation = continuation,
                MaxItemCount = -1,
                // Set reading time: only show change feed results modified since StartTime
                StartTime = DateTime.Now - TimeSpan.FromSeconds(30)
            });
        while (query.HasMoreResults)
            {
                FeedResponse<dynamic> readChangesResponse = query.ExecuteNextAsync<dynamic>().Result;
    
                foreach (dynamic changedDocument in readChangesResponse)
                    {
                         Console.WriteLine("document: {0}", changedDocument);
                    }
                checkpoints[pkRange.Id] = readChangesResponse.ResponseContinuation;
            }
    }
    

Note

可以使用 ChangeFeedOptions.PartitionKey(而不是 ChangeFeedOptions.PartitionKeyRangeId)指定要为其获取更改源的单个分区键。 例如,PartitionKey = new PartitionKey("D8CFA2FD-486A-4F3E-8EA6-F3AA94E5BD44")

如果有多个读取器,则可使用 ChangeFeedOptions 将读取负载分发到不同的线程或客户端。

使用短短的几行代码即可读取更改源,就这么简单! 可从 GitHub 存储库获取本文中使用的完整代码。

在上面的步骤 4 的代码中,最后一行的 ResponseContinuation 具有文档的最后一个逻辑序列号 (LSN),下次读取该序列号后面的新文档时需要用到。 可以使用 ChangeFeedOption 的 StartTime 扩大获取文档的范围。 因此,如果 ResponseContinuation 为 null,但 StartTime 为以前的时间,则会获取自 StartTime 以来更改过的所有文档。 但是,如果 ResponseContinuation 有一个值,则系统会提供自该 LSN 以来的所有文档。

因此,检查点数组只是保留每个分区的 LSN。 但是,如果不想处理分区、检查点、LSN、启动时间等项,则可选择较简单的选项,即使用更改源处理器库。

使用更改源处理器库

借助 Azure Cosmos DB 更改源处理器库,可以轻松地在多个使用者之间分配事件处理负载。 此库简化了跨分区和多个并行工作的线程读取更改的过程。

更改源处理器库的主要好处是,不需管理每个分区和继续标记,也不需手动轮询每个集合。

更改源处理器库简化了跨分区的以及并行工作的多个线程中的更改的读取。 它通过租用机制自动管理跨分区读取更改的过程。 如下图中所示,如果启动两个使用更改源处理器库的客户端,它们会在彼此之间划分工作。 继续增加客户端的数量时,这些客户端仍在内部分配工作。

Azure Cosmos DB 更改源的分布式处理

左客户端首先启动,并开始监视所有分区;接着第二个客户端启动,然后第一个客户端将部分租约转给第二个客户端。 可以看出,这是在不同计算机和客户端之间分发工作的理想方式。

请注意,如果有两个无服务器 Azure 函数在监视同一集合并使用同一租约,则这两个函数可能会获取不同的文档,具体取决于处理器库如何决定分区的处理。

了解更改源处理器库

实现更改源处理器库需要四个主要组件:监视集合、租用集合、处理器主机和使用者。

Warning

创建集合会影响定价,因为要保留应用程序的吞吐量才能与 Azure Cosmos DB 进行通信。 有关详细信息,请访问定价页

监视集合:监视集合是生成更改源的数据。 对监视集合的任何插入和更改都会反映在集合的更改源中。

租用集合:租用集合协调处理跨多个辅助角色的更改源。 单独集合用于存储租用,一个分区一个租用。 最好将此租用集合存储在不同的帐户(写入区域更靠近更改源处理器运行位置)。 租用对象有以下属性:

  • 所有者:指定拥有租用的主机
  • 继续:为特殊分区指定更改源中的位置(继续标记)
  • 时间戳:租用最近更新时间;时间戳可用于检查租用是否到期

处理器主机:每个主机根据具有活动租用的主机其他实例数目,确定要处理的分区数目。

  1. 主机启动时,将获取租用,以在所有主机中均衡工作负荷。 主机定期续订租用,使租用保持活动状态。
  2. 主机为每次读取检查其租用的最近继续标记。 为确保并发安全,主机会检查 ETag 的每次租用更新。 此外还支持其他检查点策略。
  3. 关闭后,主机会释放所有租用,但保留继续信息,以便稍后从存储检查点继续读取。

此时,主机数量不能大于分区(租用)数量。

使用者:使用者或辅助角色是执行每个主机启动的更改源处理的线程。 每个处理器主机可以有多个使用者。 每个使用者从分配给其的分区上读取更改源,并就更改和过期的租用通知主机。

若要进一步了解更改源处理器的四个元素是如何协同工作的,请看下图中的一个示例。 监视集合存储文档,并将“city”用作分区键。 可以看到蓝色分区包含“A - E”中的“city”字段的文档。 有两个主机,每个主机有两个从四个并行分区读取的使用者。 箭头显示的是从更改源中特定位置读取的使用者。 在第一个分区中,深蓝色表示更改源上未读取的更改,而浅蓝色表示更改源上已读取的更改。 主机使用租用集合来存储“继续”值,跟踪每个使用者的当前读取位置。

使用 Azure Cosmos DB 更改源处理器主机

使用更改源处理器库

安装更改源处理器 NuGet 包之前,请先安装:

  • Microsoft.Azure.DocumentDB 最新版本。
  • Newtonsoft.Json 最新版本

然后安装 Microsoft.Azure.DocumentDB.ChangeFeedProcessor Nuget 包并将其作为引用包括进去。

若要实现更改源处理器库,必须执行以下操作:

  1. 实现 DocumentFeedObserver 对象,以便实现 IChangeFeedObserver。

    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Documents;
    using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
    using Microsoft.Azure.Documents.Client;
    
    /// <summary>
    /// This class implements the IChangeFeedObserver interface and is used to observe 
    /// changes on change feed. ChangeFeedEventHost will create as many instances of 
    /// this class as needed. 
    /// </summary>
    public class DocumentFeedObserver : IChangeFeedObserver
    {
        private static int totalDocs = 0;
    
        /// <summary>
        /// Initializes a new instance of the <see cref="DocumentFeedObserver" /> class.
        /// Saves input DocumentClient and DocumentCollectionInfo parameters to class fields
        /// </summary>
        /// <param name="client"> Client connected to destination collection </param>
        /// <param name="destCollInfo"> Destination collection information </param>
        public DocumentFeedObserver()
        {
    
        }
    
        /// <summary>
        /// Called when change feed observer is opened; 
        /// this function prints out observer partition key id. 
        /// </summary>
        /// <param name="context">The context specifying partition for this observer, etc.</param>
        /// <returns>A Task to allow asynchronous execution</returns>
        public Task OpenAsync(IChangeFeedObserverContext context)
        {
            Console.ForegroundColor = ConsoleColor.Magenta;
            Console.WriteLine("Observer opened for partition Key Range: {0}", context.PartitionKeyRangeId);
            return Task.CompletedTask;
        }
    
        /// <summary>
        /// Called when change feed observer is closed; 
        /// this function prints out observer partition key id and reason for shut down. 
        /// </summary>
        /// <param name="context">The context specifying partition for this observer, etc.</param>
        /// <param name="reason">Specifies the reason the observer is closed.</param>
        /// <returns>A Task to allow asynchronous execution</returns>
        public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
        {
            Console.ForegroundColor = ConsoleColor.Cyan;
            Console.WriteLine("Observer closed, {0}", context.PartitionKeyRangeId);
            Console.WriteLine("Reason for shutdown, {0}", reason);
            return Task.CompletedTask;
        }
    
        public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Document> docs, CancellationToken cancellationToken)
        {
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine("Change feed: PartitionId {0} total {1} doc(s)", context.PartitionKeyRangeId, Interlocked.Add(ref totalDocs, docs.Count));
            foreach (Document doc in docs)
            {
                Console.ForegroundColor = ConsoleColor.Yellow;
                Console.WriteLine(doc.Id.ToString());
            }
    
            return Task.CompletedTask;
        }
    }
    
  2. 实现 DocumentFeedObserverFactory,以便实现 IChangeFeedObserverFactory。

     using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
    
    /// <summary>
    /// Factory class to create instance of document feed observer. 
    /// </summary>
    public class DocumentFeedObserverFactory : IChangeFeedObserverFactory
    {
        /// <summary>
        /// Initializes a new instance of the <see cref="DocumentFeedObserverFactory" /> class.
        /// Saves input DocumentClient and DocumentCollectionInfo parameters to class fields
        /// </summary>
        public DocumentFeedObserverFactory()
        {
        }
    
        /// <summary>
        /// Creates document observer instance with client and destination collection information
        /// </summary>
        /// <returns>DocumentFeedObserver with client and destination collection information</returns>
        public IChangeFeedObserver CreateObserver()
        {
            DocumentFeedObserver newObserver = new DocumentFeedObserver();
            return newObserver as IChangeFeedObserver;
        }
    }
    
  3. 定义 CancellationTokenSource 和 ChangeFeedProcessorBuilder

    private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    private readonly ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder();
    
  4. 在定义相关对象后生成 ChangeFeedProcessorBuilder

    string hostName = Guid.NewGuid().ToString();
    
    // monitored collection info 
    DocumentCollectionInfo documentCollectionInfo = new DocumentCollectionInfo
    {
        Uri = new Uri(this.monitoredUri),
        MasterKey = this.monitoredSecretKey,
        DatabaseName = this.monitoredDbName,
        CollectionName = this.monitoredCollectionName
    };
    
    DocumentCollectionInfo leaseCollectionInfo = new DocumentCollectionInfo
        {
            Uri = new Uri(this.leaseUri),
            MasterKey = this.leaseSecretKey,
            DatabaseName = this.leaseDbName,
            CollectionName = this.leaseCollectionName
        };
    DocumentFeedObserverFactory docObserverFactory = new DocumentFeedObserverFactory();
    
    ChangeFeedProcessorOptions feedProcessorOptions = new ChangeFeedProcessorOptions();
    
    // ie. customizing lease renewal interval to 15 seconds
    // can customize LeaseRenewInterval, LeaseAcquireInterval, LeaseExpirationInterval, FeedPollDelay 
    feedProcessorOptions.LeaseRenewInterval = TimeSpan.FromSeconds(15);
    feedProcessorOptions.StartFromBeginning = true;
    
    this.builder
        .WithHostName(hostName)
        .WithFeedCollection(documentCollectionInfo)
        .WithLeaseCollection(leaseCollectionInfo)
        .WithProcessorOptions (feedProcessorOptions)
        .WithObserverFactory(new DocumentFeedObserverFactory());               
        //.WithObserver<DocumentFeedObserver>();  If no factory then just pass an observer
    
    var result =  await this.builder.BuildAsync();
    await result.StartAsync();
    Console.Read();
    await result.StopAsync();    
    

就这么简单。 完成这几个步骤后,文档会开始显示在 DocumentFeedObserver ProcessChangesAsync 方法中。

上述代码用于说明目的,以显示不同类型的对象及其交互。 必须定义适当的变量并使用正确的值启动它们。 可从 GitHub 存储库获取本文中使用的完整代码。

Note

不应在代码或配置文件中包含主密钥,如上述代码所示。 请参阅如何使用 Key Vault 检索密钥

常见问题

可通过哪些不同的方法读取更改源?何时使用哪种方法?

可通过三个选项读取更改源:

  • 使用 Azure Cosmos DB SQL API .NET SDK

    使用此方法可对更改源进行低级控制。 可以管理检查点、访问特定的分区键,等等。如果有多个读取器,则可使用 ChangeFeedOptions 将读取负载分发到不同的线程或客户端。 上获取。

  • 使用 Azure Cosmos DB 更改源处理器库

    若要大量转移更改源的复杂性,可以使用更改源处理器库。 此库可以大幅消除复杂性,同时仍让你保持更改源的完全控制度。 此库遵循观察程序模式,处理函数由 SDK 调用。

    如果有高吞吐量的更改源,可以实例化多个客户端来读取更改源。 由于使用的是“更改源处理器库”,它会自动在不同的客户端之间分配负载。 你不需要执行任何操作。 SDK 会解决所有的复杂性。 但是,若要使用自己的负载均衡器,可以针对自定义的分区策略实现 IParitionLoadBalancingStrategy。 实现 IPartitionProcessor - 用于对分区进行自定义处理更改。 使用 SDK 可以处理分区范围,但是,若要处理特定的分区键,则必须使用 SDK for SQL API。

  • 使用 Azure Functions

    Azure 函数是最后一个选项,也是最简单的选项。 我们建议使用此选项。 在 Azure Functions 应用中创建 Azure Cosmos DB 触发器时,请选择要连接到的 Azure Cosmos DB 集合,以及每当对该集合做出更改时要触发的函数。 观看使用 Azure 函数和更改源的幻灯片

    可以通过 Azure Functions 门户、Azure Cosmos DB 门户或编程方式创建触发器。 Visual Studio 和 VS Code 很好地支持编写 Azure 函数。 可以在桌面上编写和调试代码,然后单击一下鼠标部署函数。

更改源中文档的排序顺序是什么?

更改源文档按其修改时间排序。 只能按分区保证这种排序顺序。

对于多区域帐户,写入区域故障转移时更改源会发生什么情况? 更改源是否也故障转移? 更改源是否仍显示为连续状态,或者,故障转移是否导致更改源重置?

是的,每次执行手动故障转移操作时,更改源会正常工作,并且是连续的。

如果将文档的 TTL(生存时间)属性设置为 -1,更改源会将更改的数据保留多久?

更改源会永久保留。 如果数据未被删除,它会保留在更改源中。

如何将 Azure Functions 配置为从特定的区域读取数据,因为更改源默认已在所有读取区域提供?

目前无法将 Azure Functions 配置为从特定的区域读取数据。 设置任何 Azure Cosmos DB 绑定和触发器的首选区域时,Azure Functions 存储库中会出现一个 GitHub 问题。

Azure Functions 使用默认连接策略。 可以在 Azure Functions 中配置连接模式,默认情况下,Azure Functions 从写入区域读取数据,因此,最好是将 Azure Functions 共置在同一区域。

Azure Functions 中的默认批大小是什么?

每次调用 Azure Functions 时会读取 100 个文档。 但是,可在 function.json 文件中配置此数字。 下面是完整的配置选项列表。 如果在本地进行开发,请更新 local.settings.json 文件中的应用程序设置。

我正在监视一个集合并读取其更改源,但发现无法获取所有插入的文档,某些文档已缺失。 这是怎么回事?

请确保没有其他函数正在读取具有相同租用集合的相同集合。 我也遇到过这种情况,后来我认识到,缺少的文档已由另一个 Azure 函数处理,该函数也使用了相同的租约。

因此,如果创建多个 Azure Functions 来读取相同的更改源,则这些 Azure Functions 必须使用不同的租用集合,或使用“leasePrefix”配置来共享相同的集合。 但是,如果使用更改源处理器库,则可以启动函数的多个实例,SDK 会自动在不同的实例之间分割文档。

我的文档每隔一秒更新一次,但无法在侦听更改源的 Azure Functions 中获取所有更改。

Azure Functions 每隔 5 秒轮询更改源一次,因此,在 5 秒间隔内发生的任何更改都会丢失。 Azure Cosmos DB 只存储 5 秒间隔的一个版本,因此,你会获取文档的第 5 次更改。 但是,若要使用 5 秒以下的频率,每隔一秒轮询更改源,则可以配置轮询时间“feedPollTime”,具体请参阅 Azure Cosmos DB 绑定。 该时间以毫秒定义,默认值为 5000。 可以使用小于 1 秒的时间,但不建议使用,否则会消耗更多的 CPU。

我在 Mongo API 集合中插入了一个文档,但在更改源中获取该文档时,显示了一个不同的 ID 值。 原因是什么?

你的集合是 Mongo API 集合。 请记住,更改源是使用 SQL 客户端读取的,会将项序列化为 JSON 格式。 由于 JSON 格式化,MongoDB 客户端会体验到 BSON 格式文档和 JSON 格式更改源之间的不匹配。 显示的是 BSON 文档的 JSON 表示形式。 如果在 Mongo 帐户中使用二进制属性,它们会转换为 JSON。

是否可以通过某种方式来只控制更改源的更新,而不控制插入?

暂时不可以,但此功能已在规划中。 目前,可以在文档中添加更新软标记。

是否可以通过某种方式来获取更改源的删除?

目前更改源不会记录删除操作。 更改源在不断改进,此功能已在规划中。 目前,可以在文档中添加删除软标记。 在文档中添加名为“deleted”的属性并将其设置为“true”,并在文档中设置 TTL,以便自动删除文档。

是否可以读取历史文档的更改源(例如,5 年前添加的文档)?

是的,如果未删除该文档,则可以读取不超过集合原始时间的更改源。

是否可以使用 JavaScript 读取更改源?

是的,最近已添加对更改源的 Node.js SDK 初始支持。 可按以下示例中所示使用 JavaScript。在运行代码之前,请将 documentdb 模块更新到新版本:


var DocumentDBClient = require('documentdb').DocumentClient;
const host = "https://your_host:443/";
const masterKey = "your_master_key==";
const databaseId = "db";
const collectionId = "c1";
const dbLink = 'dbs/' + databaseId;
const collLink = dbLink + '/colls/' + collectionId;
var client = new DocumentDBClient(host, { masterKey: masterKey });
let options = {
    a_im: "Incremental feed",
    accessCondition: {
        type: "IfNoneMatch",        // Use: - empty condition (or remove accessCondition entirely) to start from beginning.
        //      - '*' to start from current.
        //      - specific etag value to start from specific continuation.
        condition: ""
    }
};

var query = client.readDocuments(collLink, options);
query.executeNext((err, results, headers) =&gt; {
    // Now we have headers.etag, which can be used in next readDocuments in accessCondition option.
    console.log(results);
    console.log(headers.etag);
    console.log(results.length);
    options.accessCondition = { type: "IfNoneMatch", condition: headers.etag };
    var query = client.readDocuments(collLink, options);
    query.executeNext((err, results, headers) =&gt; {
        console.log("next one:", results[0]);
    });
});<span id="mce_SELREST_start" style="overflow:hidden;line-height:0;"></span>

是否可以使用 Java 读取更改源?

Github 存储库中提供了用于读取更改源的 Java 库。 但是,基于 .NET 库的 Java 库版本目前很少。 不久后,这两个库将会同步。

是否可对响应中获取的内部簿记使用 _etag、_lsn 或 _ts?

_etag 属于内部格式,请不要依赖它(不要分析它),因为它随时可能更改。 _ts 是修改或创建时间戳。 可以使用 _ts 进行时间顺序比较。 _lsn 是仅为更改源添加的批 ID,它表示存储中的事务 ID。 许多文档可能具有相同的 _lsn。 另请注意,FeedResponse 中的 ETag 不同于文档中显示的 _etag。 _etag 是用于实现并发性的内部标识符,它告知文档的版本,而 ETag 用于将源定序。

读取更改源是否会提高成本?

需要支付消耗的 RU 费用,也就是说,将数据移入和移出 Azure Cosmos DB 集合始终会消耗 RU。 用户将根据租用集合支付消耗的 RU 费用。

多个 Azure Functions 是否可以读取一个集合的更改源?

是的。 多个 Azure Functions 可以读取同一集合的更改源。 但是,需要为 Azure Functions 定义不同的 leaseCollectionPrefix。

是否应将租用集合分区?

不需要,租用集合可以固定。 不需要分区的租用集合,而且目前不支持。

如果我使用 Azure Functions 处理更改源(例如,包含 10 个文档的批),在处理第 7 个文档时遇到错误, 在这种情况下,最后 3 个文档不会得到处理。如何在下一个源中从失败的文档 (即第 7 个文档)开始处理?

若要处理错误,建议的模式是使用 try-catch 块包装代码。 捕获错误,将该文档放入队列(死信),然后定义逻辑来处理已生成错误的文档。 如果批中的文档数有 200 个,并且只有一个文档失败,则使用此方法就无需丢弃整个批。

如果出错,则不应将检查点回退到开始位置,否则可以从更改源获取这些文档。 请记住,更改源保留文档的最后一个最终快照,因此,可能会丢失文档中的前一个快照。 更改源只保留文档的最后一个版本,在不同的版本之间,其他进程可能会更改文档。

在不断修复代码的过程中,你很快就会发现,死信队列中没有任何文档。 Azure Functions 由更改源系统自动调用,检查点等内容由 Azure 函数在内部维护。 若要回滚检查点并控制其各个方面,应考虑使用更改源处理器 SDK。

后续步骤

有关使用更改源处理器库的详细信息,请参阅以下资源:

若要详细了解如何通过 SDK 使用更改源,请参阅以下资源: