扇出/扇入情景

扇出/扇入 并行运行多个函数,然后聚合结果。 本文演示了一个示例,该示例使用 Durable Functions 将应用的某些或所有网站内容备份到Azure Storage。

先决条件

扇出/扇入 并行运行多个操作,然后汇总结果。 本文介绍如何使用用于 .NET、JavaScript、Python 和Java的 Durable Task SDK 实现模式。

场景概述

在此示例中,函数会将指定目录下的所有文件(递归地)上传到 Blob 存储。 它们还会计算上传的总字节数。

单个函数可以处理所有内容,但无法缩放。 单个函数执行在一个虚拟机(VM)上运行,因此吞吐量仅限于该 VM。 可靠性是另一个问题。 如果进程在中途失败,或者需要 5 分钟以上,则备份可能会以部分完成的状态结束。 然后重启备份。

更可靠的方法是使用两个单独的函数:一个枚举文件并将文件名添加到队列,另一个从队列读取并将文件上传到 Blob 存储。 此方法可提高吞吐量和可靠性,但需要设置和管理队列。 更重要的是,此方法增加了状态管理和协调的复杂性,例如报告上传的字节总数。

Durable Functions提供所有这些优势,开销很少。

在以下示例中,业务流程协调程序并行处理多个工作项,然后聚合结果。 在需要以下情况时,此模式非常有用:

  • 处理一批项,其中每个项可以独立处理
  • 跨多台计算机分配工作以提高吞吐量
  • 聚合来自所有并行操作的结果

如果没有扇出/扇入模式,要么按顺序处理项目,但这样会限制吞吐量,要么自己管理队列和协调逻辑,而这样会增加复杂性。

Durable Task SDK 处理并行化和协调,因此模式易于实现。

功能

本文介绍示例应用中的函数:

  • E2_BackupSiteContent:一个 协调器函数,它调用 E2_GetFileList 获取要备份的文件列表,然后为每个文件调用 E2_CopyFileToBlob
  • E2_GetFileList:一个活动函数,它返回目录中的文件列表。
  • E2_CopyFileToBlob:将单个文件备份到Azure Blob Storage的活动函数。

本文介绍示例代码中的组件:

  • ParallelProcessingOrchestrationfanOutFanInOrchestratorfan_out_fan_in_orchestratorFanOutFanIn_WordCount:一个业务流程协调程序,将工作并行扇出到多个活动,等待所有活动完成,然后通过聚合结果进行扇入。
  • ProcessWorkItemActivityprocessWorkItemprocess_work_itemCountWords:处理单个工作项的活动。
  • AggregateResultsActivityaggregateResultsaggregate_results:一个活动,用于聚合所有并行操作的结果。

协调器

此业务流程协调程序函数执行如下作业:

  1. rootDirectory 作为输入。
  2. 调用某个函数来获取 rootDirectory 下的文件的递归列表。
  3. 进行并行函数调用,以将每个文件上传到Azure Blob Storage。
  4. 等待所有上传完成。
  5. 返回上传到Azure Blob Storage的总字节数。

下面的代码实现协调器函数:

        [FunctionName("E2_BackupSiteContent")]
        public static async Task<long> Run(
            [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
        {
            string rootDirectory = backupContext.GetInput<string>()?.Trim();
            if (string.IsNullOrEmpty(rootDirectory))
            {
                rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
            }

            string[] files = await backupContext.CallActivityAsync<string[]>(
                "E2_GetFileList",
                rootDirectory);

            var tasks = new Task<long>[files.Length];
            for (int i = 0; i < files.Length; i++)
            {
                tasks[i] = backupContext.CallActivityAsync<long>(
                    "E2_CopyFileToBlob",
                    files[i]);
            }

            await Task.WhenAll(tasks);

            long totalBytes = tasks.Sum(t => t.Result);
            return totalBytes;
        }

请注意 await Task.WhenAll(tasks); 行。 代码不会等待单个对 E2_CopyFileToBlob 的调用,因此它们并行运行。 当协调器将任务数组传递给 Task.WhenAll 时,它会返回一个任务,该任务只有在所有复制操作完成后才会完成。 如果熟悉.NET中的任务并行库(TPL),则此模式很熟悉。 区别在于,这些任务可以同时在多个虚拟机上运行,而Durable Functions扩展可确保端到端执行具备对进程回收的耐受性。

协调器等待 Task.WhenAll后,所有函数调用都已完成且返回值已经生成。 每次调用 E2_CopyFileToBlob 时都会返回上传的字节数。 通过添加返回值计算总计。

协调器执行以下操作:

  1. 将工作项列表作为输入。
  2. 通过为每个工作项创建任务并并行处理它们来退出。
  3. 等待所有并行任务完成。
  4. 球迷通过聚合结果来参与进来。
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
    public override async Task<Dictionary<string, int>> RunAsync(
        TaskOrchestrationContext context, List<string> workItems)
    {
        // Step 1: Fan-out by creating a task for each work item in parallel
        var processingTasks = new List<Task<Dictionary<string, int>>>();

        foreach (string workItem in workItems)
        {
            // Create a task for each work item (fan-out)
            Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
                nameof(ProcessWorkItemActivity), workItem);
            processingTasks.Add(task);
        }

        // Step 2: Wait for all parallel tasks to complete
        Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

        // Step 3: Fan-in by aggregating all results
        Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
            nameof(AggregateResultsActivity), results);

        return aggregatedResults;
    }
}

用于 Task.WhenAll() 等待所有并行任务完成。 Durable Task SDK 确保任务可以在多台计算机上并发运行,并且执行具有恢复能力,能够应对进程重启。

活动

辅助活动函数是使用activityTrigger绑定的常规函数。

E2_GetFileList 活动函数

        [FunctionName("E2_GetFileList")]
        public static string[] GetFileList(
            [ActivityTrigger] string rootDirectory, 
            ILogger log)
        {
            log.LogInformation($"Searching for files under '{rootDirectory}'...");
            string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
            log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

            return files;
        }

注释

不要将此代码放入业务流程协调程序函数中。 协调器函数不应进行 I/O 操作,包括访问本地文件系统。 有关详细信息,请参阅业务流程协调程序函数代码约束

E2_CopyFileToBlob 活动函数

        [FunctionName("E2_CopyFileToBlob")]
        public static async Task<long> CopyFileToBlob(
            [ActivityTrigger] string filePath,
            Binder binder,
            ILogger log)
        {
            long byteCount = new FileInfo(filePath).Length;

            // strip the drive letter prefix and convert to forward slashes
            string blobPath = filePath
                .Substring(Path.GetPathRoot(filePath).Length)
                .Replace('\\', '/');
            string outputLocation = $"backups/{blobPath}";

            log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

            // copy the file contents into a blob
            using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
            using (Stream destination = await binder.BindAsync<CloudBlobStream>(
                new BlobAttribute(outputLocation, FileAccess.Write)))
            {
                await source.CopyToAsync(destination);
            }

            return byteCount;
        }

注释

若要运行示例代码,请安装 Microsoft.Azure.WebJobs.Extensions.Storage NuGet 包。

该函数使用Azure Functions绑定功能,如 Binder 参数。 本演练不需要这些详细信息。

实现从磁盘加载文件,并将内容异步流式传输到容器中同名的 backups Blob 中。 该函数返回复制到存储的字节数。 编排器使用该值来计算总和。

注释

此示例将 I/O 操作移到函数 activityTrigger 中。 该工作可以跨多台计算机运行,并支持进度检查点。 如果主机进程结束,则你知道哪些上传已完成。

活动完成工作。 与编排器不同,活动可以执行 I/O 操作和非确定性逻辑。

处理工作项活动

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    private readonly ILogger<ProcessWorkItemActivity> _logger;

    public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
    {
        _logger.LogInformation("Processing work item: {WorkItem}", workItem);

        // Process the work item (this is where you do the actual work)
        var result = new Dictionary<string, int>
        {
            { workItem, workItem.Length }
        };

        return Task.FromResult(result);
    }
}

聚合结果活动

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    private readonly ILogger<AggregateResultsActivity> _logger;

    public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(
        TaskActivityContext context, Dictionary<string, int>[] results)
    {
        _logger.LogInformation("Aggregating {Count} results", results.Length);

        // Combine all results into one aggregated result
        var aggregatedResult = new Dictionary<string, int>();

        foreach (var result in results)
        {
            foreach (var kvp in result)
            {
                aggregatedResult[kvp.Key] = kvp.Value;
            }
        }

        return Task.FromResult(aggregatedResult);
    }
}

运行示例

通过发送以下 HTTP POST 请求,在 Windows 上启动业务流程:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

或者,在 Linux 函数应用中,通过发送以下 HTTP POST 请求来启动业务流程。 Python当前在 Linux 上运行应用服务:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

注释

HttpStart 函数需要 JSON。 包括 Content-Type: application/json 标头,并将目录路径编码为 JSON 字符串。 HTTP 代码片段假定 host.json 具有从所有 HTTP 触发器函数 URL 中删除默认 api/ 前缀的条目。 在示例 host.json 文件中查找此配置的标记。

此 HTTP 请求会触发 E2_BackupSiteContent 业务流程协调程序,并将字符串 D:\home\LogFiles 作为参数传递。 响应中包含用于检查备份操作状态的链接:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

根据函数应用中的日志文件数,此作可能需要几分钟才能完成。 通过查询上一个 HTTP 202 响应标头中的 Location URL 获取最新状态:

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

在本例中,函数仍在运行。 响应显示保存在协调器状态中的输入和上次更新时间。 使用Location的标头值进行轮询以确定是否完成。 当状态为“Completed”时,响应类似于以下示例:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

响应显示编排已完成以及完成的大致时间。 该 output 字段指示业务流程上传了大约 450 KB 的日志。

运行示例:

  1. 启动持久任务计划程序模拟器 进行本地开发。

    docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
    
  2. 启动工作器 来注册协调器和活动。

  3. 运行客户端 来安排包含工作项列表的编排:

// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    nameof(ParallelProcessingOrchestration), workItems);

// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");

后续步骤

此示例显示了扇出/扇入模式。 下一个示例演示如何使用 持久计时器实现监视器模式。

本文演示扇出/扇入模式。 探索更多模式和功能:

有关 JavaScript SDK 示例,请参阅 Durable Task JavaScript SDK 示例