Azure Cosmos DB 更改源处理器
适用范围: NoSQL
更改源处理器是 Azure Cosmos DB .NET V3 和 Java V4 SDK 的一部分。 它简化了读取更改源的过程,可有效地在多个使用者之间分布事件处理。
使用更改源处理器的主要优点是其容错设计,可保证更改源中的所有事件传递“至少一次”。
支持的 SDK
.Net V3 | Java | Node.JS | Python |
---|---|---|---|
✓ | ✓ | ✕ | ✕ |
更改源处理器的组件
更改源处理器有四个主要组件:
监视的容器:监视的容器具有用于生成更改源的数据。 对受监视的容器的任何插入和更新都会反映在容器的更改源中。
租用容器:租用容器充当状态存储并协调多个辅助角色之间的更改源的处理。 租用容器可以与受监视的容器存储在同一帐户中,也可以存储在单独的帐户中。
计算实例:计算实例承载更改源处理器以便侦听更改。 它可以由虚拟机 (VM)、Kubernetes Pod、Azure 应用服务实例或实际的物理计算机来表示,具体取决于平台。 计算实例具有唯一标识符,在本文中称为“实例名称”。
委托:委托是用于定义开发人员要对更改源处理器读取的每一批更改执行何种操作的代码。
若要进一步了解更改源处理器的四个元素是如何协同工作的,请看下图中的一个示例。 受监视的容器会存储项,并将“City”用作分区键。 分区键值分布在包含项的范围内(每个范围表示一个物理分区)。
图中显示了两个计算实例,更改源处理器向每个实例分配不同的范围,以最大程度地提高计算分布率。 每个实例拥有一个不同的唯一名称。
每个区域都是并行读取的。 范围进程的维护通过租用文档独立于租用容器中的其他范围。 租用的组合表示更改源处理器的当前状态。
实现更改源处理器
.NET 中的更改源处理器适用于最新版本模式和所有版本与删除模式。 所有版本与删除模式以预览版提供,从版本 3.40.0-preview.0
开始支持用于更改源处理器。 两种模式的入口点始终是受监视的容器。
若要使用最新版本模式进行读取,请在 Container
实例中调用 GetChangeFeedProcessorBuilder
:
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Fluent;
using Microsoft.Extensions.Configuration;
namespace ChangeFeedSample
{
class Program
{
static async Task Main(string[] args)
{
IConfiguration configuration = BuildConfiguration();
CosmosClient cosmosClient = BuildCosmosClient(configuration);
await InitializeContainersAsync(cosmosClient, configuration);
ChangeFeedProcessor processor = await StartChangeFeedProcessorAsync(cosmosClient, configuration);
await GenerateItemsAsync(cosmosClient, processor, configuration);
}
// <Delegate>
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
// </Delegate>
/// <summary>
/// Create required containers for the sample.
/// Change Feed processing requires a source container to read the Change Feed from, and a container to store the state on, called leases.
/// </summary>
private static async Task InitializeContainersAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
if (string.IsNullOrEmpty(databaseName)
|| string.IsNullOrEmpty(sourceContainerName)
|| string.IsNullOrEmpty(leaseContainerName))
{
throw new ArgumentNullException("'SourceDatabaseName', 'SourceContainerName', and 'LeasesContainerName' settings are required. Verify your configuration.");
}
Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(sourceContainerName, "/id"));
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(leaseContainerName, "/id"));
}
// <DefineProcessor>
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
// </DefineProcessor>
/// <summary>
/// Generate sample items based on user input.
/// </summary>
private static async Task GenerateItemsAsync(
CosmosClient cosmosClient,
ChangeFeedProcessor changeFeedProcessor,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
Container sourceContainer = cosmosClient.GetContainer(databaseName, sourceContainerName);
while(true)
{
Console.WriteLine("Enter a number of items to insert in the container or 'exit' to stop:");
string command = Console.ReadLine();
if ("exit".Equals(command, StringComparison.InvariantCultureIgnoreCase))
{
Console.WriteLine();
break;
}
if (int.TryParse(command, out int itemsToInsert))
{
Console.WriteLine($"Generating {itemsToInsert} items...");
for (int i = 0; i < itemsToInsert; i++)
{
string id = Guid.NewGuid().ToString();
await sourceContainer.CreateItemAsync<ToDoItem>(
new ToDoItem()
{
id = id,
creationTime = DateTime.UtcNow
},
new PartitionKey(id));
}
}
}
Console.WriteLine("Stopping Change Feed Processor...");
await changeFeedProcessor.StopAsync();
Console.WriteLine("Stopped Change Feed Processor.");
}
private static IConfiguration BuildConfiguration()
{
return new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
.Build();
}
private static CosmosClient BuildCosmosClient(IConfiguration configuration)
{
if (string.IsNullOrEmpty(configuration["ConnectionString"]) || "<Your-Connection-String>".Equals(configuration["ConnectionString"]))
{
throw new ArgumentNullException("Missing 'ConnectionString' setting in configuration.");
}
return new CosmosClientBuilder(configuration["ConnectionString"])
.Build();
}
}
}
若要使用所有版本与删除模式进行读取,请从 Container
实例调用 GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes
:
namespace ChangeFeedAllVersionsAndDeletes
{
using System.Net;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Configuration;
// ----------------------------------------------------------------------------------------------------------
// Prerequisites -
//
// 1. An Azure Cosmos account -
// https://learn.microsoft.com/azure/cosmos-db/create-cosmosdb-resources-portal
//
// 2. Microsoft.Azure.Cosmos NuGet package -
// http://www.nuget.org/packages/Microsoft.Azure.Cosmos/
// ----------------------------------------------------------------------------------------------------------
// Sample - demonstrates common Change Feed operations using All Versions and Deletes mode
//
// 1. Listening for changes that happen after a Change Feed Processor is started.
//
// 2. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification
//
// 3. Generate Estimator metrics to expose current Change Feed Processor progress on demand
//
// 4. Error handling and advanced logging
//-----------------------------------------------------------------------------------------------------------
internal class Program
{
private static readonly string databaseName = "changefeed-db";
private static readonly string monitoredContainerPrefix = "monitored-";
private static readonly string leasesContainer = "leases";
private static readonly string partitionKeyPath = "/id";
static async Task Main(string[] args)
{
try
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();
string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentNullException("Please specify a valid endpoint in the appSettings.json");
}
string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}
using (CosmosClient client = new CosmosClient(endpoint, authKey))
{
Console.WriteLine($"\n1. Listening for changes that happen after a Change Feed Processor is started.");
await Program.RunBasicChangeFeed(monitoredContainerPrefix + "changefeed-basic", client);
Console.WriteLine($"\n2. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification.");
await Program.RunEstimatorChangeFeed(monitoredContainerPrefix + "changefeed-estimator", client);
Console.WriteLine($"\n3. Generate Estimator metrics to expose current Change Feed Processor progress on demand.");
await Program.RunEstimatorPullChangeFeed(monitoredContainerPrefix + "changefeed-estimator-detailed", client);
Console.WriteLine($"\n4. Error handling and advanced logging.");
await Program.RunWithNotifications(monitoredContainerPrefix + "changefeed-logging", client);
}
}
finally
{
Console.WriteLine("End of demo, press any key to exit.");
Console.ReadKey();
}
}
/// <summary>
/// Basic change feed functionality using all versions and deletes mode.
/// </summary>
/// <remarks>
/// When StartAsync is called, the Change Feed Processor starts an initialization process that can take several milliseconds,
/// in which it starts connections and initializes locks in the leases container.
/// </remarks>
public static async Task RunBasicChangeFeed(
string containerName,
CosmosClient client)
{
await Program.InitializeAsync(containerName, client);
// <BasicInitialization>
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </BasicInitialization>
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, monitoredContainer);
Console.WriteLine("Updating 10 items that will be picked up by the delegate...");
await Program.UpdateItems(10, monitoredContainer);
Console.WriteLine("Deleting 10 items that will be picked up by the delegate...");
await Program.DeleteItems(10, monitoredContainer);
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// </remarks>
public static async Task RunEstimatorChangeFeed(
string containerName,
CosmosClient client)
{
await Program.InitializeAsync(containerName, client);
// <StartProcessorEstimator>
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedEstimator", onChangesDelegate: Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimator>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait random time for the delegate to output all messages
await Task.Delay(1000);
// <StartEstimator>
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
// </StartEstimator>
Console.WriteLine($"Starting Change Feed Estimator...");
await changeFeedEstimator.StartAsync();
Console.WriteLine("Change Feed Estimator started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, monitoredContainer);
Console.WriteLine("Updating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.UpdateItems(10, monitoredContainer);
Console.WriteLine("Deleting 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.DeleteItems(10, monitoredContainer);
// Wait random time for the delegate to output all messages
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
await changeFeedEstimator.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator with the detailed iterator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// The iterator exposes detailed, per-lease, information on estimation and ownership.
/// </remarks>
public static async Task RunEstimatorPullChangeFeed(
string containerName,
CosmosClient client)
{
await Program.InitializeAsync(containerName, client);
// <StartProcessorEstimatorDetailed>
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>("changeFeedEstimatorPull", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimatorDetailed>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait some seconds for instances to acquire leases
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, monitoredContainer);
Console.WriteLine("Updating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.UpdateItems(10, monitoredContainer);
Console.WriteLine("Deleting 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.DeleteItems(10, monitoredContainer);
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimatorDetailed>
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
// </StartEstimatorDetailed>
// <GetIteratorEstimatorDetailed>
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
// </GetIteratorEstimatorDetailed>
Console.WriteLine("Stopping processor.");
await changeFeedProcessor.StopAsync();
// Wait for processor to shutdown completely so the next items generate lag
await Task.Delay(7500);
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, monitoredContainer);
Console.WriteLine("Updating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.UpdateItems(10, monitoredContainer);
Console.WriteLine("Deleting 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.DeleteItems(10, monitoredContainer);
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIteratorAfter = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIteratorAfter.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIteratorAfter.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
// Host ownership should be empty as we have already stopped the estimator
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
}
/// <summary>
/// Setup notification APIs for events.
/// </summary>
public static async Task RunWithNotifications(
string containerName,
CosmosClient client)
{
await Program.InitializeAsync(containerName, client);
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim();
Container.ChangeFeedHandler<ChangeFeedItem<ToDoItem>> handleChanges = (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken) =>
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken} but throwing an exception to bubble to notifications.");
manualResetEventSlim.Set();
throw new Exception("This is an unhandled exception from inside the delegate");
};
// <StartWithNotifications>
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartWithNotifications>
Console.WriteLine($"Starting Change Feed Processor with logging enabled...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, monitoredContainer);
Console.WriteLine("Updating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.UpdateItems(10, monitoredContainer);
Console.WriteLine("Deleting 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.DeleteItems(10, monitoredContainer);
// Wait random time for the delegate to output all messages after initialization is done
manualResetEventSlim.Wait();
await Task.Delay(1000);
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
// <Delegate>
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ChangeFeedItem<ToDoItem> item in changes)
{
if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
}
else
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
}
// Simulate work
await Task.Delay(1);
}
}
// </Delegate>
/// <summary>
/// The delegate for the Estimator receives a numeric representation of items pending to be read.
/// This is an estimate only and is not an exact count of outstanding items.
/// </summary>
// <EstimationDelegate>
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
// </EstimationDelegate>
private static async Task InitializeAsync(string containerName, CosmosClient client)
{
Database database = await client.CreateDatabaseIfNotExistsAsync(id: Program.databaseName, throughput: 1000);
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(containerName, Program.partitionKeyPath));
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.leasesContainer, Program.partitionKeyPath));
}
private static async Task GenerateItems(int itemsToInsert, Container container)
{
await Task.Delay(500);
for (int i = 0; i < itemsToInsert; i++)
{
await container.CreateItemAsync<ToDoItem>(
new ToDoItem()
{
id = i.ToString(),
CreationTime = DateTime.UtcNow
},
new PartitionKey(i.ToString()));
}
}
private static async Task UpdateItems(int itemsToUpdate, Container container)
{
await Task.Delay(500);
for (int i = 0; i < itemsToUpdate; i++)
{
await container.ReplaceItemAsync<ToDoItem>(
new ToDoItem()
{
id = i.ToString(),
CreationTime = DateTime.UtcNow,
Status = "updated"
},
i.ToString());
}
}
private static async Task DeleteItems(int itemsToDelete, Container container)
{
await Task.Delay(500);
for (int i = 0; i < itemsToDelete; i++)
{
await container.DeleteItemAsync<ToDoItem>(i.ToString(), new PartitionKey(i.ToString()));
}
}
}
// <Model>
public class ToDoItem
{
public string? id { get; set; }
public DateTime CreationTime { get; set; }
public string? Status { get; set; }
}
// </Model>
}
对于这两种模式,第一个参数是描述此处理器的目标的非重复名称。 第二个名称是处理更改的委托实现。
下面是最新版本模式的委托示例:
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Fluent;
using Microsoft.Extensions.Configuration;
namespace ChangeFeedSample
{
class Program
{
static async Task Main(string[] args)
{
IConfiguration configuration = BuildConfiguration();
CosmosClient cosmosClient = BuildCosmosClient(configuration);
await InitializeContainersAsync(cosmosClient, configuration);
ChangeFeedProcessor processor = await StartChangeFeedProcessorAsync(cosmosClient, configuration);
await GenerateItemsAsync(cosmosClient, processor, configuration);
}
// <Delegate>
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
// </Delegate>
/// <summary>
/// Create required containers for the sample.
/// Change Feed processing requires a source container to read the Change Feed from, and a container to store the state on, called leases.
/// </summary>
private static async Task InitializeContainersAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
if (string.IsNullOrEmpty(databaseName)
|| string.IsNullOrEmpty(sourceContainerName)
|| string.IsNullOrEmpty(leaseContainerName))
{
throw new ArgumentNullException("'SourceDatabaseName', 'SourceContainerName', and 'LeasesContainerName' settings are required. Verify your configuration.");
}
Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(sourceContainerName, "/id"));
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(leaseContainerName, "/id"));
}
// <DefineProcessor>
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
// </DefineProcessor>
/// <summary>
/// Generate sample items based on user input.
/// </summary>
private static async Task GenerateItemsAsync(
CosmosClient cosmosClient,
ChangeFeedProcessor changeFeedProcessor,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
Container sourceContainer = cosmosClient.GetContainer(databaseName, sourceContainerName);
while(true)
{
Console.WriteLine("Enter a number of items to insert in the container or 'exit' to stop:");
string command = Console.ReadLine();
if ("exit".Equals(command, StringComparison.InvariantCultureIgnoreCase))
{
Console.WriteLine();
break;
}
if (int.TryParse(command, out int itemsToInsert))
{
Console.WriteLine($"Generating {itemsToInsert} items...");
for (int i = 0; i < itemsToInsert; i++)
{
string id = Guid.NewGuid().ToString();
await sourceContainer.CreateItemAsync<ToDoItem>(
new ToDoItem()
{
id = id,
creationTime = DateTime.UtcNow
},
new PartitionKey(id));
}
}
}
Console.WriteLine("Stopping Change Feed Processor...");
await changeFeedProcessor.StopAsync();
Console.WriteLine("Stopped Change Feed Processor.");
}
private static IConfiguration BuildConfiguration()
{
return new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
.Build();
}
private static CosmosClient BuildCosmosClient(IConfiguration configuration)
{
if (string.IsNullOrEmpty(configuration["ConnectionString"]) || "<Your-Connection-String>".Equals(configuration["ConnectionString"]))
{
throw new ArgumentNullException("Missing 'ConnectionString' setting in configuration.");
}
return new CosmosClientBuilder(configuration["ConnectionString"])
.Build();
}
}
}
然后,使用 WithInstanceName
定义计算实例名称或唯一标识符。 对于要部署的每个计算实例,计算实例名称应该是唯一且不同的。 使用 WithLeaseContainer
设置容器以保持租约状态。
调用 Build
可让你获得可通过调用 StartAsync
启动的处理器实例。
处理生命周期
主机实例的正常生命周期为:
- 读取更改源。
- 如果没有发生更改,请在一段预定义的时间内保持睡眠状态(可在生成器中使用
WithPollInterval
进行自定义),然后转到 #1。 - 如果发生了更改,请将其发送给委托。
- 委托成功地处理更改后,以最新的处理时间点更新租用存储,然后转到 #1。
错误处理。
更改源处理器可在发生用户代码错误后复原。 如果委托实现发生未经处理的异常(步骤 #4),处理那一批特定更改的线程将停止,并最终创建新的线程。 新线程将检查租约存储为该分区键值范围保存的最新时间点。 新线程将从该时间点重启,从而有效地向委托发送同一批更改。 此行为将一直持续到委托正确处理完更改为止,这也是更改源处理器能够提供“至少一次”保证的原因。
注意
当仅有一个方案时,不会重试一批更改。 如果第一次执行委托时发生故障,则租用存储没有以前保存的状态可在重试时使用。 在这种情况下,重试将使用初始启动配置,该配置可能包含也可能不包含最后一批次。
若要防止更改源处理器不断地重试同一批更改,应在委托代码中添加逻辑,以便在出现异常时将文档写入出错的消息队列中。 此设计可确保你可以跟踪未处理的更改,同时仍然能够继续处理将来的更改。 出错消息队列可能是另一个 Azure Cosmos DB 容器。 确切的数据存储并不重要。 只需保留未处理的更改。
还可使用更改源估算器在更改源处理器实例读取更改源时监视其进度,或使用生命周期通知来检测潜在故障。
生命周期通知
可以将更改源处理器连接到其生命周期中的任何相关事件。 可以选择接收其中一个或全部事件的通知。 建议至少注册错误通知:
- 为
WithLeaseAcquireNotification
注册处理程序,以便在当前主机获得租约开始处理它时收到通知。 - 为
WithLeaseReleaseNotification
注册处理程序,以便在当前主机释放租约并停止处理它时收到通知。 - 为
WithErrorNotification
注册处理程序,以便当前主机在处理过程中遇到异常时收到通知。 你需要能够区分源是用户委托(未经处理的异常),还是处理器在尝试访问受监视容器时遇到的错误(例如网络问题)。
生命周期通知在两种更改源模式下均可用。 下面是最新版本模式下的生命周期通知示例:
namespace Cosmos.Samples.ChangeFeed
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Extensions.Configuration;
using ChangeFeedProcessorLibrary = Microsoft.Azure.Documents.ChangeFeedProcessor;
// ----------------------------------------------------------------------------------------------------------
// Prerequisites -
//
// 1. An Azure Cosmos account -
// https://learn.microsoft.com/azure/cosmos-db/create-cosmosdb-resources-portal
//
// 2. Microsoft.Azure.Cosmos NuGet package -
// http://www.nuget.org/packages/Microsoft.Azure.Cosmos/
// ----------------------------------------------------------------------------------------------------------
// Sample - demonstrates common Change Feed operations
//
// 1. Listening for changes that happen after a Change Feed Processor is started.
//
// 2. Listening for changes that happen after a certain point in time.
//
// 3. Listening for changes that happen since the container was created.
//
// 4. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification
//
// 5. Generate Estimator metrics to expose current Change Feed Processor progress on demand
//
// 6. Code migration template from existing Change Feed Processor library V2
//
// 7. Error handling and advanced logging
//-----------------------------------------------------------------------------------------------------------
class Program
{
private static readonly string monitoredContainer = "monitored";
private static readonly string leasesContainer = "leases";
private static readonly string partitionKeyPath = "/id";
// Async main requires c# 7.1 which is set in the csproj with the LangVersion attribute
static async Task Main(string[] args)
{
try
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();
string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentNullException("Please specify a valid endpoint in the appSettings.json");
}
string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}
using (CosmosClient client = new CosmosClient(endpoint, authKey))
{
Console.WriteLine($"\n1. Listening for changes that happen after a Change Feed Processor is started.");
await Program.RunBasicChangeFeed("changefeed-basic", client);
Console.WriteLine($"\n2. Listening for changes that happen after a certain point in time.");
await Program.RunStartTimeChangeFeed("changefeed-time", client);
Console.WriteLine($"\n3. Listening for changes that happen since the container was created.");
await Program.RunStartFromBeginningChangeFeed("changefeed-beginning", client);
Console.WriteLine($"\n4. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification.");
await Program.RunEstimatorChangeFeed("changefeed-estimator", client);
Console.WriteLine($"\n5. Generate Estimator metrics to expose current Change Feed Processor progress on demand.");
await Program.RunEstimatorPullChangeFeed("changefeed-estimator-detailed", client);
Console.WriteLine($"\n6. Code migration template from existing Change Feed Processor library V2.");
await Program.RunMigrationSample("changefeed-migration", client, configuration);
Console.WriteLine($"\n7. Error handling and advanced logging.");
await Program.RunWithNotifications("changefeed-logging", client);
}
}
finally
{
Console.WriteLine("End of demo, press any key to exit.");
Console.ReadKey();
}
}
/// <summary>
/// Basic change feed functionality.
/// </summary>
/// <remarks>
/// When StartAsync is called, the Change Feed Processor starts an initialization process that can take several milliseconds,
/// in which it starts connections and initializes locks in the leases container.
/// </remarks>
public static async Task RunBasicChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <BasicInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBasic", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </BasicInitialization>
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, monitoredContainer);
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// StartTime will make the Change Feed Processor start processing changes at a particular point in time, all previous changes are ignored.
/// </summary>
/// <remarks>
/// StartTime only works if the leaseContainer is empty or contains no leases for the particular processor name.
/// </remarks>
public static async Task RunStartTimeChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 5 items that will not be picked up.");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine($"Items generated at {DateTime.UtcNow}");
// Generate a future point in time
await Task.Delay(2000);
DateTime particularPointInTime = DateTime.UtcNow;
Console.WriteLine("Generating 5 items that will be picked up by the delegate...");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
// <TimeInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
// </TimeInitialization>
Console.WriteLine($"Starting Change Feed Processor with changes after {particularPointInTime}...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Reading the Change Feed since the beginning of time.
/// </summary>
/// <remarks>
/// StartTime only works if the leaseContainer is empty or contains no leases for the particular processor name.
/// </remarks>
public static async Task RunStartFromBeginningChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// <StartFromBeginningInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
// </StartFromBeginningInitialization>
Console.WriteLine($"Starting Change Feed Processor with changes since the beginning...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// </remarks>
public static async Task RunEstimatorChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <StartProcessorEstimator>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimator>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimator>
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
// </StartEstimator>
Console.WriteLine($"Starting Change Feed Estimator...");
await changeFeedEstimator.StartAsync();
Console.WriteLine("Change Feed Estimator started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
await changeFeedEstimator.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator with the detailed iterator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// The iterator exposes detailed, per-lease, information on estimation and ownership.
/// </remarks>
public static async Task RunEstimatorPullChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <StartProcessorEstimatorDetailed>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimatorDetailed>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait some seconds for instances to acquire leases
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimatorDetailed>
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
// </StartEstimatorDetailed>
// <GetIteratorEstimatorDetailed>
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
// </GetIteratorEstimatorDetailed>
Console.WriteLine("Stopping processor to show how the lag increases if no processing is happening.");
await changeFeedProcessor.StopAsync();
// Wait for processor to shutdown completely so the next items generate lag
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be seen by the Estimator...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIteratorAfter = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIteratorAfter.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIteratorAfter.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
// Host ownership should be empty as we have already stopped the estimator
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
}
/// <summary>
/// Example of a code migration template from Change Feed Processor V2 to SDK V3.
/// </summary>
/// <returns></returns>
public static async Task RunMigrationSample(
string databaseId,
CosmosClient client,
IConfigurationRoot configuration)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 10 items that will be picked up by the old Change Feed Processor library...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// This is how you would initialize the processor in V2
// <ChangeFeedProcessorLibrary>
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.monitoredContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.leasesContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
.WithHostName("consoleHost")
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
{
StartFromBeginning = true,
LeasePrefix = "MyLeasePrefix",
MaxItemCount = 10,
FeedPollDelay = TimeSpan.FromSeconds(1)
})
.WithFeedCollection(monitoredCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithObserver<ChangeFeedObserver>()
.BuildAsync();
// </ChangeFeedProcessorLibrary>
await oldChangeFeedProcessor.StartAsync();
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Now we will stop the V2 Processor and start a V3 with the same parameters to pick up from the same state, press any key to continue...");
Console.ReadKey();
await oldChangeFeedProcessor.StopAsync();
Console.WriteLine("Generating 5 items that will be picked up by the new Change Feed Processor...");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
// This is how you would do the same initialization in V3
// <ChangeFeedProcessorMigrated>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithMaxItems(10)
.WithPollInterval(TimeSpan.FromSeconds(1))
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
// </ChangeFeedProcessorMigrated>
await changeFeedProcessor.StartAsync();
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Setup notification APIs for events.
/// </summary>
public static async Task RunWithNotifications(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim();
Container.ChangeFeedHandler<ToDoItem> handleChanges = (ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken) =>
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken} but throwing an exception to bubble to notifications.");
manualResetEventSlim.Set();
throw new Exception("This is an unhandled exception from inside the delegate");
};
// <StartWithNotifications>
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartWithNotifications>
Console.WriteLine($"Starting Change Feed Processor with logging enabled...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait random time for the delegate to output all messages after initialization is done
manualResetEventSlim.Wait();
await Task.Delay(1000);
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
// <Delegate>
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate work
await Task.Delay(1);
}
}
// </Delegate>
/// <summary>
/// The delegate for the Estimator receives a numeric representation of items pending to be read.
/// </summary>
// <EstimationDelegate>
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
// </EstimationDelegate>
private static async Task GenerateItems(
int itemsToInsert,
Container container)
{
await Task.Delay(500);
for (int i = 0; i < itemsToInsert; i++)
{
string id = Guid.NewGuid().ToString();
await container.CreateItemAsync<ToDoItem>(
new ToDoItem()
{
id = id,
creationTime = DateTime.UtcNow
},
new PartitionKey(id));
}
}
private static async Task InitializeAsync(
string databaseId,
CosmosClient client)
{
Database database;
// Recreate database
try
{
database = await client.GetDatabase(databaseId).ReadAsync();
await database.DeleteAsync();
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
}
database = await client.CreateDatabaseAsync(databaseId);
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.monitoredContainer, Program.partitionKeyPath));
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.leasesContainer, Program.partitionKeyPath));
}
}
// <Model>
public class ToDoItem
{
public string id { get; set; }
public DateTime creationTime { get; set; }
}
// </Model>
internal class ChangeFeedObserver : IChangeFeedObserver
{
public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
{
return Task.CompletedTask;
}
public Task OpenAsync(IChangeFeedObserverContext context)
{
return Task.CompletedTask;
}
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Microsoft.Azure.Documents.Document> docs, CancellationToken cancellationToken)
{
foreach (Microsoft.Azure.Documents.Document doc in docs)
{
Console.WriteLine($"\t[OLD Processor] Detected operation for item with id {doc.Id}, created at {doc.GetPropertyValue<DateTime>("creationTime")}.");
}
return Task.CompletedTask;
}
}
}
部署单元
单个更改源处理器部署单元包含一个或多个具有相同 processorName
值、相同租用容器配置但实例名称不同的计算实例。 可以有多个部署单元,其中每个单元可以具有不同的更改业务流,且每个部署单元可由一个或多个实例组成。
例如,你可能有一个部署单元,每次容器发生更改时,该部署单元就会触发外部 API。 另一个部署单元可能会在每次发生更改时实时移动数据。 当受监视的容器中发生更改时,所有部署单元都会收到通知。
动态缩放
如前所述,在某个部署单元中,可以有一个或多个计算实例。 若要充分利用部署单元内的计算分布,只需满足以下关键要求:
- 所有实例应具有相同的租用容器配置。
- 所有实例都应具有相同的
processorName
值。 - 每个实例都需要具有不同的实例名称 (
WithInstanceName
)。
如果符合这三个条件,更改源处理器将使用均等分布算法,将租用容器中的所有租用分布到该部署单元的所有正在运行的实例,并将计算并行化。 在任意时间,一个租用归一个实例所有,因此,实例数应不大于租用数。
实例数可以增长和收缩。 更改源处理器通过相应地重新分发负载来动态调整负载。
而且,如果容器的吞吐量或存储增加,更改源处理器还可以动态调整到容器规模。 当容器增多时,更改源处理器会以透明方式应对这种情况,方法是动态增加租约并将新的租约分布到现有实例。
开始时间
默认情况下,首次启动更改源处理器时,它会初始化租用容器,并开始其处理生命周期。 不会检测到首次初始化更改源处理器之前在受监视容器中发生的任何更改。
从以前的某个日期和时间读取
将 DateTime
的实例传递给 WithStartTime
生成器扩展,可将更改源处理器初始化为从特定的日期和时间开始读取更改:
namespace Cosmos.Samples.ChangeFeed
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Extensions.Configuration;
using ChangeFeedProcessorLibrary = Microsoft.Azure.Documents.ChangeFeedProcessor;
// ----------------------------------------------------------------------------------------------------------
// Prerequisites -
//
// 1. An Azure Cosmos account -
// https://learn.microsoft.com/azure/cosmos-db/create-cosmosdb-resources-portal
//
// 2. Microsoft.Azure.Cosmos NuGet package -
// http://www.nuget.org/packages/Microsoft.Azure.Cosmos/
// ----------------------------------------------------------------------------------------------------------
// Sample - demonstrates common Change Feed operations
//
// 1. Listening for changes that happen after a Change Feed Processor is started.
//
// 2. Listening for changes that happen after a certain point in time.
//
// 3. Listening for changes that happen since the container was created.
//
// 4. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification
//
// 5. Generate Estimator metrics to expose current Change Feed Processor progress on demand
//
// 6. Code migration template from existing Change Feed Processor library V2
//
// 7. Error handling and advanced logging
//-----------------------------------------------------------------------------------------------------------
class Program
{
private static readonly string monitoredContainer = "monitored";
private static readonly string leasesContainer = "leases";
private static readonly string partitionKeyPath = "/id";
// Async main requires c# 7.1 which is set in the csproj with the LangVersion attribute
static async Task Main(string[] args)
{
try
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();
string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentNullException("Please specify a valid endpoint in the appSettings.json");
}
string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}
using (CosmosClient client = new CosmosClient(endpoint, authKey))
{
Console.WriteLine($"\n1. Listening for changes that happen after a Change Feed Processor is started.");
await Program.RunBasicChangeFeed("changefeed-basic", client);
Console.WriteLine($"\n2. Listening for changes that happen after a certain point in time.");
await Program.RunStartTimeChangeFeed("changefeed-time", client);
Console.WriteLine($"\n3. Listening for changes that happen since the container was created.");
await Program.RunStartFromBeginningChangeFeed("changefeed-beginning", client);
Console.WriteLine($"\n4. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification.");
await Program.RunEstimatorChangeFeed("changefeed-estimator", client);
Console.WriteLine($"\n5. Generate Estimator metrics to expose current Change Feed Processor progress on demand.");
await Program.RunEstimatorPullChangeFeed("changefeed-estimator-detailed", client);
Console.WriteLine($"\n6. Code migration template from existing Change Feed Processor library V2.");
await Program.RunMigrationSample("changefeed-migration", client, configuration);
Console.WriteLine($"\n7. Error handling and advanced logging.");
await Program.RunWithNotifications("changefeed-logging", client);
}
}
finally
{
Console.WriteLine("End of demo, press any key to exit.");
Console.ReadKey();
}
}
/// <summary>
/// Basic change feed functionality.
/// </summary>
/// <remarks>
/// When StartAsync is called, the Change Feed Processor starts an initialization process that can take several milliseconds,
/// in which it starts connections and initializes locks in the leases container.
/// </remarks>
public static async Task RunBasicChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <BasicInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBasic", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </BasicInitialization>
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, monitoredContainer);
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// StartTime will make the Change Feed Processor start processing changes at a particular point in time, all previous changes are ignored.
/// </summary>
/// <remarks>
/// StartTime only works if the leaseContainer is empty or contains no leases for the particular processor name.
/// </remarks>
public static async Task RunStartTimeChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 5 items that will not be picked up.");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine($"Items generated at {DateTime.UtcNow}");
// Generate a future point in time
await Task.Delay(2000);
DateTime particularPointInTime = DateTime.UtcNow;
Console.WriteLine("Generating 5 items that will be picked up by the delegate...");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
// <TimeInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
// </TimeInitialization>
Console.WriteLine($"Starting Change Feed Processor with changes after {particularPointInTime}...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Reading the Change Feed since the beginning of time.
/// </summary>
/// <remarks>
/// StartTime only works if the leaseContainer is empty or contains no leases for the particular processor name.
/// </remarks>
public static async Task RunStartFromBeginningChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// <StartFromBeginningInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
// </StartFromBeginningInitialization>
Console.WriteLine($"Starting Change Feed Processor with changes since the beginning...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// </remarks>
public static async Task RunEstimatorChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <StartProcessorEstimator>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimator>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimator>
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
// </StartEstimator>
Console.WriteLine($"Starting Change Feed Estimator...");
await changeFeedEstimator.StartAsync();
Console.WriteLine("Change Feed Estimator started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
await changeFeedEstimator.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator with the detailed iterator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// The iterator exposes detailed, per-lease, information on estimation and ownership.
/// </remarks>
public static async Task RunEstimatorPullChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <StartProcessorEstimatorDetailed>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimatorDetailed>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait some seconds for instances to acquire leases
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimatorDetailed>
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
// </StartEstimatorDetailed>
// <GetIteratorEstimatorDetailed>
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
// </GetIteratorEstimatorDetailed>
Console.WriteLine("Stopping processor to show how the lag increases if no processing is happening.");
await changeFeedProcessor.StopAsync();
// Wait for processor to shutdown completely so the next items generate lag
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be seen by the Estimator...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIteratorAfter = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIteratorAfter.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIteratorAfter.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
// Host ownership should be empty as we have already stopped the estimator
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
}
/// <summary>
/// Example of a code migration template from Change Feed Processor V2 to SDK V3.
/// </summary>
/// <returns></returns>
public static async Task RunMigrationSample(
string databaseId,
CosmosClient client,
IConfigurationRoot configuration)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 10 items that will be picked up by the old Change Feed Processor library...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// This is how you would initialize the processor in V2
// <ChangeFeedProcessorLibrary>
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.monitoredContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.leasesContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
.WithHostName("consoleHost")
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
{
StartFromBeginning = true,
LeasePrefix = "MyLeasePrefix",
MaxItemCount = 10,
FeedPollDelay = TimeSpan.FromSeconds(1)
})
.WithFeedCollection(monitoredCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithObserver<ChangeFeedObserver>()
.BuildAsync();
// </ChangeFeedProcessorLibrary>
await oldChangeFeedProcessor.StartAsync();
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Now we will stop the V2 Processor and start a V3 with the same parameters to pick up from the same state, press any key to continue...");
Console.ReadKey();
await oldChangeFeedProcessor.StopAsync();
Console.WriteLine("Generating 5 items that will be picked up by the new Change Feed Processor...");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
// This is how you would do the same initialization in V3
// <ChangeFeedProcessorMigrated>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithMaxItems(10)
.WithPollInterval(TimeSpan.FromSeconds(1))
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
// </ChangeFeedProcessorMigrated>
await changeFeedProcessor.StartAsync();
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Setup notification APIs for events.
/// </summary>
public static async Task RunWithNotifications(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim();
Container.ChangeFeedHandler<ToDoItem> handleChanges = (ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken) =>
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken} but throwing an exception to bubble to notifications.");
manualResetEventSlim.Set();
throw new Exception("This is an unhandled exception from inside the delegate");
};
// <StartWithNotifications>
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartWithNotifications>
Console.WriteLine($"Starting Change Feed Processor with logging enabled...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait random time for the delegate to output all messages after initialization is done
manualResetEventSlim.Wait();
await Task.Delay(1000);
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
// <Delegate>
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate work
await Task.Delay(1);
}
}
// </Delegate>
/// <summary>
/// The delegate for the Estimator receives a numeric representation of items pending to be read.
/// </summary>
// <EstimationDelegate>
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
// </EstimationDelegate>
private static async Task GenerateItems(
int itemsToInsert,
Container container)
{
await Task.Delay(500);
for (int i = 0; i < itemsToInsert; i++)
{
string id = Guid.NewGuid().ToString();
await container.CreateItemAsync<ToDoItem>(
new ToDoItem()
{
id = id,
creationTime = DateTime.UtcNow
},
new PartitionKey(id));
}
}
private static async Task InitializeAsync(
string databaseId,
CosmosClient client)
{
Database database;
// Recreate database
try
{
database = await client.GetDatabase(databaseId).ReadAsync();
await database.DeleteAsync();
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
}
database = await client.CreateDatabaseAsync(databaseId);
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.monitoredContainer, Program.partitionKeyPath));
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.leasesContainer, Program.partitionKeyPath));
}
}
// <Model>
public class ToDoItem
{
public string id { get; set; }
public DateTime creationTime { get; set; }
}
// </Model>
internal class ChangeFeedObserver : IChangeFeedObserver
{
public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
{
return Task.CompletedTask;
}
public Task OpenAsync(IChangeFeedObserverContext context)
{
return Task.CompletedTask;
}
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Microsoft.Azure.Documents.Document> docs, CancellationToken cancellationToken)
{
foreach (Microsoft.Azure.Documents.Document doc in docs)
{
Console.WriteLine($"\t[OLD Processor] Detected operation for item with id {doc.Id}, created at {doc.GetPropertyValue<DateTime>("creationTime")}.");
}
return Task.CompletedTask;
}
}
}
更改源处理器将根据该特定日期和时间初始化,并开始读取此日期和时间之后发生的更改。
从头开始读取
在其他方案(例如数据迁移,或分析容器的整个历史记录)中,需要从该容器的生存期开始时间读取更改源。 可以在生成器扩展中使用 WithStartTime
,但需要传递 DateTime.MinValue.ToUniversalTime()
,以便生成最小 DateTime
值的 UTC 表示形式,如以下示例所示:
namespace Cosmos.Samples.ChangeFeed
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Extensions.Configuration;
using ChangeFeedProcessorLibrary = Microsoft.Azure.Documents.ChangeFeedProcessor;
// ----------------------------------------------------------------------------------------------------------
// Prerequisites -
//
// 1. An Azure Cosmos account -
// https://learn.microsoft.com/azure/cosmos-db/create-cosmosdb-resources-portal
//
// 2. Microsoft.Azure.Cosmos NuGet package -
// http://www.nuget.org/packages/Microsoft.Azure.Cosmos/
// ----------------------------------------------------------------------------------------------------------
// Sample - demonstrates common Change Feed operations
//
// 1. Listening for changes that happen after a Change Feed Processor is started.
//
// 2. Listening for changes that happen after a certain point in time.
//
// 3. Listening for changes that happen since the container was created.
//
// 4. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification
//
// 5. Generate Estimator metrics to expose current Change Feed Processor progress on demand
//
// 6. Code migration template from existing Change Feed Processor library V2
//
// 7. Error handling and advanced logging
//-----------------------------------------------------------------------------------------------------------
class Program
{
private static readonly string monitoredContainer = "monitored";
private static readonly string leasesContainer = "leases";
private static readonly string partitionKeyPath = "/id";
// Async main requires c# 7.1 which is set in the csproj with the LangVersion attribute
static async Task Main(string[] args)
{
try
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();
string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentNullException("Please specify a valid endpoint in the appSettings.json");
}
string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}
using (CosmosClient client = new CosmosClient(endpoint, authKey))
{
Console.WriteLine($"\n1. Listening for changes that happen after a Change Feed Processor is started.");
await Program.RunBasicChangeFeed("changefeed-basic", client);
Console.WriteLine($"\n2. Listening for changes that happen after a certain point in time.");
await Program.RunStartTimeChangeFeed("changefeed-time", client);
Console.WriteLine($"\n3. Listening for changes that happen since the container was created.");
await Program.RunStartFromBeginningChangeFeed("changefeed-beginning", client);
Console.WriteLine($"\n4. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification.");
await Program.RunEstimatorChangeFeed("changefeed-estimator", client);
Console.WriteLine($"\n5. Generate Estimator metrics to expose current Change Feed Processor progress on demand.");
await Program.RunEstimatorPullChangeFeed("changefeed-estimator-detailed", client);
Console.WriteLine($"\n6. Code migration template from existing Change Feed Processor library V2.");
await Program.RunMigrationSample("changefeed-migration", client, configuration);
Console.WriteLine($"\n7. Error handling and advanced logging.");
await Program.RunWithNotifications("changefeed-logging", client);
}
}
finally
{
Console.WriteLine("End of demo, press any key to exit.");
Console.ReadKey();
}
}
/// <summary>
/// Basic change feed functionality.
/// </summary>
/// <remarks>
/// When StartAsync is called, the Change Feed Processor starts an initialization process that can take several milliseconds,
/// in which it starts connections and initializes locks in the leases container.
/// </remarks>
public static async Task RunBasicChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <BasicInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBasic", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </BasicInitialization>
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, monitoredContainer);
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// StartTime will make the Change Feed Processor start processing changes at a particular point in time, all previous changes are ignored.
/// </summary>
/// <remarks>
/// StartTime only works if the leaseContainer is empty or contains no leases for the particular processor name.
/// </remarks>
public static async Task RunStartTimeChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 5 items that will not be picked up.");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine($"Items generated at {DateTime.UtcNow}");
// Generate a future point in time
await Task.Delay(2000);
DateTime particularPointInTime = DateTime.UtcNow;
Console.WriteLine("Generating 5 items that will be picked up by the delegate...");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
// <TimeInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
// </TimeInitialization>
Console.WriteLine($"Starting Change Feed Processor with changes after {particularPointInTime}...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Reading the Change Feed since the beginning of time.
/// </summary>
/// <remarks>
/// StartTime only works if the leaseContainer is empty or contains no leases for the particular processor name.
/// </remarks>
public static async Task RunStartFromBeginningChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// <StartFromBeginningInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
// </StartFromBeginningInitialization>
Console.WriteLine($"Starting Change Feed Processor with changes since the beginning...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// </remarks>
public static async Task RunEstimatorChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <StartProcessorEstimator>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimator>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimator>
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
// </StartEstimator>
Console.WriteLine($"Starting Change Feed Estimator...");
await changeFeedEstimator.StartAsync();
Console.WriteLine("Change Feed Estimator started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
await changeFeedEstimator.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator with the detailed iterator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// The iterator exposes detailed, per-lease, information on estimation and ownership.
/// </remarks>
public static async Task RunEstimatorPullChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <StartProcessorEstimatorDetailed>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimatorDetailed>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait some seconds for instances to acquire leases
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimatorDetailed>
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
// </StartEstimatorDetailed>
// <GetIteratorEstimatorDetailed>
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
// </GetIteratorEstimatorDetailed>
Console.WriteLine("Stopping processor to show how the lag increases if no processing is happening.");
await changeFeedProcessor.StopAsync();
// Wait for processor to shutdown completely so the next items generate lag
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be seen by the Estimator...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIteratorAfter = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIteratorAfter.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIteratorAfter.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
// Host ownership should be empty as we have already stopped the estimator
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
}
/// <summary>
/// Example of a code migration template from Change Feed Processor V2 to SDK V3.
/// </summary>
/// <returns></returns>
public static async Task RunMigrationSample(
string databaseId,
CosmosClient client,
IConfigurationRoot configuration)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 10 items that will be picked up by the old Change Feed Processor library...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// This is how you would initialize the processor in V2
// <ChangeFeedProcessorLibrary>
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.monitoredContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.leasesContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
.WithHostName("consoleHost")
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
{
StartFromBeginning = true,
LeasePrefix = "MyLeasePrefix",
MaxItemCount = 10,
FeedPollDelay = TimeSpan.FromSeconds(1)
})
.WithFeedCollection(monitoredCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithObserver<ChangeFeedObserver>()
.BuildAsync();
// </ChangeFeedProcessorLibrary>
await oldChangeFeedProcessor.StartAsync();
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Now we will stop the V2 Processor and start a V3 with the same parameters to pick up from the same state, press any key to continue...");
Console.ReadKey();
await oldChangeFeedProcessor.StopAsync();
Console.WriteLine("Generating 5 items that will be picked up by the new Change Feed Processor...");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
// This is how you would do the same initialization in V3
// <ChangeFeedProcessorMigrated>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithMaxItems(10)
.WithPollInterval(TimeSpan.FromSeconds(1))
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
// </ChangeFeedProcessorMigrated>
await changeFeedProcessor.StartAsync();
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Setup notification APIs for events.
/// </summary>
public static async Task RunWithNotifications(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim();
Container.ChangeFeedHandler<ToDoItem> handleChanges = (ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken) =>
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken} but throwing an exception to bubble to notifications.");
manualResetEventSlim.Set();
throw new Exception("This is an unhandled exception from inside the delegate");
};
// <StartWithNotifications>
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartWithNotifications>
Console.WriteLine($"Starting Change Feed Processor with logging enabled...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait random time for the delegate to output all messages after initialization is done
manualResetEventSlim.Wait();
await Task.Delay(1000);
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
// <Delegate>
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate work
await Task.Delay(1);
}
}
// </Delegate>
/// <summary>
/// The delegate for the Estimator receives a numeric representation of items pending to be read.
/// </summary>
// <EstimationDelegate>
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
// </EstimationDelegate>
private static async Task GenerateItems(
int itemsToInsert,
Container container)
{
await Task.Delay(500);
for (int i = 0; i < itemsToInsert; i++)
{
string id = Guid.NewGuid().ToString();
await container.CreateItemAsync<ToDoItem>(
new ToDoItem()
{
id = id,
creationTime = DateTime.UtcNow
},
new PartitionKey(id));
}
}
private static async Task InitializeAsync(
string databaseId,
CosmosClient client)
{
Database database;
// Recreate database
try
{
database = await client.GetDatabase(databaseId).ReadAsync();
await database.DeleteAsync();
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
}
database = await client.CreateDatabaseAsync(databaseId);
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.monitoredContainer, Program.partitionKeyPath));
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.leasesContainer, Program.partitionKeyPath));
}
}
// <Model>
public class ToDoItem
{
public string id { get; set; }
public DateTime creationTime { get; set; }
}
// </Model>
internal class ChangeFeedObserver : IChangeFeedObserver
{
public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
{
return Task.CompletedTask;
}
public Task OpenAsync(IChangeFeedObserverContext context)
{
return Task.CompletedTask;
}
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Microsoft.Azure.Documents.Document> docs, CancellationToken cancellationToken)
{
foreach (Microsoft.Azure.Documents.Document doc in docs)
{
Console.WriteLine($"\t[OLD Processor] Detected operation for item with id {doc.Id}, created at {doc.GetPropertyValue<DateTime>("creationTime")}.");
}
return Task.CompletedTask;
}
}
}
更改源处理器将会初始化,并从容器生存期的开始时间读取更改。
注意
这些自定义选项仅用于设置更改源处理器的起始时间点。 首次初始化租用容器后,更改这些选项不起作用。
自定义起点操作仅适用于最新版本更改源模式。 使用所有版本与删除模式时,必须从处理器启动时开始读取,或者从帐户的连续备份保留期内的前一租用状态恢复。
更改源和预配吞吐量
针对受监视容器的更改源读取操作会消耗请求单位。 确保受监视容器未遇到限制。 限制会增加处理器接收更改源事件的延迟。
租用容器上的操作(更新和维护状态)将使用请求单位。 使用相同租用容器的实例数越多,潜在的请求单位消耗量就越高。 确保租用容器未遇到限制。 限制会增加接收更改源事件的延迟。 限制甚至可能完全中止处理。
共享租用容器
可以在多个部署单元之间共享租用容器。 在共享租用容器中,每个部署单元侦听不同的受监视容器或具有不同的 processorName
值。 在此配置中,每个部署单元都将在租用容器上保持独立状态。 查看租用容器上的请求单位消耗量,确保预配的吞吐量足以满足所有部署单元的需求。
高级租用配置
有三个关键配置会影响更改源处理器的工作方式。 每个配置都会影响租用容器上的请求单位消耗量。 可以在创建更改源处理器时设置以下配置之一,但请谨慎使用:
- 租约获取:默认间隔为 17 秒。 主机将定期检查租约存储的状态,并考虑将租约获取操作设为动态缩放过程的一部分。 此过程是通过对租用容器执行查询来完成的。 减小此值可以加快重新平衡和获取租约的速度,但会增大租用容器上的请求单位消耗量。
- 租约有效期:默认为 60 秒。 定义租用在被另一个主机获取之前,在没有任何续订活动的情况下可以存在的最长时间量。 当某台主机崩溃时,它拥有的租约将在这段时间加上配置的续订间隔之后被其他主机拾取。 减小此值能够加快主机崩溃后的恢复速度,但有效期值永远不应低于续订间隔。
- 租约续订:默认间隔为 13 秒。 拥有租约的主机将定期续订该租约,即使没有新的更改可供使用。 此过程是通过对租用执行替换操作来完成的。 减小此值能够减少检测租约因主机崩溃而丢失的问题所需的时间,但会增大租用容器上的请求单位消耗量。
托管更改源处理器的位置
更改源处理器可以托管在任何支持长时间运行的进程或任务的平台中。 下面是一些示例:
- Azure 应用服务中 Web 作业的连续运行实例
- Azure 虚拟机实例中的进程
- Azure Kubernetes 服务中的后台作业
- Azure Functions 中的无服务器函数
- ASP.NET 托管服务
虽然更改源处理器可以在生存期较短的环境中运行,但由于租用容器会对状态进行维护,这些环境的启动周期会导致接收通知的延迟时间增加(因为每次启动环境时存在启动处理器的开销)。
基于角色的访问要求
使用 Microsoft Entra ID 作为身份验证机制时,请确保标识具有适当的权限:
- 在被监视的容器上:
Microsoft.DocumentDB/databaseAccounts/readMetadata
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
- 在租用容器上:
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery
其他资源
后续步骤
通过以下文章详细了解更改源处理器: