Process change feed in Azure Blob Storage

The change feed provides transaction logs of all the changes that occur to the blobs and the blob metadata in your storage account. This article shows you how to read change feed records by using the blob change feed processor library.

To learn more about the change feed, see Change feed in Azure Blob Storage.

Get the blob change feed processor library

  1. Open a command window (for example, Windows PowerShell).
  2. From your project directory, install the Azure.Storage.Blobs and Azure.Storage.Blobs.ChangeFeed NuGet packages:
dotnet add package Azure.Storage.Blobs --version 12.5.1
dotnet add package Azure.Storage.Blobs.ChangeFeed --version 12.0.0-preview.4

Read records

Note

The change feed is an immutable, read-only entity in your storage account. Any number of applications can read and process the change feed simultaneously and independently, at their own convenience. Records aren't removed from the change feed when an application reads them. Each consuming reader has its own independent read or iteration state, and only your application maintains that state.
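The following minimal in-memory sketch makes this independence concrete. `InMemoryChangeLog` and `IndependentReader` are hypothetical types and are not part of the change feed library. The log is append-only, and reading never removes records. Each reader keeps its own position, in the same way that each application keeps its own change feed cursor.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the change feed: an append-only log.
// Reading never removes or changes records.
public sealed class InMemoryChangeLog
{
    private readonly List<string> _records = new List<string>();

    public int Count => _records.Count;

    public void Append(string record) => _records.Add(record);

    // Returns up to maxCount records starting at position, without mutating the log.
    public IReadOnlyList<string> Read(int position, int maxCount) =>
        _records.Skip(position).Take(maxCount).ToList();
}

// Hypothetical consumer: its read position belongs to it alone,
// just as each application maintains its own change feed cursor.
public sealed class IndependentReader
{
    private readonly InMemoryChangeLog _log;

    public IndependentReader(InMemoryChangeLog log, int startPosition = 0)
    {
        _log = log;
        Position = startPosition;
    }

    public int Position { get; private set; }

    // Reads the next batch and advances only this reader's position.
    public IReadOnlyList<string> ReadNext(int maxCount)
    {
        IReadOnlyList<string> batch = _log.Read(Position, maxCount);
        Position += batch.Count;
        return batch;
    }
}
```

Two readers created over the same log can progress at different speeds. Neither reader affects what the other one sees.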

This example iterates through all records in the change feed, adds them to a list, and then returns that list to the caller.

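using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(string connectionString)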
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Get all the events in the change feed. 
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}
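If several places in your code collect events into a list, a small generic helper can do the draining for you. This is a sketch and is not part of the library. `CollectAsync` is a hypothetical extension method that works on any `IAsyncEnumerable<T>`. That includes the `AsyncPageable<BlobChangeFeedEvent>` that `GetChangesAsync()` returns.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    // Hypothetical helper: drains any IAsyncEnumerable<T> into a List<T>,
    // stopping early if the cancellation token is signaled.
    public static async Task<List<T>> CollectAsync<T>(
        this IAsyncEnumerable<T> source,
        CancellationToken cancellationToken = default)
    {
        var items = new List<T>();
        await foreach (T item in source.WithCancellation(cancellationToken))
        {
            items.Add(item);
        }
        return items;
    }
}
```

With this helper, the body of ChangeFeedAsync could be reduced to `return await changeFeedClient.GetChangesAsync().CollectAsync();`. A long-lived change feed can hold many events, so loading all of them into memory may not be practical.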

This example prints a few values from each record in the list to the console.

public void ShowEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        string api = changeFeedEvent.EventData.Api;

        Console.WriteLine($"Subject: {subject}\nEvent Type: {eventType}\nApi: {api}");
    }
}

Resume reading records from a saved position

You can save your read position in the change feed and then resume iterating through the records at a later time. To save the read position, get the change feed cursor. The cursor is a string, and your application can store it in any way that fits its design (for example, in a file or a database).
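As one way to store the cursor, the following sketch writes it to a plain-text file. `FileCursorStore` is a hypothetical helper. The file path is up to you, and a database row would work just as well.

```csharp
using System;
using System.IO;

// Hypothetical helper: persists the change feed cursor (an opaque string)
// to a plain-text file so that a later run can resume from it.
public sealed class FileCursorStore
{
    private readonly string _path;

    public FileCursorStore(string path) => _path = path;

    // Returns null when no cursor has been saved yet.
    public string Load() =>
        File.Exists(_path) ? File.ReadAllText(_path) : null;

    public void Save(string cursor)
    {
        if (cursor == null) throw new ArgumentNullException(nameof(cursor));

        // Write to a temporary file first, then move it over the old one.
        string tempPath = _path + ".tmp";
        File.WriteAllText(tempPath, cursor);
        File.Move(tempPath, _path, overwrite: true);
    }
}
```

Writing to a temporary file and then replacing the old file makes it less likely that a crash during the write leaves a truncated cursor behind. `File.Move` with `overwrite` requires .NET Core 3.0 or later.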

This example reads one page of records from the change feed, starting at the position that the cursor identifies, adds them to a list, and captures the updated cursor. The pageSizeHint value of 10 is a hint, not a guarantee. The list and the cursor are returned to the caller.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync
    (string connectionString,  string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Read a single page of events, starting at the position the cursor identifies.
    await using IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    // MoveNextAsync returns false when no page is available. In that case
    // enumerator.Current must not be read, so return the unchanged cursor.
    if (!await enumerator.MoveNextAsync())
    {
        return (cursor, changeFeedEvents);
    }

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events;
    // it is intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}
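A caller can resume repeatedly by passing each returned cursor into the next call. The following sketch of that loop is generic, so it can be tested without a storage account. `CursorLoop.DrainAsync` and its delegate parameters are hypothetical. To use the loop with the method above, you could pass `c => ChangeFeedResumeWithCursorAsync(connectionString, c)` as `readPage`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CursorLoop
{
    // Hypothetical caller-side loop: reads one page starting at `cursor`,
    // hands it to `handlePage`, then persists the new cursor with `saveCursor`.
    // Returns the last cursor once a page comes back empty.
    public static async Task<string> DrainAsync<T>(
        Func<string, Task<(string Cursor, List<T> Events)>> readPage,
        Action<List<T>> handlePage,
        Action<string> saveCursor,
        string cursor)
    {
        while (true)
        {
            (string nextCursor, List<T> events) = await readPage(cursor);

            // Assumption: an empty page means the reader has caught up.
            if (events.Count == 0)
            {
                return cursor;
            }

            handlePage(events);   // process the page first...
            cursor = nextCursor;
            saveCursor(cursor);   // ...then record the progress
        }
    }
}
```

The loop processes a page before it saves the cursor that follows that page. If the process stops between those two steps, the same page is processed again after a restart, so handlers should tolerate seeing an event more than once.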

Stream processing of records

You can process change feed records as they're committed to the change feed (see Specifications). Change events are published to the change feed every 60 seconds on average, so take this period into account when you choose your polling interval.

This example periodically polls for changes. If change records exist, the code processes them and saves the change feed cursor. That way, if the process stops and then starts again, the application can use the cursor to resume processing where it last left off. This example saves the cursor to a local application configuration file, but your application can save it in whatever form makes the most sense for your scenario.

public async Task ChangeFeedStreamAsync
    (string connectionString, int waitTimeMs, string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    while (true)
    {
        await using IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
            .GetChangesAsync(continuation: cursor).AsPages().GetAsyncEnumerator();

        while (await enumerator.MoveNextAsync())
        {
            foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
            {
                string subject = changeFeedEvent.Subject;
                string eventType = changeFeedEvent.EventType.ToString();
                string api = changeFeedEvent.EventData.Api;

                Console.WriteLine("Subject: " + subject + "\n" +
                    "Event Type: " + eventType + "\n" +
                    "Api: " + api);
            }

            // Advance the in-memory cursor so the next polling pass resumes from here
            // instead of replaying earlier events, and persist it for restarts.
            cursor = enumerator.Current.ContinuationToken;
            SaveCursor(cursor);
        }

        // Wait before polling again; events are published about every 60 seconds on average.
        await Task.Delay(waitTimeMs);
    }

}

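// Requires a using System.Configuration; directive. On .NET Core and .NET 5+,
// also add the System.Configuration.ConfigurationManager NuGet package.
public void SaveCursor(string cursor)
{
    System.Configuration.Configuration config =
        ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.None);

    // Update only the "Cursor" entry. Clearing the collection would also
    // delete every other app setting.
    KeyValueConfigurationElement setting = config.AppSettings.Settings["Cursor"];
    if (setting == null)
    {
        config.AppSettings.Settings.Add("Cursor", cursor);
    }
    else
    {
        setting.Value = cursor;
    }

    config.Save(ConfigurationSaveMode.Modified);
}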
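ChangeFeedStreamAsync polls forever, and the only way to stop it is to end the process. The following minimal sketch shows a cancellable alternative. `Poller.RunAsync` is a hypothetical helper, and `pollOnceAsync` stands in for one pass of the inner paging loop. Per the guidance earlier in this section, an interval of about 60 seconds is a reasonable starting point.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Poller
{
    // Hypothetical helper: runs `pollOnceAsync`, waits `interval`, and repeats
    // until `cancellationToken` is signaled. Returns the number of completed polls.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task> pollOnceAsync,
        TimeSpan interval,
        CancellationToken cancellationToken)
    {
        int polls = 0;
        while (!cancellationToken.IsCancellationRequested)
        {
            await pollOnceAsync(cancellationToken);
            polls++;

            try
            {
                // Task.Delay ends early with a TaskCanceledException when the token fires.
                await Task.Delay(interval, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
        return polls;
    }
}
```

Passing the token to `Task.Delay` makes cancellation take effect immediately. Without it, the loop would stop only after the current wait finished.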

Reading records within a time range

You can read records that fall within a specific time range. This example iterates through all records in the change feed that fall between 3:00 PM on March 2, 2020, and 2:00 AM on August 7, 2020 (both UTC). It adds those records to a list and then returns that list to the caller.

Selecting segments for a time range

public async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2020, 3, 2, 15, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2020, 8, 7, 2, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

The client rounds the start time that you provide down to the nearest hour and rounds the end time up to the nearest hour. As a result, you might see events that occurred before the start time or after the end time. It's also possible that some events that occurred between the start and end times won't appear. That happens because an event might be recorded during the hour before the start time or during the hour after the end time.
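If you need exactly the requested window, you can compute the hour-aligned range the client uses and then discard events that fall outside your own bounds. This sketch rests on several assumptions. It assumes the rounding happens on UTC hour boundaries, as in the example above, which uses UTC offsets. `ChangeFeedTimeRange` is a hypothetical helper. The half-open interval [start, end) is a choice made for this sketch, not documented behavior. Filtering only removes extra events; it can't recover events that aren't returned.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ChangeFeedTimeRange
{
    // Rounds down to the start of the UTC hour that contains t.
    public static DateTimeOffset FloorToHour(DateTimeOffset t)
    {
        long ticks = t.UtcTicks;
        return new DateTimeOffset(ticks - ticks % TimeSpan.TicksPerHour, TimeSpan.Zero);
    }

    // Rounds up to the next UTC hour boundary, unless t is already on one.
    public static DateTimeOffset CeilingToHour(DateTimeOffset t)
    {
        long ticks = t.UtcTicks;
        long remainder = ticks % TimeSpan.TicksPerHour;
        return remainder == 0
            ? new DateTimeOffset(ticks, TimeSpan.Zero)
            : new DateTimeOffset(ticks - remainder + TimeSpan.TicksPerHour, TimeSpan.Zero);
    }

    // Keeps only items whose timestamp falls in the half-open range [start, end).
    public static IEnumerable<T> WithinRange<T>(
        IEnumerable<T> items,
        Func<T, DateTimeOffset> getTime,
        DateTimeOffset start,
        DateTimeOffset end) =>
        items.Where(item =>
        {
            DateTimeOffset time = getTime(item);
            return time >= start && time < end;
        });
}
```

For change feed events, `getTime` could be `e => e.EventTime`. That assumes the event type exposes its timestamp through an `EventTime` property.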

Next steps

Learn more about change feed logs. See Change feed in Azure Blob Storage.