从更改源处理器库迁移到 Azure Cosmos DB .NET V3 SDKMigrate from the change feed processor library to the Azure Cosmos DB .NET V3 SDK

本文介绍了将使用更改源处理器库的现有应用程序代码迁移到 .NET SDK 最新版本(也称为 .NET V3 SDK)中的更改源功能所需的步骤。This article describes the required steps to migrate an existing application's code that uses the change feed processor library to the change feed feature in the latest version of the .NET SDK (also referred as .NET V3 SDK).

所需的代码更改Required code changes

.NET V3 SDK 有几项中断性变更,以下是迁移应用程序的关键步骤:The .NET V3 SDK has several breaking changes, the following are the key steps to migrate your application:

  1. DocumentCollectionInfo 实例转换为受监视容器和租约容器的 Container 引用。Convert the DocumentCollectionInfo instances into Container references for the monitored and leases containers.
  2. 使用 WithProcessorOptions 的自定义应更新为对时间间隔使用 WithLeaseConfigurationWithPollInterval、对WithStartTime开始时间使用 以及使用 WithMaxItems 来定义最大项计数。Customizations that use WithProcessorOptions should be updated to use WithLeaseConfiguration and WithPollInterval for intervals, WithStartTime for start time, and WithMaxItems to define the maximum item count.
  3. processorName 上的 GetChangeFeedProcessorBuilder 设置为与 ChangeFeedProcessorOptions.LeasePrefix上配置的值匹配,否则使用 string.EmptySet the processorName on GetChangeFeedProcessorBuilder to match the value configured on ChangeFeedProcessorOptions.LeasePrefix, or use string.Empty otherwise.
  4. 更改将不再作为 IReadOnlyList<Document> 传递,而是作为 IReadOnlyCollection<T>,其中 T 是需要定义的类型,任何基项类都不再存在。The changes are no longer delivered as a IReadOnlyList<Document>, instead, it's a IReadOnlyCollection<T> where T is a type you need to define, there is no base item class anymore.
  5. 若要处理这些更改,你不再需要一个实现,而是需要定义一个委托To handle the changes, you no longer need an implementation, instead you need to define a delegate. 委托可以是静态函数,或者,如果需要跨执行维护状态,还可以创建自己的类并传递实例方法作为委托。The delegate can be a static Function or, if you need to maintain state across executions, you can create your own class and pass an instance method as delegate.

例如,如果用于生成更改源处理器的原始代码如下所示:For example, if the original code to build the change feed processor looks as follows:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix" })
     .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions()
     {
         MaxItemCount = 10,
         FeedPollDelay = TimeSpan.FromSeconds(1)
     })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

迁移的代码将如下所示:The migrated code will look like:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

而委托可以是静态方法:And the delegate, can be a static method:

static async Task HandleChangesAsync(IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

状态和租约容器State and lease container

与更改源处理器库类似,.NET V3 SDK 中的更改源功能使用租约容器来存储状态。Similar to the change feed processor library, the change feed feature in .NET V3 SDK uses a lease container to store the state. 但是,架构是不同的。However, the schemas are different.

SDK V3 更改源处理器将在首次执行迁移的应用程序代码时自动检测任何旧的库状态并将其迁移到新的架构。The SDK V3 change feed processor will detect any old library state and migrate it to the new schema automatically upon the first execution of the migrated application code.

可以使用旧代码安全地停止应用程序,将代码迁移到新版本,启动已迁移的应用程序,而在应用程序停止时所发生的任何更改都将由新版本选取并处理。You can safely stop the application using the old code, migrate the code to the new version, start the migrated application, and any changes that happened while the application was stopped, will be picked up and processed by the new version.

其他资源Additional resources

后续步骤Next steps

现在,可以通过以下文章继续详细了解更改源处理器:You can now proceed to learn more about change feed processor in the following articles: