Azure Cosmos DB 中的更改源拉取模型

适用范围: NoSQL

使用更改源拉取模型可以按自己的节奏使用 Azure Cosmos DB 更改源。 与更改源处理器类似,你可以使用更改源拉取模型来并行处理多个更改源使用者之间的更改。

对比更改源处理器

许多情况下,既可以使用更改源处理器又可以使用更改源拉取模型来处理更改源。 拉取模型的延续令牌和更改源处理器的租约容器都可作为更改源中最后处理项(或一批项)的书签。

但是,延续令牌无法转换为租约(反之亦然)。

注意

在大多数情况下,如果需要从更改源中读取数据,最简单的方法是使用更改源处理器

以下情况应考虑使用拉取模型:

  • 读取特定分区键的更改。
  • 控制客户端接收要处理的更改的速度。
  • 一次性读取更改源中现有的数据(例如,用于数据迁移)。

下文阐述了更改源拉取模型与更改源处理器之间的几点关键差异:

功能 更改源处理器 更改源请求模型
持续追踪更改源处理的当前点 租赁(存储在 Azure Cosmos DB 容器中) 继续标记(存储在内存中或手动进行保存)
能够重播过去的更改 是(在使用推送模型的情况下) 是(在使用拉取模型的情况下)
轮询将来的更改 基于用户指定的 WithPollInterval 值自动检查更改 手动
未出现新变化的行为 自动等待 WithPollInterval 值,然后重新检查 必须检查状态并手动重新检查
处理整个容器的更改 是的,自动并行处理从同一容器使用更改的多个线程和机器 支持,请使用 FeedRange 来手动并行处理
仅处理单个分区键的更改 不支持

注意

与使用更改源处理器进行读取不同,当使用拉取模型时,如果未出现新变化,则需要显式处理。

使用拉取模型

若要使用拉取模型处理更改源,请创建一个 FeedIterator 实例。 在最初创建 FeedIterator 时,必须指定所需的 ChangeFeedStartFrom 值,该值由读取更改的起始位置和所需的 FeedRange 值组成。 FeedRange 是分区键值范围,指定根据特定 FeedIterator 可从更改源中读取的项。 此外,必须为所需的更改处理模式指定必需的 ChangeFeedMode 值:最新版本所有版本和删除模式。 使用 ChangeFeedMode.LatestVersionChangeFeedMode.AllVersionsAndDeletes 指示读取更改源的模式。 使用所有版本和删除模式时,必须选择从 Now() 值或从特定延续令牌开始的更改源。

你还可以选择指定 ChangeFeedRequestOptions 以设置 PageSizeHint。 设置后,此属性会对每页收到的项目的最大数目进行设置。 如果受监视集合中的操作通过存储过程执行,则在从更改源读取项时,会保留事务范围。 因此,收到的项数可能高于指定的值,通过同一事务更改的项会作为某一原子批的一部分返回。

以下示例以最新版本模式获取一个返回实体对象(在本例中为 User 对象)的 FeedIterator

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

提示

在低于 3.34.0 的版本中,可以通过设置 ChangeFeedMode.Incremental 来使用最新版本模式。 IncrementalLatestVersion 是指更改源的最新版本模式以及使用任一模式都会看到相同行为的应用程序。

所有版本和删除模式目前为预览版,可以在预览版 .NET SDK 3.32.0-preview 或更高版本中使用。 以下示例显示从所有版本获取和返回 User 对象的删除模式中获取 FeedIterator

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

注意

在最新版本模式下,你将收到代表被更改项的对象和一些附加元数据。 所有版本和删除模式会返回一个不同的数据模型。 有关详细信息,请参阅《分析响应对象》。

你可获取最新版本模式的示例,或所有版本和删除模式的示例。

通过流使用更改源

两种更改源模式的 FeedIterator 都有两种选项。 除了返回实体对象的示例之外,还可以获取提供 Stream 支持的响应。 利用流,你可以在不先将数据反序列化的情况下读取数据,从而节省客户端资源。

以下示例展示如何在最新版本模式下获取返回 StreamFeedIterator

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

使用整个容器的更改

如果没有向 FeedIterator 提供 FeedRange,则可以按自己的节奏处理整个容器的更改源。 以下示例使用最新版本模式读取从当前时间开始的所有更改:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

由于更改源实际上是包含所有后续写入和更新项的无穷列表,因此 HasMoreResults 的值始终为 true。 在尝试读取更改源时,如果未发生新的更改,你会收到 NotModified 状态的响应。 在上述示例中,处理方式是先等待 5 秒钟,然后重新检查更改。

使用分区键的更改

在某些情况下,你可能希望仅处理特定分区键的更改。 可以获取特定分区键的 FeedIterator,并采用处理整个容器的方式来处理更改。

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

使用 FeedRange 实现并行化

更改源处理器中,工作自动分布到多个使用者。 在更改源拉取模型中,可以使用 FeedRange 来并行处理更改源。 FeedRange 表示分区键值的一个范围。

下面的示例展示了如何获取容器的范围列表:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

获取容器的 FeedRange 值列表时,每个物理分区都会获得一个 FeedRange

使用 FeedRange 可以创建一个 FeedIterator,以便跨多个计算机或线程并行处理更改源。 上面的示例展示了如何获取整个容器或某一个分区键的 FeedIterator,与之不同的是,你可以使用 FeedRanges 来获取多个 FeedIterator,这样就可以并行处理更改源。

若要使用 FeedRange,需要通过一个业务流程协调程序进程来获取 FeedRange 并将其分发到那些计算机。 此分发可能是:

  • 使用 FeedRange.ToJsonString 并分发此字符串值。 使用者可以将此值用于 FeedRange.FromJsonString
  • 如果分发正在进行,则传递 FeedRange 对象引用。

下面的示例展示了如何使用两个并行读取的独立虚构计算机从容器的更改源开头进行读取:

计算机 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

计算机 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

保存延续令牌

可以通过获取延续令牌来保存 FeedIterator 的位置。 延续令牌是字符串值,它会跟踪 FeedIterator 的上次处理的更改,并且允许 FeedIterator 稍后在此点进行恢复。 延续令牌(如果已指定)优先于开始时间和“从头开始”值。 以下代码读取自容器创建以来生成的更改源。 当没有更多更改可用时,它会保留一个继续标记,以便以后可以继续使用更改源。

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

使用最新版本模式时, 只要 Azure Cosmos DB 容器存在,FeedIterator 延续令牌就不会过期。 使用所有版本和删除模式时, 只要更改发生在连续备份的保留窗口内,FeedIterator 延续令牌就有效。

后续步骤