Change feed processor in Azure Cosmos DB
APPLIES TO: NoSQL
The change feed processor is part of the Azure Cosmos DB .NET V3 and Java V4 SDKs. It simplifies the process of reading the change feed and distributes the event processing across multiple consumers effectively.
The main benefit of using the change feed processor is its fault-tolerant design, which assures an "at-least-once" delivery of all the events in the change feed.
Supported SDKs
| .NET V3 | Java | Node.js | Python |
|---|---|---|---|
| ✓ | ✓ | ✕ | ✕ |
Components of the change feed processor
The change feed processor has four main components:
The monitored container: The monitored container has the data from which the change feed is generated. Any inserts and updates to the monitored container are reflected in the change feed of the container.
The lease container: The lease container acts as state storage and coordinates the processing of the change feed across multiple workers. The lease container can be stored in the same account as the monitored container or in a separate account.
The compute instance: A compute instance hosts the change feed processor to listen for changes. Depending on the platform, it might be represented by a virtual machine (VM), a Kubernetes pod, an Azure App Service instance, or an actual physical machine. The compute instance has a unique identifier that's called the instance name throughout this article.
The delegate: The delegate is the code that defines what you, the developer, want to do with each batch of changes that the change feed processor reads.
To further understand how these four elements of the change feed processor work together, let's look at an example in the following diagram. The monitored container stores items and uses 'City' as the partition key. The partition key values are distributed in ranges (each range represents a physical partition) that contain items.
The diagram shows two compute instances, and the change feed processor assigns different ranges to each instance to maximize compute distribution. Each instance has a different, unique name.
Each range is read in parallel. A range's progress is maintained separately from other ranges in the lease container through a lease document. The combination of the leases represents the current state of the change feed processor.
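As a rough mental model (not the SDK's actual implementation), each range can be pictured as one lease record that stores an owner and a saved position. The sketch below uses hypothetical types, `Lease` and `LeaseBalancer`, and a simple round-robin assignment. The round-robin rule is an assumption made purely for illustration; the real processor balances leases internally.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified picture of a lease document: one per range.
public sealed record Lease(string LeaseToken, string Owner, string Continuation);

public static class LeaseBalancer
{
    // Toy round-robin assignment of ranges to instance names.
    // The real change feed processor balances leases internally; this only illustrates the idea.
    public static IReadOnlyList<Lease> Distribute(
        IReadOnlyList<string> leaseTokens,
        IReadOnlyList<string> instanceNames)
    {
        if (instanceNames.Count == 0)
        {
            throw new ArgumentException("At least one instance is required.", nameof(instanceNames));
        }

        return leaseTokens
            .Select((token, index) => new Lease(
                LeaseToken: token,
                Owner: instanceNames[index % instanceNames.Count],
                Continuation: string.Empty)) // nothing has been processed yet
            .ToList();
    }
}
```

With four ranges and two instance names, each instance ends up owning two leases in this model, and the full list of `Lease` records plays the role of the processor's current state.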
Implement the change feed processor
The change feed processor in .NET is available for latest version mode and all versions and deletes mode. All versions and deletes mode is in preview and is supported for the change feed processor beginning in version 3.40.0-preview.0. The point of entry for both modes is always the monitored container.
To read using latest version mode, call GetChangeFeedProcessorBuilder on a Container instance:
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Fluent;
using Microsoft.Extensions.Configuration;
namespace ChangeFeedSample
{
class Program
{
static async Task Main(string[] args)
{
IConfiguration configuration = BuildConfiguration();
CosmosClient cosmosClient = BuildCosmosClient(configuration);
await InitializeContainersAsync(cosmosClient, configuration);
ChangeFeedProcessor processor = await StartChangeFeedProcessorAsync(cosmosClient, configuration);
await GenerateItemsAsync(cosmosClient, processor, configuration);
}
// <Delegate>
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken {context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
// </Delegate>
/// <summary>
/// Create required containers for the sample.
/// Change Feed processing requires a source container to read the Change Feed from, and a container to store the state on, called leases.
/// </summary>
private static async Task InitializeContainersAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
if (string.IsNullOrEmpty(databaseName)
|| string.IsNullOrEmpty(sourceContainerName)
|| string.IsNullOrEmpty(leaseContainerName))
{
throw new ArgumentException("'SourceDatabaseName', 'SourceContainerName', and 'LeasesContainerName' settings are required. Verify your configuration.");
}
Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseName);
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(sourceContainerName, "/id"));
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(leaseContainerName, "/id"));
}
// <DefineProcessor>
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
// </DefineProcessor>
/// <summary>
/// Generate sample items based on user input.
/// </summary>
private static async Task GenerateItemsAsync(
CosmosClient cosmosClient,
ChangeFeedProcessor changeFeedProcessor,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
Container sourceContainer = cosmosClient.GetContainer(databaseName, sourceContainerName);
while(true)
{
Console.WriteLine("Enter a number of items to insert in the container or 'exit' to stop:");
string command = Console.ReadLine();
if ("exit".Equals(command, StringComparison.InvariantCultureIgnoreCase))
{
Console.WriteLine();
break;
}
if (int.TryParse(command, out int itemsToInsert))
{
Console.WriteLine($"Generating {itemsToInsert} items...");
for (int i = 0; i < itemsToInsert; i++)
{
string id = Guid.NewGuid().ToString();
await sourceContainer.CreateItemAsync<ToDoItem>(
new ToDoItem()
{
id = id,
creationTime = DateTime.UtcNow
},
new PartitionKey(id));
}
}
}
Console.WriteLine("Stopping Change Feed Processor...");
await changeFeedProcessor.StopAsync();
Console.WriteLine("Stopped Change Feed Processor.");
}
private static IConfiguration BuildConfiguration()
{
return new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
.Build();
}
private static CosmosClient BuildCosmosClient(IConfiguration configuration)
{
if (string.IsNullOrEmpty(configuration["ConnectionString"]) || "<Your-Connection-String>".Equals(configuration["ConnectionString"]))
{
throw new ArgumentException("Missing 'ConnectionString' setting in configuration.");
}
return new CosmosClientBuilder(configuration["ConnectionString"])
.Build();
}
}
}
To read using all versions and deletes mode, call GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes from the Container instance:
namespace ChangeFeedAllVersionsAndDeletes
{
using System.Net;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Configuration;
// ----------------------------------------------------------------------------------------------------------
// Prerequisites -
//
// 1. An Azure Cosmos account -
// https://learn.microsoft.com/azure/cosmos-db/create-cosmosdb-resources-portal
//
// 2. Microsoft.Azure.Cosmos NuGet package -
// http://www.nuget.org/packages/Microsoft.Azure.Cosmos/
// ----------------------------------------------------------------------------------------------------------
// Sample - demonstrates common Change Feed operations using All Versions and Deletes mode
//
// 1. Listening for changes that happen after a Change Feed Processor is started.
//
// 2. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification
//
// 3. Generate Estimator metrics to expose current Change Feed Processor progress on demand
//
// 4. Error handling and advanced logging
//-----------------------------------------------------------------------------------------------------------
internal class Program
{
private static readonly string databaseName = "changefeed-db";
private static readonly string monitoredContainerPrefix = "monitored-";
private static readonly string leasesContainer = "leases";
private static readonly string partitionKeyPath = "/id";
static async Task Main(string[] args)
{
try
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();
string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentException("Please specify a valid endpoint in the appSettings.json");
}
string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}
using (CosmosClient client = new CosmosClient(endpoint, authKey))
{
Console.WriteLine($"\n1. Listening for changes that happen after a Change Feed Processor is started.");
await Program.RunBasicChangeFeed(monitoredContainerPrefix + "changefeed-basic", client);
Console.WriteLine($"\n2. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification.");
await Program.RunEstimatorChangeFeed(monitoredContainerPrefix + "changefeed-estimator", client);
Console.WriteLine($"\n3. Generate Estimator metrics to expose current Change Feed Processor progress on demand.");
await Program.RunEstimatorPullChangeFeed(monitoredContainerPrefix + "changefeed-estimator-detailed", client);
Console.WriteLine($"\n4. Error handling and advanced logging.");
await Program.RunWithNotifications(monitoredContainerPrefix + "changefeed-logging", client);
}
}
finally
{
Console.WriteLine("End of demo, press any key to exit.");
Console.ReadKey();
}
}
/// <summary>
/// Basic change feed functionality using all versions and deletes mode.
/// </summary>
/// <remarks>
/// When StartAsync is called, the Change Feed Processor starts an initialization process that can take several milliseconds,
/// in which it starts connections and initializes locks in the leases container.
/// </remarks>
public static async Task RunBasicChangeFeed(
string containerName,
CosmosClient client)
{
await Program.InitializeAsync(containerName, client);
// <BasicInitialization>
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </BasicInitialization>
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, monitoredContainer);
Console.WriteLine("Updating 10 items that will be picked up by the delegate...");
await Program.UpdateItems(10, monitoredContainer);
Console.WriteLine("Deleting 10 items that will be picked up by the delegate...");
await Program.DeleteItems(10, monitoredContainer);
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// </remarks>
public static async Task RunEstimatorChangeFeed(
string containerName,
CosmosClient client)
{
await Program.InitializeAsync(containerName, client);
// <StartProcessorEstimator>
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedEstimator", onChangesDelegate: Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimator>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait random time for the delegate to output all messages
await Task.Delay(1000);
// <StartEstimator>
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
// </StartEstimator>
Console.WriteLine($"Starting Change Feed Estimator...");
await changeFeedEstimator.StartAsync();
Console.WriteLine("Change Feed Estimator started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, monitoredContainer);
Console.WriteLine("Updating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.UpdateItems(10, monitoredContainer);
Console.WriteLine("Deleting 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.DeleteItems(10, monitoredContainer);
// Wait random time for the delegate to output all messages
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
await changeFeedEstimator.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator with the detailed iterator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// The iterator exposes detailed, per-lease, information on estimation and ownership.
/// </remarks>
public static async Task RunEstimatorPullChangeFeed(
string containerName,
CosmosClient client)
{
await Program.InitializeAsync(containerName, client);
// <StartProcessorEstimatorDetailed>
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>("changeFeedEstimatorPull", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimatorDetailed>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait some seconds for instances to acquire leases
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, monitoredContainer);
Console.WriteLine("Updating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.UpdateItems(10, monitoredContainer);
Console.WriteLine("Deleting 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.DeleteItems(10, monitoredContainer);
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimatorDetailed>
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimatorPull", leaseContainer);
// </StartEstimatorDetailed>
// <GetIteratorEstimatorDetailed>
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
// </GetIteratorEstimatorDetailed>
Console.WriteLine("Stopping processor.");
await changeFeedProcessor.StopAsync();
// Wait for processor to shutdown completely so the next items generate lag
await Task.Delay(7500);
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, monitoredContainer);
Console.WriteLine("Updating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.UpdateItems(10, monitoredContainer);
Console.WriteLine("Deleting 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.DeleteItems(10, monitoredContainer);
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIteratorAfter = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIteratorAfter.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIteratorAfter.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
// Host ownership should be empty as we have already stopped the processor
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
}
/// <summary>
/// Setup notification APIs for events.
/// </summary>
public static async Task RunWithNotifications(
string containerName,
CosmosClient client)
{
await Program.InitializeAsync(containerName, client);
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim();
Container.ChangeFeedHandler<ChangeFeedItem<ToDoItem>> handleChanges = (ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken) =>
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken} but throwing an exception to bubble to notifications.");
manualResetEventSlim.Set();
throw new Exception("This is an unhandled exception from inside the delegate");
};
// <StartWithNotifications>
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartWithNotifications>
Console.WriteLine($"Starting Change Feed Processor with logging enabled...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, monitoredContainer);
Console.WriteLine("Updating 10 items that will be picked up by the delegate...");
await Program.UpdateItems(10, monitoredContainer);
Console.WriteLine("Deleting 10 items that will be picked up by the delegate...");
await Program.DeleteItems(10, monitoredContainer);
// Wait until the delegate has been invoked (and has thrown) at least once
manualResetEventSlim.Wait();
await Task.Delay(1000);
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
// <Delegate>
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken {context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ChangeFeedItem<ToDoItem> item in changes)
{
if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
}
else
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
}
// Simulate work
await Task.Delay(1);
}
}
// </Delegate>
/// <summary>
/// The delegate for the Estimator receives a numeric representation of items pending to be read.
/// This is an estimate only and is not an exact count of outstanding items.
/// </summary>
// <EstimationDelegate>
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
// </EstimationDelegate>
private static async Task InitializeAsync(string containerName, CosmosClient client)
{
Database database = await client.CreateDatabaseIfNotExistsAsync(id: Program.databaseName, throughput: 1000);
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(containerName, Program.partitionKeyPath));
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.leasesContainer, Program.partitionKeyPath));
}
private static async Task GenerateItems(int itemsToInsert, Container container)
{
await Task.Delay(500);
for (int i = 0; i < itemsToInsert; i++)
{
await container.CreateItemAsync<ToDoItem>(
new ToDoItem()
{
id = i.ToString(),
CreationTime = DateTime.UtcNow
},
new PartitionKey(i.ToString()));
}
}
private static async Task UpdateItems(int itemsToUpdate, Container container)
{
await Task.Delay(500);
for (int i = 0; i < itemsToUpdate; i++)
{
await container.ReplaceItemAsync<ToDoItem>(
new ToDoItem()
{
id = i.ToString(),
CreationTime = DateTime.UtcNow,
Status = "updated"
},
i.ToString());
}
}
private static async Task DeleteItems(int itemsToDelete, Container container)
{
await Task.Delay(500);
for (int i = 0; i < itemsToDelete; i++)
{
await container.DeleteItemAsync<ToDoItem>(i.ToString(), new PartitionKey(i.ToString()));
}
}
}
// <Model>
public class ToDoItem
{
public string? id { get; set; }
public DateTime CreationTime { get; set; }
public string? Status { get; set; }
}
// </Model>
}
For both modes, the first parameter is a distinct name that describes the goal of this processor. The second parameter is the delegate implementation that handles changes.
Here's an example of a delegate for latest version mode:
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken {context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}
Afterward, you define the compute instance name or unique identifier by using WithInstanceName. The compute instance name should be unique and different for each compute instance you're deploying. You set the container to maintain the lease state by using WithLeaseContainer.
Calling Build gives you the processor instance that you can start by calling StartAsync.
Note
The preceding code snippets are taken from samples in GitHub. You can get the sample for latest version mode or all versions and deletes mode.
Processing life cycle
The normal life cycle of a host instance is:
1. Read the change feed.
2. If there are no changes, sleep for a predefined amount of time (customizable by using WithPollInterval in the Builder) and go to #1.
3. If there are changes, send them to the delegate.
4. When the delegate finishes processing the changes successfully, update the lease store with the latest processed point in time and go to #1.
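As a rough sketch of how the sleep time in this life cycle can be customized, the following helper passes a poll interval to the builder. The class name PollIntervalSketch, the helper name Build, and the processor and instance names are illustrative assumptions. WithPollInterval is the builder method named in the list.

```csharp
using System;
using Microsoft.Azure.Cosmos;

public static class PollIntervalSketch
{
    // Hypothetical helper: builds a processor that waits for pollInterval
    // before reading again whenever a read of the change feed returns no changes.
    public static ChangeFeedProcessor Build<T>(
        Container monitoredContainer,
        Container leaseContainer,
        Container.ChangeFeedHandler<T> onChanges,
        TimeSpan pollInterval)
    {
        return monitoredContainer
            .GetChangeFeedProcessorBuilder<T>("changeFeedSample", onChanges)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .WithPollInterval(pollInterval)
            .Build();
    }
}
```

For example, calling PollIntervalSketch.Build<ToDoItem>(monitoredContainer, leaseContainer, HandleChangesAsync, TimeSpan.FromSeconds(5)) would ask the processor to wait about five seconds between reads when the feed is idle. A shorter interval generally means lower latency at the cost of more read requests.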
Error handling
The change feed processor is resilient to user code errors. If your delegate implementation has an unhandled exception (step #4), the thread that is processing that particular batch of changes stops, and a new thread is eventually created. The new thread checks the latest point in time that the lease store has saved for that range of partition key values. The new thread restarts from there, effectively sending the same batch of changes to the delegate. This behavior continues until your delegate processes the changes correctly, and it's the reason the change feed processor has an "at least once" guarantee.
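To make the retry behavior concrete, here's a small simulation that doesn't use the SDK. It's a simplified model under stated assumptions, not the processor's implementation: a single lease, batches identified by an integer, and a checkpoint variable that stands in for the point in time saved in the lease store. All names in it are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class AtLeastOnceSketch
{
    // Simulates one lease. 'checkpoint' plays the role of the point saved in the lease store.
    // Returns the batch numbers in the order they were handed to the delegate.
    public static List<int> Run(int batchCount, Func<int, int, bool> delegateSucceeds)
    {
        var deliveries = new List<int>();
        var attempts = new int[batchCount];
        int checkpoint = 0;

        while (checkpoint < batchCount)
        {
            int batch = checkpoint;      // always resume from the last saved point
            attempts[batch]++;
            deliveries.Add(batch);       // hand the batch to the delegate

            if (delegateSucceeds(batch, attempts[batch]))
            {
                checkpoint = batch + 1;  // the checkpoint advances only after success
            }
            // On failure the checkpoint is untouched, so the same batch is sent again.
        }

        return deliveries;
    }

    public static void Main()
    {
        // Batch 1 fails on its first attempt and succeeds on the retry.
        List<int> deliveries = Run(3, (batch, attempt) => !(batch == 1 && attempt == 1));
        Console.WriteLine(string.Join(", ", deliveries)); // prints "0, 1, 1, 2"
    }
}
```

Batch 1 appears twice in the output because the checkpoint wasn't advanced after the failed attempt. This is why a delegate should be written so that processing the same batch more than once is safe.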
Note
In only one scenario, a batch of changes is not retried. If the failure happens on the first-ever delegate execution, the lease store has no previous saved state to be used on the retry. In those cases, the retry uses the initial starting configuration, which might or might not include the last batch.
To prevent your change feed processor from getting "stuck" continuously retrying the same batch of changes, you should add logic in your delegate code to write documents, upon exception, to an errored-message queue. This design ensures that you can keep track of unprocessed changes while still being able to continue to process future changes. The errored-message queue might be another Azure Cosmos DB container. The exact data store doesn't matter. You simply want the unprocessed changes to be persisted.
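One possible shape for such a delegate is sketched below. It assumes a second Azure Cosmos DB container, partitioned by /id, that serves as the errored-message queue. The FailedChange type, the ProcessAsync placeholder, and the class name are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public class ToDoItem
{
    public string id { get; set; }
    public DateTime creationTime { get; set; }
}

// Hypothetical envelope that records a change that couldn't be processed.
public class FailedChange
{
    public string id { get; set; }
    public string leaseToken { get; set; }
    public string error { get; set; }
    public ToDoItem item { get; set; }
}

public static class ErroredQueueSketch
{
    // Returns a delegate that captures the (assumed) errored-message container.
    public static Container.ChangeFeedHandler<ToDoItem> Create(Container erroredMessages)
    {
        return async (ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken) =>
        {
            foreach (ToDoItem item in changes)
            {
                try
                {
                    await ProcessAsync(item, cancellationToken);
                }
                catch (Exception ex) when (!(ex is OperationCanceledException))
                {
                    // Persist the failed change instead of letting the exception escape,
                    // so the processor can checkpoint and move on to later changes.
                    FailedChange failed = new FailedChange
                    {
                        id = Guid.NewGuid().ToString(),
                        leaseToken = context.LeaseToken,
                        error = ex.Message,
                        item = item
                    };
                    await erroredMessages.CreateItemAsync(
                        failed,
                        new PartitionKey(failed.id),
                        cancellationToken: cancellationToken);
                }
            }
        };
    }

    // Placeholder for your own business logic.
    private static Task ProcessAsync(ToDoItem item, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}
```

If the write to the errored-message container itself fails, the exception still leaves the delegate and the batch is retried, so changes aren't silently lost in this design.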
You can also use the change feed estimator to monitor the progress of your change feed processor instances as they read the change feed, or you can use life cycle notifications to detect underlying failures.
Life cycle notifications
You can connect the change feed processor to any relevant event in its life cycle. You can choose to be notified of one or all of them. The recommendation is to at least register the error notification:
- Register a handler for WithLeaseAcquireNotification to be notified when the current host acquires a lease to start processing it.
- Register a handler for WithLeaseReleaseNotification to be notified when the current host releases a lease and stops processing it.
- Register a handler for WithErrorNotification to be notified when the current host encounters an exception during processing. You need to be able to distinguish whether the source is the user delegate (an unhandled exception) or an error that the processor encounters when it tries to access the monitored container (for example, networking issues).
Life cycle notifications are available in both change feed modes. Here's an example of life cycle notifications in latest version mode:
namespace Cosmos.Samples.ChangeFeed
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Extensions.Configuration;
using ChangeFeedProcessorLibrary = Microsoft.Azure.Documents.ChangeFeedProcessor;
// ----------------------------------------------------------------------------------------------------------
// Prerequisites -
//
// 1. An Azure Cosmos account -
// https://learn.microsoft.com/azure/cosmos-db/create-cosmosdb-resources-portal
//
// 2. Microsoft.Azure.Cosmos NuGet package -
// http://www.nuget.org/packages/Microsoft.Azure.Cosmos/
// ----------------------------------------------------------------------------------------------------------
// Sample - demonstrates common Change Feed operations
//
// 1. Listening for changes that happen after a Change Feed Processor is started.
//
// 2. Listening for changes that happen after a certain point in time.
//
// 3. Listening for changes that happen since the container was created.
//
// 4. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification
//
// 5. Generate Estimator metrics to expose current Change Feed Processor progress on demand
//
// 6. Code migration template from existing Change Feed Processor library V2
//
// 7. Error handling and advanced logging
//-----------------------------------------------------------------------------------------------------------
class Program
{
private static readonly string monitoredContainer = "monitored";
private static readonly string leasesContainer = "leases";
private static readonly string partitionKeyPath = "/id";
// Async main requires c# 7.1 which is set in the csproj with the LangVersion attribute
static async Task Main(string[] args)
{
try
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();
string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentException("Please specify a valid endpoint in the appSettings.json");
}
string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}
using (CosmosClient client = new CosmosClient(endpoint, authKey))
{
Console.WriteLine($"\n1. Listening for changes that happen after a Change Feed Processor is started.");
await Program.RunBasicChangeFeed("changefeed-basic", client);
Console.WriteLine($"\n2. Listening for changes that happen after a certain point in time.");
await Program.RunStartTimeChangeFeed("changefeed-time", client);
Console.WriteLine($"\n3. Listening for changes that happen since the container was created.");
await Program.RunStartFromBeginningChangeFeed("changefeed-beginning", client);
Console.WriteLine($"\n4. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification.");
await Program.RunEstimatorChangeFeed("changefeed-estimator", client);
Console.WriteLine($"\n5. Generate Estimator metrics to expose current Change Feed Processor progress on demand.");
await Program.RunEstimatorPullChangeFeed("changefeed-estimator-detailed", client);
Console.WriteLine($"\n6. Code migration template from existing Change Feed Processor library V2.");
await Program.RunMigrationSample("changefeed-migration", client, configuration);
Console.WriteLine($"\n7. Error handling and advanced logging.");
await Program.RunWithNotifications("changefeed-logging", client);
}
}
finally
{
Console.WriteLine("End of demo, press any key to exit.");
Console.ReadKey();
}
}
/// <summary>
/// Basic change feed functionality.
/// </summary>
/// <remarks>
/// When StartAsync is called, the Change Feed Processor starts an initialization process that can take several milliseconds,
/// in which it starts connections and initializes locks in the leases container.
/// </remarks>
public static async Task RunBasicChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <BasicInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBasic", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </BasicInitialization>
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, monitoredContainer);
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// StartTime will make the Change Feed Processor start processing changes at a particular point in time, all previous changes are ignored.
/// </summary>
/// <remarks>
/// StartTime only works if the leaseContainer is empty or contains no leases for the particular processor name.
/// </remarks>
public static async Task RunStartTimeChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 5 items that will not be picked up.");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine($"Items generated at {DateTime.UtcNow}");
// Generate a future point in time
await Task.Delay(2000);
DateTime particularPointInTime = DateTime.UtcNow;
Console.WriteLine("Generating 5 items that will be picked up by the delegate...");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
// <TimeInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
// </TimeInitialization>
Console.WriteLine($"Starting Change Feed Processor with changes after {particularPointInTime}...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Reading the Change Feed since the beginning of time.
/// </summary>
/// <remarks>
/// StartTime only works if the leaseContainer is empty or contains no leases for the particular processor name.
/// </remarks>
public static async Task RunStartFromBeginningChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// <StartFromBeginningInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
// </StartFromBeginningInitialization>
Console.WriteLine($"Starting Change Feed Processor with changes since the beginning...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// </remarks>
public static async Task RunEstimatorChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <StartProcessorEstimator>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimator>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimator>
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
// </StartEstimator>
Console.WriteLine($"Starting Change Feed Estimator...");
await changeFeedEstimator.StartAsync();
Console.WriteLine("Change Feed Estimator started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
await changeFeedEstimator.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator with the detailed iterator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// The iterator exposes detailed, per-lease, information on estimation and ownership.
/// </remarks>
public static async Task RunEstimatorPullChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <StartProcessorEstimatorDetailed>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimatorDetailed>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait some seconds for instances to acquire leases
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimatorDetailed>
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
// </StartEstimatorDetailed>
// <GetIteratorEstimatorDetailed>
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
// </GetIteratorEstimatorDetailed>
Console.WriteLine("Stopping processor to show how the lag increases if no processing is happening.");
await changeFeedProcessor.StopAsync();
// Wait for processor to shutdown completely so the next items generate lag
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be seen by the Estimator...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIteratorAfter = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIteratorAfter.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIteratorAfter.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
// Host ownership should be empty as we have already stopped the processor
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
}
/// <summary>
/// Example of a code migration template from Change Feed Processor V2 to SDK V3.
/// </summary>
/// <returns></returns>
public static async Task RunMigrationSample(
string databaseId,
CosmosClient client,
IConfigurationRoot configuration)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 10 items that will be picked up by the old Change Feed Processor library...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// This is how you would initialize the processor in V2
// <ChangeFeedProcessorLibrary>
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.monitoredContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.leasesContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
.WithHostName("consoleHost")
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
{
StartFromBeginning = true,
LeasePrefix = "MyLeasePrefix",
MaxItemCount = 10,
FeedPollDelay = TimeSpan.FromSeconds(1)
})
.WithFeedCollection(monitoredCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithObserver<ChangeFeedObserver>()
.BuildAsync();
// </ChangeFeedProcessorLibrary>
await oldChangeFeedProcessor.StartAsync();
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Now we will stop the V2 Processor and start a V3 with the same parameters to pick up from the same state, press any key to continue...");
Console.ReadKey();
await oldChangeFeedProcessor.StopAsync();
Console.WriteLine("Generating 5 items that will be picked up by the new Change Feed Processor...");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
// This is how you would do the same initialization in V3
// <ChangeFeedProcessorMigrated>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithMaxItems(10)
.WithPollInterval(TimeSpan.FromSeconds(1))
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
// </ChangeFeedProcessorMigrated>
await changeFeedProcessor.StartAsync();
// Wait random time for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Setup notification APIs for events.
/// </summary>
public static async Task RunWithNotifications(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim();
Container.ChangeFeedHandler<ToDoItem> handleChanges = (ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken) =>
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken} but throwing an exception to bubble to notifications.");
manualResetEventSlim.Set();
throw new Exception("This is an unhandled exception from inside the delegate");
};
// <StartWithNotifications>
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartWithNotifications>
Console.WriteLine($"Starting Change Feed Processor with logging enabled...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait random time for the delegate to output all messages after initialization is done
manualResetEventSlim.Wait();
await Task.Delay(1000);
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
// <Delegate>
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken {context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate work
await Task.Delay(1);
}
}
// </Delegate>
/// <summary>
/// The delegate for the Estimator receives a numeric representation of items pending to be read.
/// </summary>
// <EstimationDelegate>
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
// </EstimationDelegate>
private static async Task GenerateItems(
int itemsToInsert,
Container container)
{
await Task.Delay(500);
for (int i = 0; i < itemsToInsert; i++)
{
string id = Guid.NewGuid().ToString();
await container.CreateItemAsync<ToDoItem>(
new ToDoItem()
{
id = id,
creationTime = DateTime.UtcNow
},
new PartitionKey(id));
}
}
private static async Task InitializeAsync(
string databaseId,
CosmosClient client)
{
Database database;
// Recreate database
try
{
database = await client.GetDatabase(databaseId).ReadAsync();
await database.DeleteAsync();
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
}
database = await client.CreateDatabaseAsync(databaseId);
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.monitoredContainer, Program.partitionKeyPath));
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.leasesContainer, Program.partitionKeyPath));
}
}
// <Model>
public class ToDoItem
{
public string id { get; set; }
public DateTime creationTime { get; set; }
}
// </Model>
internal class ChangeFeedObserver : IChangeFeedObserver
{
public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
{
return Task.CompletedTask;
}
public Task OpenAsync(IChangeFeedObserverContext context)
{
return Task.CompletedTask;
}
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Microsoft.Azure.Documents.Document> docs, CancellationToken cancellationToken)
{
foreach (Microsoft.Azure.Documents.Document doc in docs)
{
Console.WriteLine($"\t[OLD Processor] Detected operation for item with id {doc.Id}, created at {doc.GetPropertyValue<DateTime>("creationTime")}.");
}
return Task.CompletedTask;
}
}
}
Deployment unit
A single change feed processor deployment unit consists of one or more compute instances that have the same value for processorName and the same lease container configuration, but different instance names. You can have many deployment units in which each unit has a different business flow for the changes and each deployment unit consists of one or more instances.
For example, you might have one deployment unit that triggers an external API each time there's a change in your container. Another deployment unit might move data in real time each time there's a change. When a change happens in your monitored container, all your deployment units are notified.
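The two deployment units in this example could be set up along the following lines. This is a sketch under assumptions: the processor names notifyExternalApi and replicateData, the class and method names, and the decision to keep both units' leases in one lease container are illustrative choices, not requirements.

```csharp
using Microsoft.Azure.Cosmos;

public static class DeploymentUnitSketch
{
    // Builds one instance of a deployment unit. Instances that share processorName
    // and the lease container belong to the same unit.
    public static ChangeFeedProcessor BuildInstance<T>(
        Container monitoredContainer,
        Container leaseContainer,
        string processorName,
        string instanceName,
        Container.ChangeFeedHandler<T> onChanges)
    {
        return monitoredContainer
            .GetChangeFeedProcessorBuilder<T>(processorName, onChanges)
            .WithInstanceName(instanceName)
            .WithLeaseContainer(leaseContainer)
            .Build();
    }

    // Two units over the same monitored container, each with its own business flow.
    // Because the processor names differ, each unit tracks its own progress
    // and receives every change independently of the other.
    public static (ChangeFeedProcessor Notifier, ChangeFeedProcessor Replicator) BuildTwoUnits<T>(
        Container monitoredContainer,
        Container leaseContainer,
        string instanceName,
        Container.ChangeFeedHandler<T> callExternalApi,
        Container.ChangeFeedHandler<T> replicateData)
    {
        ChangeFeedProcessor notifier = BuildInstance(
            monitoredContainer, leaseContainer, "notifyExternalApi", instanceName, callExternalApi);
        ChangeFeedProcessor replicator = BuildInstance(
            monitoredContainer, leaseContainer, "replicateData", instanceName, replicateData);
        return (notifier, replicator);
    }
}
```

Both processors still need to be started with StartAsync. To scale one unit out, you would run more instances that use the same processor name with a different instance name for each.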
Dynamic scaling
As mentioned earlier, within a deployment unit, you can have one or more compute instances. To take advantage of the compute distribution within the deployment unit, the only key requirements are that:
- All instances should have the same lease container configuration.
- All instances should have the same value for processorName.
- Each instance needs to have a different instance name (WithInstanceName).
If these three conditions apply, then the change feed processor distributes all the leases that are in the lease container across all running instances of that deployment unit, and it parallelizes compute by using an equal-distribution algorithm. A lease is owned by one instance at any time, so the number of instances shouldn't be greater than the number of leases.
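The following simulation illustrates why extra instances stay idle when there are more instances than leases. It doesn't use the SDK, and the round-robin assignment is a simplified stand-in for the processor's equal-distribution behavior, not its actual algorithm. The lease tokens and host names are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LeaseDistributionSketch
{
    // Assigns each lease to exactly one instance, in round-robin order.
    // ToDictionary throws on duplicate names, mirroring the rule that
    // every instance needs a different instance name.
    public static Dictionary<string, List<string>> Distribute(
        IReadOnlyList<string> leases,
        IReadOnlyList<string> instances)
    {
        Dictionary<string, List<string>> assignment =
            instances.ToDictionary(name => name, name => new List<string>());

        if (instances.Count == 0)
        {
            return assignment;
        }

        for (int i = 0; i < leases.Count; i++)
        {
            assignment[instances[i % instances.Count]].Add(leases[i]);
        }

        return assignment;
    }

    public static void Main()
    {
        string[] leases = { "0", "1", "2", "3" };
        string[] instances = { "hostA", "hostB", "hostC", "hostD", "hostE" };

        Dictionary<string, List<string>> assignment = Distribute(leases, instances);
        foreach (string instance in instances)
        {
            Console.WriteLine($"{instance}: [{string.Join(", ", assignment[instance])}]");
        }
    }
}
```

With four leases and five instances, hostE ends up with no lease and does no work. With two instances, each one would own two leases.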
The number of instances can grow and shrink. The change feed processor dynamically adjusts the load by redistributing it accordingly.
Moreover, the change feed processor can dynamically adjust to a container's scale if the container's throughput or storage increases. When your container grows, the change feed processor transparently handles the scenario by dynamically increasing the leases and distributing the new leases among existing instances.
Starting time
By default, when a change feed processor starts for the first time, it initializes the lease container and starts its processing life cycle. Any changes that happened in the monitored container before the change feed processor is initialized for the first time aren't detected.
Reading from a previous date and time
It's possible to initialize the change feed processor to read changes starting at a specific date and time by passing an instance of DateTime to the WithStartTime builder extension:
namespace Cosmos.Samples.ChangeFeed
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Extensions.Configuration;
using ChangeFeedProcessorLibrary = Microsoft.Azure.Documents.ChangeFeedProcessor;
// ----------------------------------------------------------------------------------------------------------
// Prerequisites -
//
// 1. An Azure Cosmos account -
// https://learn.microsoft.com/azure/cosmos-db/create-cosmosdb-resources-portal
//
// 2. Microsoft.Azure.Cosmos NuGet package -
// http://www.nuget.org/packages/Microsoft.Azure.Cosmos/
// ----------------------------------------------------------------------------------------------------------
// Sample - demonstrates common Change Feed operations
//
// 1. Listening for changes that happen after a Change Feed Processor is started.
//
// 2. Listening for changes that happen after a certain point in time.
//
// 3. Listening for changes that happen since the container was created.
//
// 4. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification
//
// 5. Generate Estimator metrics to expose current Change Feed Processor progress on demand
//
// 6. Code migration template from existing Change Feed Processor library V2
//
// 7. Error handling and advanced logging
//-----------------------------------------------------------------------------------------------------------
class Program
{
private static readonly string monitoredContainer = "monitored";
private static readonly string leasesContainer = "leases";
private static readonly string partitionKeyPath = "/id";
// Async main requires c# 7.1 which is set in the csproj with the LangVersion attribute
static async Task Main(string[] args)
{
try
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();
string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentException("Please specify a valid endpoint in the appSettings.json");
}
string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}
using (CosmosClient client = new CosmosClient(endpoint, authKey))
{
Console.WriteLine($"\n1. Listening for changes that happen after a Change Feed Processor is started.");
await Program.RunBasicChangeFeed("changefeed-basic", client);
Console.WriteLine($"\n2. Listening for changes that happen after a certain point in time.");
await Program.RunStartTimeChangeFeed("changefeed-time", client);
Console.WriteLine($"\n3. Listening for changes that happen since the container was created.");
await Program.RunStartFromBeginningChangeFeed("changefeed-beginning", client);
Console.WriteLine($"\n4. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification.");
await Program.RunEstimatorChangeFeed("changefeed-estimator", client);
Console.WriteLine($"\n5. Generate Estimator metrics to expose current Change Feed Processor progress on demand.");
await Program.RunEstimatorPullChangeFeed("changefeed-estimator-detailed", client);
Console.WriteLine($"\n6. Code migration template from existing Change Feed Processor library V2.");
await Program.RunMigrationSample("changefeed-migration", client, configuration);
Console.WriteLine($"\n7. Error handling and advanced logging.");
await Program.RunWithNotifications("changefeed-logging", client);
}
}
finally
{
Console.WriteLine("End of demo, press any key to exit.");
Console.ReadKey();
}
}
/// <summary>
/// Basic change feed functionality.
/// </summary>
/// <remarks>
/// When StartAsync is called, the Change Feed Processor starts an initialization process that can take several milliseconds,
/// in which it starts connections and initializes locks in the leases container.
/// </remarks>
public static async Task RunBasicChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <BasicInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBasic", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </BasicInitialization>
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, monitoredContainer);
// Wait a few seconds for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// StartTime makes the Change Feed Processor start processing changes from a particular point in time; all earlier changes are ignored.
/// </summary>
/// <remarks>
/// StartTime only works if the leaseContainer is empty or contains no leases for the particular processor name.
/// </remarks>
public static async Task RunStartTimeChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 5 items that will not be picked up.");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine($"Items generated at {DateTime.UtcNow}");
// Generate a future point in time
await Task.Delay(2000);
DateTime particularPointInTime = DateTime.UtcNow;
Console.WriteLine("Generating 5 items that will be picked up by the delegate...");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
// <TimeInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
// </TimeInitialization>
Console.WriteLine($"Starting Change Feed Processor with changes after {particularPointInTime}...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait a few seconds for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Reading the Change Feed since the beginning of time.
/// </summary>
/// <remarks>
/// StartTime only works if the leaseContainer is empty or contains no leases for the particular processor name.
/// </remarks>
public static async Task RunStartFromBeginningChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// <StartFromBeginningInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
// </StartFromBeginningInitialization>
Console.WriteLine($"Starting Change Feed Processor with changes since the beginning...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait a few seconds for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// </remarks>
public static async Task RunEstimatorChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <StartProcessorEstimator>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimator>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait a few seconds for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimator>
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
// </StartEstimator>
Console.WriteLine($"Starting Change Feed Estimator...");
await changeFeedEstimator.StartAsync();
Console.WriteLine("Change Feed Estimator started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
await changeFeedEstimator.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator with the detailed iterator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// The iterator exposes detailed, per-lease, information on estimation and ownership.
/// </remarks>
public static async Task RunEstimatorPullChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <StartProcessorEstimatorDetailed>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimatorDetailed>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait some seconds for instances to acquire leases
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait a few seconds for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimatorDetailed>
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
// </StartEstimatorDetailed>
// <GetIteratorEstimatorDetailed>
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
// </GetIteratorEstimatorDetailed>
Console.WriteLine("Stopping processor to show how the lag increases if no processing is happening.");
await changeFeedProcessor.StopAsync();
// Wait for the processor to shut down completely so the next items generate lag
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be seen by the Estimator...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIteratorAfter = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIteratorAfter.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIteratorAfter.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
// Host ownership should be empty as we have already stopped the processor
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
}
/// <summary>
/// Example of a code migration template from Change Feed Processor V2 to SDK V3.
/// </summary>
/// <returns></returns>
public static async Task RunMigrationSample(
string databaseId,
CosmosClient client,
IConfigurationRoot configuration)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 10 items that will be picked up by the old Change Feed Processor library...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// This is how you would initialize the processor in V2
// <ChangeFeedProcessorLibrary>
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.monitoredContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.leasesContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
.WithHostName("consoleHost")
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
{
StartFromBeginning = true,
LeasePrefix = "MyLeasePrefix",
MaxItemCount = 10,
FeedPollDelay = TimeSpan.FromSeconds(1)
})
.WithFeedCollection(monitoredCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithObserver<ChangeFeedObserver>()
.BuildAsync();
// </ChangeFeedProcessorLibrary>
await oldChangeFeedProcessor.StartAsync();
// Wait a few seconds for the observer to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Now we will stop the V2 Processor and start a V3 with the same parameters to pick up from the same state, press any key to continue...");
Console.ReadKey();
await oldChangeFeedProcessor.StopAsync();
Console.WriteLine("Generating 5 items that will be picked up by the new Change Feed Processor...");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
// This is how you would do the same initialization in V3
// <ChangeFeedProcessorMigrated>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithMaxItems(10)
.WithPollInterval(TimeSpan.FromSeconds(1))
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
// </ChangeFeedProcessorMigrated>
await changeFeedProcessor.StartAsync();
// Wait a few seconds for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Setup notification APIs for events.
/// </summary>
public static async Task RunWithNotifications(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim();
Container.ChangeFeedHandler<ToDoItem> handleChanges = (ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken) =>
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken} but throwing an exception to bubble to notifications.");
manualResetEventSlim.Set();
throw new Exception("This is an unhandled exception from inside the delegate");
};
// <StartWithNotifications>
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartWithNotifications>
Console.WriteLine($"Starting Change Feed Processor with logging enabled...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Block until the delegate has been invoked at least once, then give the notifications a moment to run
manualResetEventSlim.Wait();
await Task.Delay(1000);
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
// <Delegate>
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken {context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate work
await Task.Delay(1);
}
}
// </Delegate>
/// <summary>
/// The delegate for the Estimator receives a numeric representation of items pending to be read.
/// </summary>
// <EstimationDelegate>
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
// </EstimationDelegate>
private static async Task GenerateItems(
int itemsToInsert,
Container container)
{
await Task.Delay(500);
for (int i = 0; i < itemsToInsert; i++)
{
string id = Guid.NewGuid().ToString();
await container.CreateItemAsync<ToDoItem>(
new ToDoItem()
{
id = id,
creationTime = DateTime.UtcNow
},
new PartitionKey(id));
}
}
private static async Task InitializeAsync(
string databaseId,
CosmosClient client)
{
Database database;
// Recreate database
try
{
database = await client.GetDatabase(databaseId).ReadAsync();
await database.DeleteAsync();
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
}
database = await client.CreateDatabaseAsync(databaseId);
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.monitoredContainer, Program.partitionKeyPath));
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.leasesContainer, Program.partitionKeyPath));
}
}
// <Model>
public class ToDoItem
{
public string id { get; set; }
public DateTime creationTime { get; set; }
}
// </Model>
internal class ChangeFeedObserver : IChangeFeedObserver
{
public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
{
return Task.CompletedTask;
}
public Task OpenAsync(IChangeFeedObserverContext context)
{
return Task.CompletedTask;
}
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Microsoft.Azure.Documents.Document> docs, CancellationToken cancellationToken)
{
foreach (Microsoft.Azure.Documents.Document doc in docs)
{
Console.WriteLine($"\t[OLD Processor] Detected operation for item with id {doc.Id}, created at {doc.GetPropertyValue<DateTime>("creationTime")}.");
}
return Task.CompletedTask;
}
}
}
The change feed processor is initialized for that specific date and time, and it starts to read the changes that happened afterward.
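As a small companion to the start-time sample above, the following sketch shows two ways to build the DateTime value that is handed to WithStartTime. It assumes you want the start point expressed unambiguously in UTC, as the sample does with DateTime.UtcNow. The class name StartTimeSketch, the one-hour offset, and the timestamp string are hypothetical values chosen only for illustration.

```csharp
using System;
using System.Globalization;

// Hypothetical helper program: it only builds and prints candidate start times.
class StartTimeSketch
{
    static void Main()
    {
        // Relative start point: one hour before now, already in UTC.
        DateTime oneHourAgo = DateTime.UtcNow.AddHours(-1);

        // Fixed start point parsed from an ISO 8601 string (illustrative value).
        // AdjustToUniversal converts the parsed value to UTC.
        DateTime fixedPoint = DateTime.Parse(
            "2024-01-15T08:30:00Z",
            CultureInfo.InvariantCulture,
            DateTimeStyles.AdjustToUniversal);

        // Print both values in round-trip format together with their Kind.
        Console.WriteLine($"{oneHourAgo:o} (Kind: {oneHourAgo.Kind})");
        Console.WriteLine($"{fixedPoint:o} (Kind: {fixedPoint.Kind})");

        // Either value could then be passed to the builder, as in the sample:
        //   .WithStartTime(fixedPoint)
    }
}
```

As the sample's remarks note, the start time only takes effect when the lease container holds no leases for that processor name, so changing the value on a processor that already has leases is not expected to move its position.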
Reading from the beginning
In other scenarios, such as data migrations or analyzing the entire history of a container, you need to read the change feed from the beginning of that container's lifetime. You can use WithStartTime on the builder extension, but pass DateTime.MinValue.ToUniversalTime(), which generates the UTC representation of the minimum DateTime value, as in this example:
namespace Cosmos.Samples.ChangeFeed
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;
using Microsoft.Extensions.Configuration;
using ChangeFeedProcessorLibrary = Microsoft.Azure.Documents.ChangeFeedProcessor;
// ----------------------------------------------------------------------------------------------------------
// Prerequisites -
//
// 1. An Azure Cosmos account -
// https://learn.microsoft.com/azure/cosmos-db/create-cosmosdb-resources-portal
//
// 2. Microsoft.Azure.Cosmos NuGet package -
// http://www.nuget.org/packages/Microsoft.Azure.Cosmos/
// ----------------------------------------------------------------------------------------------------------
// Sample - demonstrates common Change Feed operations
//
// 1. Listening for changes that happen after a Change Feed Processor is started.
//
// 2. Listening for changes that happen after a certain point in time.
//
// 3. Listening for changes that happen since the container was created.
//
// 4. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification
//
// 5. Generate Estimator metrics to expose current Change Feed Processor progress on demand
//
// 6. Code migration template from existing Change Feed Processor library V2
//
// 7. Error handling and advanced logging
//-----------------------------------------------------------------------------------------------------------
class Program
{
private static readonly string monitoredContainer = "monitored";
private static readonly string leasesContainer = "leases";
private static readonly string partitionKeyPath = "/id";
// Async Main requires C# 7.1, which is set in the csproj with the LangVersion attribute
static async Task Main(string[] args)
{
try
{
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();
string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
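// ArgumentException is used here because the single-string ArgumentNullException constructor treats its argument as a parameter name, not a message
throw new ArgumentException("Please specify a valid endpoint in the appSettings.json");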
}
string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}
using (CosmosClient client = new CosmosClient(endpoint, authKey))
{
Console.WriteLine($"\n1. Listening for changes that happen after a Change Feed Processor is started.");
await Program.RunBasicChangeFeed("changefeed-basic", client);
Console.WriteLine($"\n2. Listening for changes that happen after a certain point in time.");
await Program.RunStartTimeChangeFeed("changefeed-time", client);
Console.WriteLine($"\n3. Listening for changes that happen since the container was created.");
await Program.RunStartFromBeginningChangeFeed("changefeed-beginning", client);
Console.WriteLine($"\n4. Generate Estimator metrics to expose current Change Feed Processor progress as a push notification.");
await Program.RunEstimatorChangeFeed("changefeed-estimator", client);
Console.WriteLine($"\n5. Generate Estimator metrics to expose current Change Feed Processor progress on demand.");
await Program.RunEstimatorPullChangeFeed("changefeed-estimator-detailed", client);
Console.WriteLine($"\n6. Code migration template from existing Change Feed Processor library V2.");
await Program.RunMigrationSample("changefeed-migration", client, configuration);
Console.WriteLine($"\n7. Error handling and advanced logging.");
await Program.RunWithNotifications("changefeed-logging", client);
}
}
finally
{
Console.WriteLine("End of demo, press any key to exit.");
Console.ReadKey();
}
}
/// <summary>
/// Basic change feed functionality.
/// </summary>
/// <remarks>
/// When StartAsync is called, the Change Feed Processor starts an initialization process that can take several milliseconds,
/// in which it starts connections and initializes locks in the leases container.
/// </remarks>
public static async Task RunBasicChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <BasicInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBasic", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </BasicInitialization>
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, monitoredContainer);
// Wait a few seconds for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// StartTime makes the Change Feed Processor start processing changes from a particular point in time; all earlier changes are ignored.
/// </summary>
/// <remarks>
/// StartTime only works if the leaseContainer is empty or contains no leases for the particular processor name.
/// </remarks>
public static async Task RunStartTimeChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 5 items that will not be picked up.");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine($"Items generated at {DateTime.UtcNow}");
// Generate a future point in time
await Task.Delay(2000);
DateTime particularPointInTime = DateTime.UtcNow;
Console.WriteLine("Generating 5 items that will be picked up by the delegate...");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
// <TimeInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
// </TimeInitialization>
Console.WriteLine($"Starting Change Feed Processor with changes after {particularPointInTime}...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait a few seconds for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Reading the Change Feed since the beginning of time.
/// </summary>
/// <remarks>
/// StartTime only works if the leaseContainer is empty or contains no leases for the particular processor name.
/// </remarks>
public static async Task RunStartFromBeginningChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// <StartFromBeginningInitialization>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
// </StartFromBeginningInitialization>
Console.WriteLine($"Starting Change Feed Processor with changes since the beginning...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait a few seconds for the delegate to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// </remarks>
public static async Task RunEstimatorChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <StartProcessorEstimator>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimator>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Wait a few seconds for the delegate to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimator>
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
// </StartEstimator>
Console.WriteLine($"Starting Change Feed Estimator...");
await changeFeedEstimator.StartAsync();
Console.WriteLine("Change Feed Estimator started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate and reported by the Estimator...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
await changeFeedEstimator.StopAsync();
}
/// <summary>
/// Exposing progress with the Estimator with the detailed iterator.
/// </summary>
/// <remarks>
/// The Estimator uses the same processorName and the same lease configuration as the existing processor to measure progress.
/// The iterator exposes detailed, per-lease, information on estimation and ownership.
/// </remarks>
public static async Task RunEstimatorPullChangeFeed(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
// <StartProcessorEstimatorDetailed>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartProcessorEstimatorDetailed>
Console.WriteLine($"Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
// Wait some seconds for instances to acquire leases
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Give the delegate a few seconds to output all messages after initialization is done
await Task.Delay(5000);
// <StartEstimatorDetailed>
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
// </StartEstimatorDetailed>
// <GetIteratorEstimatorDetailed>
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
// </GetIteratorEstimatorDetailed>
Console.WriteLine("Stopping processor to show how the lag increases if no processing is happening.");
await changeFeedProcessor.StopAsync();
// Wait for processor to shutdown completely so the next items generate lag
await Task.Delay(5000);
Console.WriteLine("Generating 10 items that will be seen by the Estimator...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIteratorAfter = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIteratorAfter.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIteratorAfter.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
// Host ownership should be empty as we have already stopped the processor
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
}
/// <summary>
/// Example of a code migration template from Change Feed Processor V2 to SDK V3.
/// </summary>
/// <returns></returns>
public static async Task RunMigrationSample(
string databaseId,
CosmosClient client,
IConfigurationRoot configuration)
{
await Program.InitializeAsync(databaseId, client);
Console.WriteLine("Generating 10 items that will be picked up by the old Change Feed Processor library...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// This is how you would initialize the processor in V2
// <ChangeFeedProcessorLibrary>
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.monitoredContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.leasesContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
.WithHostName("consoleHost")
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
{
StartFromBeginning = true,
LeasePrefix = "MyLeasePrefix",
MaxItemCount = 10,
FeedPollDelay = TimeSpan.FromSeconds(1)
})
.WithFeedCollection(monitoredCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithObserver<ChangeFeedObserver>()
.BuildAsync();
// </ChangeFeedProcessorLibrary>
await oldChangeFeedProcessor.StartAsync();
// Give the V2 observer a few seconds to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Now we will stop the V2 Processor and start a V3 with the same parameters to pick up from the same state, press any key to continue...");
Console.ReadKey();
await oldChangeFeedProcessor.StopAsync();
Console.WriteLine("Generating 5 items that will be picked up by the new Change Feed Processor...");
await Program.GenerateItems(5, client.GetContainer(databaseId, Program.monitoredContainer));
// This is how you would do the same initialization in V3
// <ChangeFeedProcessorMigrated>
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithMaxItems(10)
.WithPollInterval(TimeSpan.FromSeconds(1))
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
// </ChangeFeedProcessorMigrated>
await changeFeedProcessor.StartAsync();
// Give the delegate a few seconds to output all messages after initialization is done
await Task.Delay(5000);
Console.WriteLine("Press any key to continue with the next demo...");
Console.ReadKey();
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// Setup notification APIs for events.
/// </summary>
public static async Task RunWithNotifications(
string databaseId,
CosmosClient client)
{
await Program.InitializeAsync(databaseId, client);
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim();
Container.ChangeFeedHandler<ToDoItem> handleChanges = (ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken) =>
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken} but throwing an exception to bubble to notifications.");
manualResetEventSlim.Set();
throw new Exception("This is an unhandled exception from inside the delegate");
};
// <StartWithNotifications>
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string leaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {leaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {leaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
// </StartWithNotifications>
Console.WriteLine($"Starting Change Feed Processor with logging enabled...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
Console.WriteLine("Generating 10 items that will be picked up by the delegate...");
await Program.GenerateItems(10, client.GetContainer(databaseId, Program.monitoredContainer));
// Block until the delegate has been invoked at least once, then allow a moment for the error notification to print
manualResetEventSlim.Wait();
await Task.Delay(1000);
await changeFeedProcessor.StopAsync();
}
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
// <Delegate>
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken {context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate work
await Task.Delay(1);
}
}
// </Delegate>
/// <summary>
/// The delegate for the Estimator receives a numeric representation of items pending to be read.
/// </summary>
// <EstimationDelegate>
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
// </EstimationDelegate>
private static async Task GenerateItems(
int itemsToInsert,
Container container)
{
await Task.Delay(500);
for (int i = 0; i < itemsToInsert; i++)
{
string id = Guid.NewGuid().ToString();
await container.CreateItemAsync<ToDoItem>(
new ToDoItem()
{
id = id,
creationTime = DateTime.UtcNow
},
new PartitionKey(id));
}
}
private static async Task InitializeAsync(
string databaseId,
CosmosClient client)
{
Database database;
// Recreate database
try
{
database = await client.GetDatabase(databaseId).ReadAsync();
await database.DeleteAsync();
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
// The database does not exist yet, so there is nothing to delete.
}
database = await client.CreateDatabaseAsync(databaseId);
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.monitoredContainer, Program.partitionKeyPath));
await database.CreateContainerIfNotExistsAsync(new ContainerProperties(Program.leasesContainer, Program.partitionKeyPath));
}
}
// <Model>
public class ToDoItem
{
public string id { get; set; }
public DateTime creationTime { get; set; }
}
// </Model>
internal class ChangeFeedObserver : IChangeFeedObserver
{
public Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
{
return Task.CompletedTask;
}
public Task OpenAsync(IChangeFeedObserverContext context)
{
return Task.CompletedTask;
}
public Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList<Microsoft.Azure.Documents.Document> docs, CancellationToken cancellationToken)
{
foreach (Microsoft.Azure.Documents.Document doc in docs)
{
Console.WriteLine($"\t[OLD Processor] Detected operation for item with id {doc.Id}, created at {doc.GetPropertyValue<DateTime>("creationTime")}.");
}
return Task.CompletedTask;
}
}
}
The change feed processor is initialized, and it starts reading changes from the beginning of the lifetime of the container.
Note
These customization options work only to set up the starting point in time of the change feed processor. After the lease container is initialized for the first time, changing these options has no effect.
Customizing the starting point is available only for latest version change feed mode. When you use all versions and deletes mode, you must start reading from the time the processor is started, or resume from a prior lease state that is within the continuous backup retention period of your account.
Change feed and provisioned throughput
Change feed read operations on the monitored container consume request units. Make sure that your monitored container isn't experiencing throttling. Throttling adds delays in receiving change feed events on your processors.
Operations on the lease container (updating and maintaining state) consume request units. The higher the number of instances that use the same lease container, the higher the potential consumption of request units. Make sure that your lease container isn't experiencing throttling. Throttling adds delays in receiving change feed events. Throttling can even completely end processing.
Share the lease container
You can share a lease container across multiple deployment units. In a shared lease container, each deployment unit listens to a different monitored container or has a different value for processorName. In this configuration, each deployment unit maintains an independent state on the lease container. Review the request unit consumption on a lease container to make sure that the provisioned throughput is enough for all the deployment units.
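As a minimal sketch of a shared lease container, the following builds two processors that monitor the same container and store their state in the same lease container, but under different processorName values. The class name, the container names ("monitored", "leases"), the processor names, the instance names, and the ToDoItem shape are assumptions made for illustration. In a real deployment, each processor would typically run in its own deployment unit rather than side by side in one method.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class SharedLeaseContainerSketch
{
    // Hypothetical item shape; replace with your own model.
    public class ToDoItem
    {
        public string id { get; set; }
    }

    public static async Task RunAsync(CosmosClient client, string databaseId)
    {
        // Container names are placeholders for this sketch.
        Container monitoredContainer = client.GetContainer(databaseId, "monitored");
        Container leaseContainer = client.GetContainer(databaseId, "leases");

        // Deployment unit A keeps its state under the processorName "ordersProjection".
        ChangeFeedProcessor processorA = monitoredContainer
            .GetChangeFeedProcessorBuilder<ToDoItem>("ordersProjection", HandleChangesAsync)
            .WithInstanceName("hostA")
            .WithLeaseContainer(leaseContainer)
            .Build();

        // Deployment unit B shares the lease container but uses a different processorName,
        // so its progress is tracked independently of deployment unit A.
        ChangeFeedProcessor processorB = monitoredContainer
            .GetChangeFeedProcessorBuilder<ToDoItem>("auditTrail", HandleChangesAsync)
            .WithInstanceName("hostB")
            .WithLeaseContainer(leaseContainer)
            .Build();

        await processorA.StartAsync();
        await processorB.StartAsync();
    }

    private static Task HandleChangesAsync(
        ChangeFeedProcessorContext context,
        IReadOnlyCollection<ToDoItem> changes,
        CancellationToken cancellationToken)
    {
        Console.WriteLine($"Lease {context.LeaseToken} delivered {changes.Count} changes.");
        return Task.CompletedTask;
    }
}
```

Because both processors read the full change feed and both write lease documents, the request unit consumption on the lease container grows with each deployment unit that shares it.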
Advanced lease configuration
Three key configurations can affect how the change feed processor works. Each configuration affects the request unit consumption on the lease container. You can set one or more of these configurations when you create the change feed processor, but use them carefully:
- Lease Acquire: By default, every 17 seconds. A host periodically checks the state of the lease store and considers acquiring leases as part of the dynamic scaling process. This process is done by executing a Query on the lease container. Reducing this value makes rebalancing and acquiring leases faster, but it increases request unit consumption on the lease container.
- Lease Expiration: By default, 60 seconds. Defines the maximum amount of time that a lease can exist without any renewal activity before it's acquired by another host. When a host crashes, the leases it owned are picked up by other hosts after this period of time plus the configured renewal interval. Reducing this value makes recovering after a host crash faster, but the expiration value should never be lower than the renewal interval.
- Lease Renewal: By default, every 13 seconds. A host that owns a lease periodically renews the lease, even if there are no new changes to consume. This process is done by executing a Replace on the lease. Reducing this value lowers the time that's required to detect leases lost by a host crashing, but it increases request unit consumption on the lease container.
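The three intervals above can be passed to the processor builder through WithLeaseConfiguration in the .NET V3 SDK. The following is a minimal sketch that passes the default values explicitly, so it should behave like a processor built without the call. The class name, processor name, and ToDoItem shape are assumptions made for illustration. It also prints the approximate time after which the leases of a crashed host are picked up, which is the expiration plus the renewal interval (60 + 13 = 73 seconds with the defaults).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class LeaseConfigurationSketch
{
    // Hypothetical item shape; replace with your own model.
    public class ToDoItem
    {
        public string id { get; set; }
    }

    public static ChangeFeedProcessor Build(
        Container monitoredContainer,
        Container leaseContainer,
        string instanceName)
    {
        // Default values described in this article.
        TimeSpan acquireInterval = TimeSpan.FromSeconds(17);
        TimeSpan expirationInterval = TimeSpan.FromSeconds(60);
        TimeSpan renewInterval = TimeSpan.FromSeconds(13);

        // Leases owned by a crashed host are picked up after the expiration
        // period plus the renewal interval.
        TimeSpan crashRecovery = expirationInterval + renewInterval;
        Console.WriteLine($"Approximate crash recovery time: {crashRecovery.TotalSeconds} seconds.");

        return monitoredContainer
            .GetChangeFeedProcessorBuilder<ToDoItem>("leaseTuningSketch", HandleChangesAsync)
            .WithInstanceName(instanceName)
            .WithLeaseContainer(leaseContainer)
            // Argument order: acquire, expiration, renew.
            .WithLeaseConfiguration(acquireInterval, expirationInterval, renewInterval)
            .Build();
    }

    private static Task HandleChangesAsync(
        ChangeFeedProcessorContext context,
        IReadOnlyCollection<ToDoItem> changes,
        CancellationToken cancellationToken)
    {
        Console.WriteLine($"Lease {context.LeaseToken} delivered {changes.Count} changes.");
        return Task.CompletedTask;
    }
}
```

When you lower any of these values, keep the expiration interval above the renewal interval and watch the request unit consumption on the lease container.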
Where to host the change feed processor
The change feed processor can be hosted in any platform that supports long-running processes or tasks. Here are some examples:
- A continuously running instance of WebJobs in Azure App Service
- A process in an instance of Azure Virtual Machines
- A background job in Azure Kubernetes Service
- A serverless function in Azure Functions
- An ASP.NET hosted service
Although the change feed processor can run in short-lived environments because the lease container maintains the state, the startup cycle of these environments adds delays to the time it takes to receive notifications (due to the overhead of starting the processor every time the environment is started).
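For the ASP.NET hosted service option, one possible shape is a small IHostedService wrapper that starts the processor when the application starts and stops it on shutdown. This is a sketch under assumptions: the class name ChangeFeedHostedService is hypothetical, and the ChangeFeedProcessor instance is assumed to be built elsewhere and registered in the dependency injection container.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Hosting;

// Hypothetical wrapper that ties the processor lifetime to the host lifetime.
public sealed class ChangeFeedHostedService : IHostedService
{
    private readonly ChangeFeedProcessor processor;

    // The processor is assumed to be built and registered elsewhere.
    public ChangeFeedHostedService(ChangeFeedProcessor processor)
    {
        this.processor = processor;
    }

    // Called once by the host on startup; the processor begins acquiring leases.
    public Task StartAsync(CancellationToken cancellationToken)
    {
        return this.processor.StartAsync();
    }

    // Called by the host on graceful shutdown; the processor stops processing.
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return this.processor.StopAsync();
    }
}
```

With the processor registered as a singleton, the wrapper can be added to the host with AddHostedService<ChangeFeedHostedService>(). Because the processor lives as long as the application, this layout avoids the repeated startup overhead described above for short-lived environments.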
Role-based access requirements
When you use Microsoft Entra ID as the authentication mechanism, make sure the identity has the proper permissions:
- On the monitored container:
  - Microsoft.DocumentDB/databaseAccounts/readMetadata
  - Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
- On the lease container:
  - Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
  - Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
  - Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
  - Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
  - Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery
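As a minimal sketch of the client side, the following creates a CosmosClient that authenticates with Microsoft Entra ID through DefaultAzureCredential from the Azure.Identity package. The class name and the accountEndpoint parameter are assumptions made for illustration. The identity that the credential resolves to still needs a role assignment that includes the permissions listed above.

```csharp
using Azure.Identity;
using Microsoft.Azure.Cosmos;

public static class EntraIdClientSketch
{
    // accountEndpoint is a placeholder for your account's endpoint URI.
    public static CosmosClient CreateClient(string accountEndpoint)
    {
        // DefaultAzureCredential tries several credential sources in turn
        // (for example, environment variables, managed identity, developer sign-in).
        return new CosmosClient(accountEndpoint, new DefaultAzureCredential());
    }
}
```

Containers obtained from this client through GetContainer can then be passed to the change feed processor builder as the monitored and lease containers.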
Additional resources
- Azure Cosmos DB SDK
- Complete sample application on GitHub
- Additional usage samples on GitHub
- Azure Cosmos DB workshop labs for change feed processor
Next steps
Learn more about the change feed processor in the following articles: