从更改源处理器库迁移到 Azure Cosmos DB .NET V3 SDK

适用范围: NoSQL

本文介绍了将使用更改源处理器库的现有应用程序代码迁移到 .NET SDK 最新版本(也称为 .NET V3 SDK)中的更改源功能所需的步骤。

所需的代码更改

.NET V3 SDK 有几项中断性变更,以下是迁移应用程序的关键步骤:

  1. DocumentCollectionInfo 实例转换为受监视容器和租约容器的 Container 引用。
  2. 使用 WithProcessorOptions 的自定义应更新为对时间间隔使用 WithLeaseConfigurationWithPollInterval、对开始时间使用 WithStartTime 以及使用 WithMaxItems 来定义最大项计数。
  3. GetChangeFeedProcessorBuilder 上的 processorName 设置为与 ChangeFeedProcessorOptions.LeasePrefix上配置的值匹配,否则使用 string.Empty
  4. 更改将不再作为 IReadOnlyList<Document> 传递,而是作为 IReadOnlyCollection<T>,其中 T 是需要定义的类型,任何基项类都不再存在。
  5. 若要处理更改,不再需要实现 IChangeFeedObserver,而是需要定义委托。 委托可以是静态函数,或者,如果需要跨执行维护状态,还可以创建自己的类并传递实例方法作为委托。

例如,如果用于生成更改源处理器的原始代码如下所示:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

迁移的代码将如下所示:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

对于委托,可以使用静态方法来接收事件。 如果使用的是 IChangeFeedObserverContext 中的信息,可以迁移以使用 ChangeFeedProcessorContext

  • 可以使用 ChangeFeedProcessorContext.LeaseToken,而不是使用 IChangeFeedObserverContext.PartitionKeyRangeId
  • 可以使用 ChangeFeedProcessorContext.Headers,而不是使用 IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics 包含有关故障排除的请求延迟的详细信息
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

健康状况事件和可观测性

如果以前使用过 IHealthMonitor 或利用过 IChangeFeedObserver.OpenAsyncIChangeFeedObserver.CloseAsync,请使用通知 API

  • IChangeFeedObserver.OpenAsync 可以替换为 WithLeaseAcquireNotification
  • IChangeFeedObserver.CloseAsync 可以替换为 WithLeaseReleaseNotification
  • IHealthMonitor.InspectAsync 可以替换为 WithErrorNotification

状态和租约容器

与更改源处理器库类似,.NET V3 SDK 中的更改源功能使用租约容器来存储状态。 但是,架构是不同的。

SDK V3 更改源处理器将在首次执行迁移的应用程序代码时自动检测任何旧的库状态并将其迁移到新的架构。

可以使用旧代码安全地停止应用程序,将代码迁移到新版本,启动已迁移的应用程序,而在应用程序停止时所发生的任何更改都将由新版本选取并处理。

其他资源

后续步骤

现在,可以通过以下文章继续详细了解更改源处理器: