Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
使用 扇出/扇入 模式并行运行多个函数,然后聚合结果。 此模式是 Azure 无服务器工作流中并行处理的常见方法。 在本教程中,你将使用 Durable Functions 实现扇出/扇入模式,将应用的站点内容备份到 Azure 存储。
先决条件
使用 扇出/扇入 模式在工作流编排中进行并行处理。
- 将工作分配到同时运行的多个活动中。
- 通过聚合结果进行汇聚。
在本教程中,你将使用适用于 .NET、JavaScript、Python 和 Java 的 Durable Task SDK 实现扇出/扇入模式。
场景概述
此示例通过将目录下的所有文件(递归方式)上传到 Azure Blob 存储并计算上传的总字节数来演示并行处理。
单个函数可以处理上传,但无法缩放。 一个函数执行在一个虚拟机(VM)上运行,因此吞吐量仅限于该 VM。 可靠性是另一个问题:如果进程在中途失败或需要 5 分钟以上时间,则备份会以部分完成的状态结束,并且必须重启。
具有两个函数的基于队列的方法可提高吞吐量和可靠性,但引入了状态管理和协调的复杂性,例如报告上传的总字节数。
Durable Functions 提供并行处理、可靠性和协调,且开销最少,无需进行队列管理。
在此示例中,工作流协调程序将在多个活动中展开并行处理,然后通过汇总结果进行合并。 在以下情况下,请使用扇出/扇入模式:
- 处理一批项目,其中每个项目都可以独立处理。
- 跨多台计算机分配工作以提高吞吐量
- 聚合来自所有并行操作的结果
如果没有此模式,可以按顺序处理项(限制吞吐量),或构建自己的队列和协调逻辑(增加复杂性)。 持久性任务 SDK 负责处理并行化和协调,使扇出/扇入模式易于实现。
函数组件
本文介绍示例应用中的函数:
本文介绍示例代码中的组件:
-
ParallelProcessingOrchestration、fanOutFanInOrchestrator、fan_out_fan_in_orchestrator或FanOutFanIn_WordCount:一个业务流程协调程序,将工作并行扇出到多个活动,等待所有活动完成,然后通过聚合结果进行扇入。 -
ProcessWorkItemActivity、processWorkItem、process_work_item或CountWords:处理单个工作项的活动。 -
AggregateResultsActivity、aggregateResults或aggregate_results:一个活动,用于聚合所有并行操作的结果。
协调器
此业务流程协调程序函数执行以下任务:
- 将
rootDirectory作为输入。 - 调用某个函数来获取
rootDirectory下的文件的递归列表。 - 进行并行函数调用,以将每个文件上传到Azure Blob 存储。
- 等待所有上传完成。
- 返回上传到Azure Blob 存储的总字节数。
以下代码演示如何实现业务流程协调程序函数:
独立模型
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
namespace SampleApp;
public static class BackupSiteContent
{
[Function("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
string rootDirectory = context.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location)!.FullName;
}
string[] files = await context.CallActivityAsync<string[]>("E2_GetFileList", rootDirectory);
Task<long>[] tasks = files
.Select(file => context.CallActivityAsync<long>("E2_CopyFileToBlob", file))
.ToArray();
long[] results = await Task.WhenAll(tasks);
return results.Sum();
}
}
请注意 await Task.WhenAll(tasks); 行。 代码不会等待单个对 E2_CopyFileToBlob 的调用,因此它们并行运行。 当协调器将任务数组传递给 Task.WhenAll 时,它会返回一个任务,该任务只有在所有复制操作完成后才会完成。 如果熟悉.NET中的任务并行库(TPL),则此模式很熟悉。 借助 Durable Functions 扩展,这些任务可以在多个虚拟机上并行运行,并且整个执行过程具备对进程回收的弹性。
协调器等待 Task.WhenAll后,所有函数调用都已完成且返回值已经生成。 每次调用 E2_CopyFileToBlob 时都会返回上传的字节数。 通过添加返回值计算总计。
进程内模型
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
注意
进程内模型示例使用已弃用的进程内包。 前面的代码展示了推荐的 .NET 隔离工作器模型。
协调器执行以下任务:
- 将工作项列表作为输入。
- 通过为每个工作项创建任务并并行处理它们来退出。
- 等待所有并行任务完成。
- 球迷通过聚合结果来参与进来。
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
public override async Task<Dictionary<string, int>> RunAsync(
TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
var processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
}
用于 Task.WhenAll() 等待所有并行任务完成。 Durable Task SDK 可确保任务能够在多台计算机上同时运行,并且执行过程对进程重启具有弹性和复原能力。
活动
帮助程序活动函数是使用通过 activityTrigger 绑定的常规函数。
E2_GetFileList 活动函数
独立模型
using System.IO;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace SampleApp;
public static class BackupSiteContent
{
[Function("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("E2_GetFileList");
logger.LogInformation("Searching for files under '{RootDirectory}'...", rootDirectory);
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
logger.LogInformation("Found {FileCount} file(s) under {RootDirectory}.", files.Length, rootDirectory);
return files;
}
}
进程内模型
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
ILogger log)
{
log.LogInformation($"Searching for files under '{rootDirectory}'...");
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");
return files;
}
注意
不要将此代码放入业务流程协调程序函数中。 协调器函数不应进行 I/O 操作,包括访问本地文件系统。 有关详细信息,请参阅业务流程协调程序函数代码约束。
E2_CopyFileToBlob 活动函数
独立模型
注意
若要运行示例代码,请安装 Azure.Storage.Blobs NuGet 包。
using System;
using System.IO;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace SampleApp;
public static class BackupSiteContent
{
[Function("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("E2_CopyFileToBlob");
long byteCount = new FileInfo(filePath).Length;
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath)!.Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
string? connectionString = Environment.GetEnvironmentVariable("AzureWebJobsStorage");
if (string.IsNullOrEmpty(connectionString))
{
throw new InvalidOperationException("AzureWebJobsStorage is not configured.");
}
BlobContainerClient containerClient = new(connectionString, "backups");
await containerClient.CreateIfNotExistsAsync();
BlobClient blobClient = containerClient.GetBlobClient(blobPath);
logger.LogInformation("Copying '{FilePath}' to '{OutputLocation}'. Total bytes = {ByteCount}.", filePath, outputLocation, byteCount);
await using Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
await blobClient.UploadAsync(source, overwrite: true);
return byteCount;
}
}
进程内模型
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
注意
进程内模型示例需要 Microsoft.Azure.WebJobs.Extensions.Storage NuGet 包,并使用 Azure Functions 的绑定功能,比如 Binder 参数。
实现从磁盘加载文件,并将内容异步流式传输到容器中同名的 backups Blob 中。 该函数返回复制到存储的字节数。 编排器使用该值来计算总和。
注意
此示例将 I/O 操作移到函数 activityTrigger 中。 该工作可以跨多台计算机运行,并支持进度检查点。 如果主机进程结束,则你知道哪些上传已完成。
活动完成工作。 与编排器不同,活动可以执行 I/O 操作和非确定性逻辑。
处理工作项活动
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
private readonly ILogger<ProcessWorkItemActivity> _logger;
public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
{
_logger.LogInformation("Processing work item: {WorkItem}", workItem);
// Process the work item (where you do the actual work)
var result = new Dictionary<string, int>
{
{ workItem, workItem.Length }
};
return Task.FromResult(result);
}
}
聚合结果活动
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
private readonly ILogger<AggregateResultsActivity> _logger;
public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(
TaskActivityContext context, Dictionary<string, int>[] results)
{
_logger.LogInformation("Aggregating {Count} results", results.Length);
// Combine all results into one aggregated result
var aggregatedResult = new Dictionary<string, int>();
foreach (var result in results)
{
foreach (var kvp in result)
{
aggregatedResult[kvp.Key] = kvp.Value;
}
}
return Task.FromResult(aggregatedResult);
}
}
运行扇出/扇入示例
通过发送以下 HTTP POST 请求,在 Windows 上启动业务流程:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
或者,在 Linux 函数应用中,通过发送以下 HTTP POST 请求来启动业务流程。 Python当前在 Linux 上运行应用服务:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
注意
该 HttpStart 函数需要 JSON。 包括 Content-Type: application/json 标头,并将目录路径编码为 JSON 字符串。 HTTP 代码片段假定 host.json 具有从所有 HTTP 触发器函数 URL 中删除默认 api/ 前缀的条目。 在示例 host.json 文件中查找此配置的标记。
此 HTTP 请求会触发 E2_BackupSiteContent 业务流程协调程序,并将字符串 D:\home\LogFiles 作为参数传递。 响应中包含用于检查备份操作状态的链接:
HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
(...trimmed...)
根据函数应用中的日志文件数,此作可能需要几分钟才能完成。 通过查询上一个 HTTP 202 响应标头中的 Location URL 获取最新状态:
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}
在本例中,函数仍在运行。 响应显示保存在协调器状态中的输入和上次更新时间。 使用Location的标头值进行轮询以确定是否完成。 当状态为“Completed”时,响应类似于以下示例:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
响应显示编排已完成,并提供完成的大致时间。 该output字段指示编排上传了大约 450 KB 的日志。
运行示例:
启动持久任务计划程序模拟器 进行本地开发。
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest启动工作器 来注册协调器和活动。
运行客户端 来安排包含工作项列表的编排:
// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(ParallelProcessingOrchestration), workItems);
// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");
后续步骤
此示例显示了扇出/扇入模式。 下一个示例演示如何使用 持久计时器实现监视器模式。
本文演示扇出/扇入模式。 探索更多模式和功能:
有关 JavaScript SDK 示例,请参阅 Durable Task JavaScript SDK 示例。