处理 Azure Blob 存储中的更改源
更改源提供存储帐户中 Blob 和 Blob 元数据发生的所有更改的事务日志。 本文介绍如何使用 Blob 更改源处理器库读取更改源记录。
若要详细了解更改源,请参阅 Azure Blob 存储中的更改源。
设置项目
本部分逐步指导如何准备一个项目,使其与适用于 .NET 的 Blobs Change Feed 客户端库配合使用。
安装包
从项目目录中,使用 dotnet add package
命令安装适用于 .NET 的 Azure 存储 Blob 更改源客户端库的包。 在此示例中,我们在命令中添加 --prerelease
标志来安装最新的预览版本。
dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease
本文中的代码示例还使用 Azure Blob 存储包和 Azure 标识包。
dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs
添加 using
指令
将以下 using
指令添加到代码文件中:
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;
创建客户端对象
若要将应用程序连接到 Blob 存储,请创建 BlobServiceClient
类的实例。 以下示例演示如何使用 DefaultAzureCredential
创建客户端对象以进行授权。 若要了解详细信息,请参阅授予访问权限并连接到 Blob 存储。 若要使用更改源,需有 Azure RBAC 内置角色“存储 Blob 数据读取者”或更高级别的角色。
// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";
BlobServiceClient client = new(
new Uri($"https://{accountName}.blob.core.chinacloudapi.cn"),
new DefaultAzureCredential());
客户端对象作为参数传递给本文所示的一些方法。
读取更改源中的记录
注意
更改源是存储帐户中的不可变只读实体。 任意数量的应用程序都可以同时独立读取和处理更改源,具体取决于应用程序自身的需求。 当应用程序读取记录时,不会从更改源中删除这些记录。 每个使用方读取器的读取或迭代状态是独立的,仅由应用程序维护。
以下代码示例遍历更改源中的所有记录,将它们添加到列表中,然后返回更改源事件的列表:
public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
// Create a new BlobChangeFeedClient
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = [];
// Get all the events in the change feed
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
以下代码示例从更改源事件列表中输出一些值:
public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
}
从已保存的位置继续读取记录
你可以选择保存你在更改源中的读取位置,以后可以继续循环访问记录。 可以通过获取更改源游标来保存读取位置。 该游标是一个字符串,你的应用程序可以采用适合你的应用程序设计的任何方式保存该字符串(例如,保存到文件或数据库中)。
此示例将循环访问更改源中的所有记录,将它们添加到列表中,然后保存游标。 列表和游标将返回到调用方。
public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
BlobServiceClient client,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor)
.AsPages(pageSizeHint: 10)
.GetAsyncEnumerator();
await enumerator.MoveNextAsync();
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}
// Update the change feed cursor. The cursor is not required to get each page of events,
// it's intended to be saved and used to resume iterating at a later date.
cursor = enumerator.Current.ContinuationToken;
return (cursor, changeFeedEvents);
}
记录的流式处理
你可以选择在更改源记录提交到更改源时处理这些记录。 请参阅规范。 更改事件以平均 60 秒的时间间隔发布到更改源。 在指定轮询时间间隔时,建议使用此时间间隔来轮询新更改。
此示例定期轮询更改。 如果存在更改记录,此代码将处理这些记录并保存更改源游标。 这样一来,如果进程停止并再次启动,则应用程序可以使用游标继续从上次离开的位置处理记录。 此示例将游标保存到本地文件进行演示,但你的应用程序可以将其保存为最适合你的方案的任何形式。
public async Task ChangeFeedStreamAsync(
BlobServiceClient client,
int waitTimeMs,
string cursor)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
while (true)
{
IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
.GetChangesAsync(continuationToken: cursor).AsPages().GetAsyncEnumerator();
while (true)
{
var result = await enumerator.MoveNextAsync();
if (result)
{
foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
{
string subject = changeFeedEvent.Subject;
string eventType = changeFeedEvent.EventType.ToString();
BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;
Console.WriteLine("Subject: " + subject + "\n" +
"Event Type: " + eventType + "\n" +
"Operation: " + operationName.ToString());
}
// Helper method to save cursor
SaveCursor(enumerator.Current.ContinuationToken);
}
else
{
break;
}
}
await Task.Delay(waitTimeMs);
}
}
void SaveCursor(string cursor)
{
// Specify the path to the file where you want to save the cursor
string filePath = "path/to/cursor.txt";
// Write the cursor value to the file
File.WriteAllText(filePath, cursor);
}
读取特定时间范围内的记录
可以读取特定时间范围内的记录。 此示例遍历更改源中属于特定日期和时间范围内的所有记录,将它们添加到列表并返回列表:
async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
// Get a new change feed client
BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
// Create the start and end time. The change feed client will round start time down to
// the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);
// You can also provide just a start or end time.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
start: startTime,
end: endTime))
{
changeFeedEvents.Add(changeFeedEvent);
}
return changeFeedEvents;
}
你提供的开始时间将向下舍入到最接近的小时,结束时间将向上舍入到最接近的小时。 用户可能会看到在开始时间之前和结束时间之后发生的事件。 在开始和结束时间之间发生的某些事件也可能不会显示。 这是因为事件可能会记录在开始时间之前的小时内,也可能会记录在结束时间之后的小时内。