扇出/扇入 并行运行多个函数,然后聚合结果。 本文演示了一个示例,该示例使用 Durable Functions 将应用的某些或所有网站内容备份到Azure Storage。
先决条件
扇出/扇入 并行运行多个操作,然后汇总结果。 本文介绍如何使用用于 .NET、JavaScript、Python 和Java的 Durable Task SDK 实现模式。
场景概述
在此示例中,函数会将指定目录下的所有文件(递归地)上传到 Blob 存储。 它们还会计算上传的总字节数。
单个函数可以处理所有内容,但无法缩放。 单个函数执行在一个虚拟机(VM)上运行,因此吞吐量仅限于该 VM。 可靠性是另一个问题。 如果进程在中途失败,或者需要 5 分钟以上,则备份可能会以部分完成的状态结束。 然后重启备份。
更可靠的方法是使用两个单独的函数:一个枚举文件并将文件名添加到队列,另一个从队列读取并将文件上传到 Blob 存储。 此方法可提高吞吐量和可靠性,但需要设置和管理队列。 更重要的是,此方法增加了状态管理和协调的复杂性,例如报告上传的字节总数。
Durable Functions提供所有这些优势,开销很少。
在以下示例中,业务流程协调程序并行处理多个工作项,然后聚合结果。 在需要以下情况时,此模式非常有用:
- 处理一批项,其中每个项可以独立处理
- 跨多台计算机分配工作以提高吞吐量
- 聚合来自所有并行操作的结果
如果没有扇出/扇入模式,要么按顺序处理项目,但这样会限制吞吐量,要么自己管理队列和协调逻辑,而这样会增加复杂性。
Durable Task SDK 处理并行化和协调,因此模式易于实现。
功能
本文介绍示例应用中的函数:
-
E2_BackupSiteContent:一个 协调器函数,它调用 E2_GetFileList 获取要备份的文件列表,然后为每个文件调用 E2_CopyFileToBlob。
-
E2_GetFileList:一个活动函数,它返回目录中的文件列表。
-
E2_CopyFileToBlob:将单个文件备份到Azure Blob Storage的活动函数。
本文介绍示例代码中的组件:
-
ParallelProcessingOrchestration、fanOutFanInOrchestrator、fan_out_fan_in_orchestrator 或 FanOutFanIn_WordCount:一个业务流程协调程序,将工作并行扇出到多个活动,等待所有活动完成,然后通过聚合结果进行扇入。
-
ProcessWorkItemActivity、 processWorkItem、 process_work_item或 CountWords:处理单个工作项的活动。
-
AggregateResultsActivity、aggregateResults或aggregate_results:一个活动,用于聚合所有并行操作的结果。
协调器
此业务流程协调程序函数执行如下作业:
- 将
rootDirectory 作为输入。
- 调用某个函数来获取
rootDirectory 下的文件的递归列表。
- 进行并行函数调用,以将每个文件上传到Azure Blob Storage。
- 等待所有上传完成。
- 返回上传到Azure Blob Storage的总字节数。
下面的代码实现协调器函数:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
请注意 await Task.WhenAll(tasks); 行。 代码不会等待单个对 E2_CopyFileToBlob 的调用,因此它们并行运行。 当协调器将任务数组传递给 Task.WhenAll 时,它会返回一个任务,该任务只有在所有复制操作完成后才会完成。 如果熟悉.NET中的任务并行库(TPL),则此模式很熟悉。 区别在于,这些任务可以同时在多个虚拟机上运行,而Durable Functions扩展可确保端到端执行具备对进程回收的耐受性。
协调器等待 Task.WhenAll后,所有函数调用都已完成且返回值已经生成。 每次调用 E2_CopyFileToBlob 时都会返回上传的字节数。 通过添加返回值计算总计。
V3 编程模型
协调器函数使用标准的 function.json。
{
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"disabled": false
}
下面的代码实现协调器函数:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
请注意 yield context.df.Task.all(tasks); 行。 代码不会引发对 E2_CopyFileToBlob 的单独调用,以便它们并行运行。 当协调器将任务数组传递给 context.df.Task.all 时,它会返回一个任务,该任务只有在所有复制操作完成后才会完成。 如果你熟悉 JavaScript 中的 Promise.all,则这对你来说并不陌生。 区别在于,这些任务可以同时在多个虚拟机上运行,而Durable Functions扩展可确保端到端执行具备对进程回收的耐受性。
注释
虽然任务在概念上类似于 JavaScript 承诺,但业务流程协调程序函数应使用 context.df.Task.all 和 context.df.Task.any(而不是 Promise.all 和 Promise.race)来管理任务并行化。
业务流程协调程序生成 context.df.Task.all后,所有函数调用都已完成并返回值。 每次调用 E2_CopyFileToBlob 都会返回已上传字节数,因此,将所有这些返回值相加就能计算出字节数总和。
V4 编程模型
下面的代码实现协调器函数:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace(/\\/g, "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
请注意 yield context.df.Task.all(tasks); 行。 对 copyFileToBlob 函数的所有单个调用都未暂停,这使它们可以并行运行。 将此任务数组传递给 context.df.Task.all 时,会获得所有复制操作完成之前不会完成的任务。 如果你熟悉 JavaScript 中的 Promise.all,则这对你来说并不陌生。 区别在于,这些任务可以同时在多个虚拟机上运行,而Durable Functions扩展可确保端到端执行具备对进程回收的耐受性。
注释
虽然任务在概念上类似于 JavaScript 承诺,但业务流程协调程序函数应使用 context.df.Task.all 和 context.df.Task.any(而不是 Promise.all 和 Promise.race)来管理任务并行化。
我们在从context.df.Task.all让出控制权后,知道所有函数调用都已完成并返回值给我们。 每次调用 copyFileToBlob 都会返回已上传字节数,因此,将所有这些返回值相加就能计算出字节数总和。
协调器函数使用标准的 function.json。
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
下面的代码实现协调器函数:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
请注意 yield context.task_all(tasks); 行。 代码不会引发对 E2_CopyFileToBlob 的单独调用,以便它们并行运行。 当协调器将任务数组传递给 context.task_all 时,它会返回一个任务,该任务只有在所有复制操作完成后才会完成。 如果你熟悉 Python 中的 asyncio.gather,则这不是新手。 区别在于,这些任务可以同时在多个虚拟机上运行,而Durable Functions扩展可确保端到端执行具备对进程回收的耐受性。
注释
尽管任务在概念上与 Python 可等待项类似,但编排器函数应使用yield以及context.task_all和context.task_any接口来管理任务并行化。
业务流程协调程序生成 context.task_all后,所有函数调用都已完成并返回值。 每次调用 E2_CopyFileToBlob 都会返回已上传字节数,因此,将所有这些返回值相加就能计算出字节数总和。
协调器执行以下操作:
- 将工作项列表作为输入。
- 通过为每个工作项创建任务并并行处理它们来退出。
- 等待所有并行任务完成。
- 球迷通过聚合结果来参与进来。
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
public override async Task<Dictionary<string, int>> RunAsync(
TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
var processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
}
用于 Task.WhenAll() 等待所有并行任务完成。 Durable Task SDK 确保任务可以在多台计算机上并发运行,并且执行具有恢复能力,能够应对进程重启。
import {
OrchestrationContext,
TOrchestrator,
whenAll,
} from "@microsoft/durabletask-js";
const fanOutFanInOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
workItems: string[]
): any {
// Fan-out: create a task for each work item in parallel
const tasks = workItems.map((item) => ctx.callActivity(processWorkItem, item));
// Wait for all parallel tasks to complete
const results: number[] = yield whenAll(tasks);
// Fan-in: aggregate all results
const aggregatedResult = yield ctx.callActivity(aggregateResults, results);
return aggregatedResult;
};
用于 whenAll() 等待所有并行任务完成。 Durable Task SDK 确保任务可以在多台计算机上并发运行,并且执行具有恢复能力,能够应对进程重启。
from durabletask import task
def fan_out_fan_in_orchestrator(ctx: task.OrchestrationContext, work_items: list) -> dict:
"""Orchestrator that demonstrates fan-out/fan-in pattern."""
# Fan-out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity(process_work_item, input=item))
# Wait for all tasks to complete
results = yield task.when_all(parallel_tasks)
# Fan-in: Aggregate all the results
final_result = yield ctx.call_activity(aggregate_results, input=results)
return final_result
用于 task.when_all() 等待所有并行任务完成。 Durable Task SDK 确保任务可以在多台计算机上并发运行,并且执行具有恢复能力,能够应对进程重启。
此示例适用于 .NET、JavaScript、Java 和 Python。
import com.microsoft.durabletask.*;
import java.util.List;
import java.util.stream.Collectors;
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
// Fan-out: Create a task for each input item
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
// Wait for all parallel tasks to complete
List<Integer> allResults = ctx.allOf(tasks).await();
// Fan-in: Aggregate results
int totalCount = allResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalCount);
};
}
})
.build();
用于 ctx.allOf(tasks).await() 等待所有并行任务完成。 Durable Task SDK 确保任务可以在多台计算机上并发运行,并且执行具有恢复能力,能够应对进程重启。
活动
辅助活动函数是使用activityTrigger绑定的常规函数。
E2_GetFileList 活动函数
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
ILogger log)
{
log.LogInformation($"Searching for files under '{rootDirectory}'...");
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");
return files;
}
V3 编程模型
function.json文件 E2_GetFileList如以下示例所示:
{
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
],
"disabled": false
}
下面是实现:
const readdirp = require("readdirp");
module.exports = function (context, rootDirectory) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
readdirp(
{ root: rootDirectory, entryType: "all" },
function (fileInfo) {
if (!fileInfo.stat.isDirectory()) {
allFilePaths.push(fileInfo.fullPath);
}
},
function (err, res) {
if (err) {
throw err;
}
context.log(`Found ${allFilePaths.length} under ${rootDirectory}.`);
context.done(null, allFilePaths);
}
);
};
该函数使用 readdirp 模块版本 2.x以递归方式读取目录结构。
V4 编程模型
活动函数getFileList的实现如下:
const df = require("durable-functions");
const readdirp = require("readdirp");
const getFileListActivityName = "getFileList";
df.app.activity(getFileListActivityName, {
handler: async function (rootDirectory, context) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
for await (const entry of readdirp(rootDirectory, { type: "files" })) {
allFilePaths.push(entry.fullPath);
}
context.log(`Found ${allFilePaths.length} under ${rootDirectory}.`);
return allFilePaths;
},
});
此函数使用 readdirp 模块(版本 3.x)以递归方式读取目录结构。
function.json文件 E2_GetFileList如以下示例所示:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
]
}
下面是实现:
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
注释
不要将此代码放入业务流程协调程序函数中。 协调器函数不应进行 I/O 操作,包括访问本地文件系统。 有关详细信息,请参阅业务流程协调程序函数代码约束。
E2_CopyFileToBlob 活动函数
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
注释
若要运行示例代码,请安装 Microsoft.Azure.WebJobs.Extensions.Storage NuGet 包。
该函数使用Azure Functions绑定功能,如 Binder 参数。 本演练不需要这些详细信息。
V3 编程模型
function.json 文件对 E2_CopyFileToBlob 同样很简单:
{
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
},
{
"name": "out",
"type": "blob",
"path": "",
"connection": "AzureWebJobsStorage",
"direction": "out"
}
],
"disabled": false
}
JavaScript 实现使用 Azure Storage SDK for Node将文件上传到Azure Blob Storage。
const fs = require("fs");
const path = require("path");
const storage = require("azure-storage");
module.exports = function (context, filePath) {
const container = "backups";
const root = path.parse(filePath).root;
const blobPath = filePath.substring(root.length).replace("\\", "/");
const outputLocation = `backups/${blobPath}`;
const blobService = storage.createBlobService();
blobService.createContainerIfNotExists(container, (error) => {
if (error) {
throw error;
}
fs.stat(filePath, function (error, stats) {
if (error) {
throw error;
}
context.log(
`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`
);
const readStream = fs.createReadStream(filePath);
blobService.createBlockBlobFromStream(
container,
blobPath,
readStream,
stats.size,
function (error) {
if (error) {
throw error;
}
context.done(null, stats.size);
}
);
});
});
};
V4 编程模型
copyFileToBlob的 JavaScript 实现使用Azure Storage输出绑定将文件上传到Azure Blob Storage。
const df = require("durable-functions");
const fs = require("fs/promises");
const { output } = require("@azure/functions");
const copyFileToBlobActivityName = "copyFileToBlob";
const blobOutput = output.storageBlob({
path: "backups/{backupPath}",
connection: "StorageConnString",
});
df.app.activity(copyFileToBlobActivityName, {
extraOutputs: [blobOutput],
handler: async function ({ backupPath, filePath }, context) {
const outputLocation = `backups/${backupPath}`;
const stats = await fs.stat(filePath);
context.log(`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`);
const fileContents = await fs.readFile(filePath);
context.extraOutputs.set(blobOutput, fileContents);
return stats.size;
},
});
function.json 文件对 E2_CopyFileToBlob 同样很简单:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
}
]
}
Python实现使用 Azure Storage SDK for Python将文件上传到Azure Blob Storage。
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
实现从磁盘加载文件,并将内容异步流式传输到容器中同名的 backups Blob 中。 该函数返回复制到存储的字节数。 编排器使用该值来计算总和。
注释
此示例将 I/O 操作移到函数 activityTrigger 中。 该工作可以跨多台计算机运行,并支持进度检查点。 如果主机进程结束,则你知道哪些上传已完成。
活动完成工作。 与编排器不同,活动可以执行 I/O 操作和非确定性逻辑。
处理工作项活动
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
private readonly ILogger<ProcessWorkItemActivity> _logger;
public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
{
_logger.LogInformation("Processing work item: {WorkItem}", workItem);
// Process the work item (this is where you do the actual work)
var result = new Dictionary<string, int>
{
{ workItem, workItem.Length }
};
return Task.FromResult(result);
}
}
import { ActivityContext } from "@microsoft/durabletask-js";
const processWorkItem = async (
_ctx: ActivityContext,
item: string
): Promise<number> => {
console.log(`Processing work item: "${item}"`);
return item.length;
};
与业务流程协调程序不同,活动可以执行 I/O作,例如 HTTP 调用、数据库查询和文件访问。
from durabletask import task
def process_work_item(ctx: task.ActivityContext, item: int) -> dict:
"""Activity that processes a single work item."""
# Process the work item (this is where you do the actual work)
result = item * item
return {"item": item, "result": result}
此示例适用于 .NET、JavaScript、Java 和 Python。
import java.util.StringTokenizer;
// Activity registration
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
聚合结果活动
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
private readonly ILogger<AggregateResultsActivity> _logger;
public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(
TaskActivityContext context, Dictionary<string, int>[] results)
{
_logger.LogInformation("Aggregating {Count} results", results.Length);
// Combine all results into one aggregated result
var aggregatedResult = new Dictionary<string, int>();
foreach (var result in results)
{
foreach (var kvp in result)
{
aggregatedResult[kvp.Key] = kvp.Value;
}
}
return Task.FromResult(aggregatedResult);
}
}
import { ActivityContext } from "@microsoft/durabletask-js";
const aggregateResults = async (
_ctx: ActivityContext,
results: number[]
): Promise<object> => {
const total = results.reduce((sum, val) => sum + val, 0);
return {
totalItems: results.length,
sum: total,
average: results.length > 0 ? total / results.length : 0,
};
};
与业务流程协调程序不同,活动可以执行 I/O作,例如 HTTP 调用、数据库查询和文件访问。
from durabletask import task
def aggregate_results(ctx: task.ActivityContext, results: list) -> dict:
"""Activity that aggregates results from multiple work items."""
sum_result = sum(item["result"] for item in results)
return {
"total_items": len(results),
"sum": sum_result,
"average": sum_result / len(results) if results else 0
}
此示例适用于 .NET、JavaScript、Java 和 Python。
在Java示例中,业务流程协调程序在ctx.allOf(tasks).await()返回后聚合结果。
运行示例
通过发送以下 HTTP POST 请求,在 Windows 上启动业务流程:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
或者,在 Linux 函数应用中,通过发送以下 HTTP POST 请求来启动业务流程。 Python当前在 Linux 上运行应用服务:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
注释
该 HttpStart 函数需要 JSON。 包括 Content-Type: application/json 标头,并将目录路径编码为 JSON 字符串。 HTTP 代码片段假定 host.json 具有从所有 HTTP 触发器函数 URL 中删除默认 api/ 前缀的条目。 在示例 host.json 文件中查找此配置的标记。
此 HTTP 请求会触发 E2_BackupSiteContent 业务流程协调程序,并将字符串 D:\home\LogFiles 作为参数传递。 响应中包含用于检查备份操作状态的链接:
HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
(...trimmed...)
根据函数应用中的日志文件数,此作可能需要几分钟才能完成。 通过查询上一个 HTTP 202 响应标头中的 Location URL 获取最新状态:
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}
在本例中,函数仍在运行。 响应显示保存在协调器状态中的输入和上次更新时间。 使用Location的标头值进行轮询以确定是否完成。 当状态为“Completed”时,响应类似于以下示例:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
响应显示编排已完成以及完成的大致时间。 该 output 字段指示业务流程上传了大约 450 KB 的日志。
运行示例:
启动持久任务计划程序模拟器 进行本地开发。
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
启动工作器 来注册协调器和活动。
运行客户端 来安排包含工作项列表的编排:
// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(ParallelProcessingOrchestration), workItems);
// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");
import {
DurableTaskAzureManagedClientBuilder,
} from "@microsoft/durabletask-js-azuremanaged";
const connectionString =
process.env.DURABLE_TASK_SCHEDULER_CONNECTION_STRING ||
"Endpoint=http://localhost:8080;Authentication=None;TaskHub=default";
const client = new DurableTaskAzureManagedClientBuilder()
.connectionString(connectionString)
.build();
const workItems = ["item1", "item2", "item3", "item4", "item5"];
const instanceId = await client.scheduleNewOrchestration(fanOutFanInOrchestrator, workItems);
const state = await client.waitForOrchestrationCompletion(instanceId, true, 30);
console.log(`Result: ${state?.serializedOutput}`);
使用指向持久任务计划程序的连接字符串创建 DurableTaskAzureManagedClientBuilder。 使用 scheduleNewOrchestration 启动编排,并使用 waitForOrchestrationCompletion 等待完成。
# Schedule the orchestration with a list of work items
work_items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
instance_id = client.schedule_new_orchestration(fan_out_fan_in_orchestrator, input=work_items)
# Wait for completion
result = client.wait_for_orchestration_completion(instance_id, timeout=60)
print(f"Result: {result.serialized_output}")
此示例适用于 .NET、JavaScript、Java 和 Python。
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
// Schedule the orchestration with a list of strings
List<String> sentences = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"Always remember that you are absolutely unique.");
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(sentences));
// Wait for completion
OrchestrationMetadata result = client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(30), true);
System.out.println("Total word count: " + result.readOutputAs(int.class));
后续步骤
此示例显示了扇出/扇入模式。 下一个示例演示如何使用 持久计时器实现监视器模式。