扇出/扇入 并行运行多个函数,然后聚合结果。 本文演示了一个示例,该示例使用 Durable Functions 将应用的某些或所有网站内容备份到Azure Storage。
先决条件
扇出/扇入 并行运行多个操作,然后汇总结果。 本文介绍如何使用用于 .NET、JavaScript、Python 和Java的 Durable Task SDK 实现模式。
场景概述
在此示例中,函数会将指定目录下的所有文件(递归地)上传到 Blob 存储。 它们还会计算上传的总字节数。
单个函数可以处理所有内容,但无法缩放。 单个函数执行在一个虚拟机(VM)上运行,因此吞吐量仅限于该 VM。 可靠性是另一个问题。 如果进程在中途失败,或者需要 5 分钟以上,则备份可能会以部分完成的状态结束。 然后重启备份。
更可靠的方法是使用两个单独的函数:一个枚举文件并将文件名添加到队列,另一个从队列读取并将文件上传到 Blob 存储。 此方法可提高吞吐量和可靠性,但需要设置和管理队列。 更重要的是,此方法增加了状态管理和协调的复杂性,例如报告上传的字节总数。
Durable Functions提供所有这些优势,同时开销极小。
在以下示例中,业务流程协调程序并行处理多个工作项,然后聚合结果。 在需要以下情况时,此模式非常有用:
- 处理一批项,其中每个项可以独立处理
- 跨多台计算机分配工作以提高吞吐量
- 聚合来自所有并行操作的结果
如果没有扇出/扇入模式,要么按顺序处理项目,但这样会限制吞吐量,要么自己管理队列和协调逻辑,而这样会增加复杂性。
Durable Task SDK 处理并行化和协调,因此模式易于实现。
功能
本文介绍示例应用中的函数:
本文介绍示例代码中的组件:
-
ParallelProcessingOrchestration、fanOutFanInOrchestrator、fan_out_fan_in_orchestrator或FanOutFanIn_WordCount:一个业务流程协调程序,将工作并行扇出到多个活动,等待所有活动完成,然后通过聚合结果进行扇入。 -
ProcessWorkItemActivity、processWorkItem、process_work_item或CountWords:处理单个工作项的活动。 -
AggregateResultsActivity、aggregateResults或aggregate_results:一个活动,用于聚合所有并行操作的结果。
协调器
此业务流程协调程序函数执行如下作业:
- 将
rootDirectory作为输入。 - 调用某个函数来获取
rootDirectory下的文件的递归列表。 - 进行并行函数调用,以将每个文件上传到Azure Blob Storage。
- 等待所有上传完成。
- 返回上传到Azure Blob Storage的总字节数。
下面的代码实现协调器函数:
独立模型
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
namespace SampleApp;
public static class BackupSiteContent
{
[Function("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
string rootDirectory = context.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location)!.FullName;
}
string[] files = await context.CallActivityAsync<string[]>("E2_GetFileList", rootDirectory);
Task<long>[] tasks = files
.Select(file => context.CallActivityAsync<long>("E2_CopyFileToBlob", file))
.ToArray();
long[] results = await Task.WhenAll(tasks);
return results.Sum();
}
}
请注意 await Task.WhenAll(tasks); 行。 代码不会等待单个对 E2_CopyFileToBlob 的调用,因此它们并行运行。 当协调器将任务数组传递给 Task.WhenAll 时,它会返回一个任务,该任务只有在所有复制操作完成后才会完成。 如果熟悉.NET中的任务并行库(TPL),则此模式很熟悉。 区别在于,这些任务可以同时在多个虚拟机上运行,而Durable Functions扩展确保端到端的执行过程能够抵抗进程重启的影响。
协调器等待 Task.WhenAll后,所有函数调用都已完成且返回值已经生成。 每次调用 E2_CopyFileToBlob 时都会返回上传的字节数。 通过添加返回值计算总计。
进程内模型
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
注意
进程内模型示例使用已弃用的进程内包。 前面的代码展示了推荐的 .NET 隔离工作器模型。
协调器执行以下操作:
- 将工作项列表作为输入。
- 通过为每个工作项创建任务并并行处理它们来退出。
- 等待所有并行任务完成。
- 球迷通过聚合结果来参与进来。
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
public override async Task<Dictionary<string, int>> RunAsync(
TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
var processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
}
用于 Task.WhenAll() 等待所有并行任务完成。 Durable Task SDK 确保任务可以在多台计算机上并发运行,并且执行具有恢复能力,能够应对进程重启。
活动
辅助活动函数是使用activityTrigger绑定的常规函数。
E2_GetFileList 活动函数
独立模型
using System.IO;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace SampleApp;
public static class BackupSiteContent
{
[Function("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("E2_GetFileList");
logger.LogInformation("Searching for files under '{RootDirectory}'...", rootDirectory);
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
logger.LogInformation("Found {FileCount} file(s) under {RootDirectory}.", files.Length, rootDirectory);
return files;
}
}
进程内模型
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
ILogger log)
{
log.LogInformation($"Searching for files under '{rootDirectory}'...");
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");
return files;
}
注意
不要将此代码放入业务流程协调程序函数中。 协调器函数不应进行 I/O 操作,包括访问本地文件系统。 有关详细信息,请参阅业务流程协调程序函数代码约束。
E2_CopyFileToBlob 活动函数
独立模型
注意
若要运行示例代码,请安装 Azure.Storage.Blobs NuGet 包。
using System;
using System.IO;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace SampleApp;
public static class BackupSiteContent
{
[Function("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("E2_CopyFileToBlob");
long byteCount = new FileInfo(filePath).Length;
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath)!.Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
string? connectionString = Environment.GetEnvironmentVariable("AzureWebJobsStorage");
if (string.IsNullOrEmpty(connectionString))
{
throw new InvalidOperationException("AzureWebJobsStorage is not configured.");
}
BlobContainerClient containerClient = new(connectionString, "backups");
await containerClient.CreateIfNotExistsAsync();
BlobClient blobClient = containerClient.GetBlobClient(blobPath);
logger.LogInformation("Copying '{FilePath}' to '{OutputLocation}'. Total bytes = {ByteCount}.", filePath, outputLocation, byteCount);
await using Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
await blobClient.UploadAsync(source, overwrite: true);
return byteCount;
}
}
进程内模型
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
注意
进程内模型示例需要 Microsoft.Azure.WebJobs.Extensions.Storage NuGet 包,并使用 Azure Functions 的绑定功能,比如 Binder 参数。
实现从磁盘加载文件,并将内容异步流式传输到容器中同名的 backups Blob 中。 该函数返回复制到存储的字节数。 编排器使用该值来计算总和。
注意
此示例将 I/O 操作移到函数 activityTrigger 中。 该工作可以跨多台计算机运行,并支持进度检查点。 如果主机进程结束,则你知道哪些上传已完成。
活动完成工作。 与编排器不同,活动可以执行 I/O 操作和非确定性逻辑。
处理工作项活动
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
private readonly ILogger<ProcessWorkItemActivity> _logger;
public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
{
_logger.LogInformation("Processing work item: {WorkItem}", workItem);
// Process the work item (this is where you do the actual work)
var result = new Dictionary<string, int>
{
{ workItem, workItem.Length }
};
return Task.FromResult(result);
}
}
聚合结果活动
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
private readonly ILogger<AggregateResultsActivity> _logger;
public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(
TaskActivityContext context, Dictionary<string, int>[] results)
{
_logger.LogInformation("Aggregating {Count} results", results.Length);
// Combine all results into one aggregated result
var aggregatedResult = new Dictionary<string, int>();
foreach (var result in results)
{
foreach (var kvp in result)
{
aggregatedResult[kvp.Key] = kvp.Value;
}
}
return Task.FromResult(aggregatedResult);
}
}
运行示例
通过发送以下 HTTP POST 请求,在 Windows 上启动业务流程:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
或者,在 Linux 函数应用中,通过发送以下 HTTP POST 请求来启动业务流程。 Python当前在 Linux 上运行应用服务:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
注意
该 HttpStart 函数需要 JSON。 包括 Content-Type: application/json 标头,并将目录路径编码为 JSON 字符串。 HTTP 代码片段假定 host.json 具有从所有 HTTP 触发器函数 URL 中删除默认 api/ 前缀的条目。 在示例 host.json 文件中查找此配置的标记。
此 HTTP 请求会触发 E2_BackupSiteContent 业务流程协调程序,并将字符串 D:\home\LogFiles 作为参数传递。 响应中包含用于检查备份操作状态的链接:
HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
(...trimmed...)
根据函数应用中的日志文件数,此作可能需要几分钟才能完成。 通过查询上一个 HTTP 202 响应标头中的 Location URL 获取最新状态:
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}
在本例中,函数仍在运行。 响应显示保存在协调器状态中的输入和上次更新时间。 使用Location的标头值进行轮询以确定是否完成。 当状态为“Completed”时,响应类似于以下示例:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
响应显示编排已完成以及完成的大致时间。 该 output 字段指示业务流程上传了大约 450 KB 的日志。
运行示例:
启动持久任务计划程序模拟器 进行本地开发。
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest启动工作器 来注册协调器和活动。
运行客户端 来安排包含工作项列表的编排:
// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(ParallelProcessingOrchestration), workItems);
// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");
后续步骤
此示例显示了扇出/扇入模式。 下一个示例演示如何使用 持久计时器实现监视器模式。
本文演示扇出/扇入模式。 探索更多模式和功能:
有关 JavaScript SDK 示例,请参阅 Durable Task JavaScript SDK 示例。