人机交互模式描述工作流在继续前会暂停以等待人的输入。 此模式对于审批工作流、多重身份验证以及人员在时间限制内做出响应的任何方案都很有用。
此示例演示如何生成包含人工交互的 Durable Functions 业务流程。 该示例实现基于短信的电话验证系统。 电话号码验证和多重身份验证(MFA)流程中很常见。
注释
Azure Functions 的 Node.js 编程模型版本 4 已正式发布。 v4 模型旨在为 JavaScript 和 TypeScript 开发人员提供更灵活、更直观的体验。 有关 v3 和 v4 之间的差异的详细信息,请参阅 迁移指南。
在以下代码片段中,JavaScript (PM4) 表示编程模型 v4,即新体验。
先决条件
本文介绍如何使用 Durable Task SDK 实现人工交互模式。 该示例实现一个审批工作流,业务流程在该工作流中等待某人批准或拒绝请求,然后再继续。
场景概述
电话验证有助于确认使用你的应用的用户不是垃圾邮件发送者,并且他们控制他们提供的电话号码。 多重身份验证是保护帐户的常见方法。 构建自己的手机验证需要与某人 进行有状态的交互 。 用户通常获取一个代码(例如四位数数字),并且必须在合理的时间范围内做出响应。
标准Azure Functions是无状态的(与许多其他云终结点一样),因此这种类型的交互要求你将状态存储在数据库或其他持久存储中。 还可以跨多个函数拆分交互并协调它们。 例如,一个函数生成代码、存储代码并将其发送到用户的手机。 另一个函数接收用户的响应,并将其映射到原始请求以验证代码。 添加超时以帮助保护安全性。 此工作流会很快变得复杂。
Durable Functions可降低此方案的复杂性。 在此示例中,协调器函数管理无外部数据存储的有状态交互。 由于业务流程协调程序函数 是持久的,因此这些交互式流高度可靠。
审批工作流在业务应用程序中很常见,在继续作之前,必须由人工审查请求。 工作流要求如下:
- 无限期等待人工响应,或直到超时
- 处理批准结果和拒绝结果
- 未收到响应时支持超时
- 跟踪状态 ,以便请求者可以检查进度
Durable Task SDK 通过以下方法简化了此方案:
- 外部事件:业务流程可以暂停并等待外部系统或用户引发的事件
- 持久计时器:设置一个超时,若未收到响应则触发该机制
- 自定义状态:跟踪当前工作流状态并将其公开给客户端
编排器和活动
本文介绍示例应用中的以下函数:
本文介绍示例应用中的以下组件:
-
ApprovalOrchestration/approvalOrchestrator/human_interaction_orchestrator:一个协调器,负责提交审批请求并等待人工响应或超时。 -
SubmitApprovalRequestActivity/submitRequest/submit_approval_request:通知人工审批者的活动,例如通过电子邮件或聊天消息。 -
ProcessApprovalActivity/processApproval/process_approval:处理审批决策的活动。
协调器
E4_SmsPhoneVerification 业务流程协调程序函数
[FunctionName("E4_SmsPhoneVerification")]
public static async Task<bool> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string phoneNumber = context.GetInput<string>();
if (string.IsNullOrEmpty(phoneNumber))
{
throw new ArgumentNullException(
nameof(phoneNumber),
"A phone number input is required.");
}
int challengeCode = await context.CallActivityAsync<int>(
"E4_SendSmsChallenge",
phoneNumber);
using (var timeoutCts = new CancellationTokenSource())
{
// The user has 90 seconds to respond with the code they received in the SMS message.
DateTime expiration = context.CurrentUtcDateTime.AddSeconds(90);
Task timeoutTask = context.CreateTimer(expiration, timeoutCts.Token);
bool authorized = false;
for (int retryCount = 0; retryCount <= 3; retryCount++)
{
Task<int> challengeResponseTask =
context.WaitForExternalEvent<int>("SmsChallengeResponse");
Task winner = await Task.WhenAny(challengeResponseTask, timeoutTask);
if (winner == challengeResponseTask)
{
// We got back a response! Compare it to the challenge code.
if (challengeResponseTask.Result == challengeCode)
{
authorized = true;
break;
}
}
else
{
// Timeout expired
break;
}
}
if (!timeoutTask.IsCompleted)
{
// All pending timers must be complete or canceled before the function exits.
timeoutCts.Cancel();
}
return authorized;
}
}
注释
最初可能并不明显,但此业务流程协调程序不会违反 确定性业务流程约束。 这具有确定性,因为CurrentUtcDateTime属性计算计时器的过期时间,并且在业务流程协调程序代码中,每次重播时,该属性都会返回相同的值。 此行为可确保 winner 每次重复调用 Task.WhenAny时都相同。
启动后,该业务流程协调程序函数执行以下任务:
- 获取要向其发送短信通知的电话号码。
- 调用 E4_SendSmsChallenge 向用户发送短信,并返回预期的四位数质询代码。
- 创建一个在当前时间之后 90 秒触发的持久定时器。
- 与计时器一起,等待来自用户的 SmsChallengeResponse 事件。
用户会收到一条含 4 位数代码的短信。 他们有 90 秒的时间将相同的代码发送到业务流程协调程序实例以完成验证。 如果他们提交错误的代码,则会在同一 90 秒窗口中再尝试三次。
警告
取消不再需要的计时器 。 在上面的示例中,编排在接受质询响应时取消计时器。
任务协调器提交审批请求,然后等待人工反馈或等待超时。
using Microsoft.DurableTask;
using System;
using System.Threading;
using System.Threading.Tasks;
[DurableTask(nameof(ApprovalOrchestration))]
public class ApprovalOrchestration : TaskOrchestrator<ApprovalRequestData, ApprovalResult>
{
public override async Task<ApprovalResult> RunAsync(
TaskOrchestrationContext context, ApprovalRequestData input)
{
string requestId = input.RequestId;
double timeoutHours = input.TimeoutHours;
// Step 1: Submit the approval request (notify approver)
SubmissionResult submissionResult = await context.CallActivityAsync<SubmissionResult>(
nameof(SubmitApprovalRequestActivity), input);
// Make the status available via custom status
context.SetCustomStatus(submissionResult);
// Step 2: Create a durable timer for the timeout
DateTime timeoutDeadline = context.CurrentUtcDateTime.AddHours(timeoutHours);
using var timeoutCts = new CancellationTokenSource();
Task timeoutTask = context.CreateTimer(timeoutDeadline, timeoutCts.Token);
// Step 3: Wait for an external event (approval/rejection)
Task<ApprovalResponseData> approvalTask = context.WaitForExternalEvent<ApprovalResponseData>(
"approval_response");
// Step 4: Wait for either the timeout or the approval response
Task completedTask = await Task.WhenAny(approvalTask, timeoutTask);
// Step 5: Process based on which task completed
ApprovalResult result;
if (completedTask == approvalTask)
{
// Human responded in time - cancel the timeout timer
timeoutCts.Cancel();
ApprovalResponseData approvalData = approvalTask.Result;
// Process the approval
result = await context.CallActivityAsync<ApprovalResult>(
nameof(ProcessApprovalActivity),
new ProcessApprovalInput
{
RequestId = requestId,
IsApproved = approvalData.IsApproved,
Approver = approvalData.Approver
});
}
else
{
// Timeout occurred
result = new ApprovalResult
{
RequestId = requestId,
Status = "Timeout",
ProcessedAt = context.CurrentUtcDateTime.ToString("o")
};
}
return result;
}
}
此编排器执行以下操作:
- 通过调用一个用于通知审批者的活动来提交审批请求。
- 设置自定义状态,以便客户端可以跟踪进度。
- 为超时时间创建持久计时器。
- 等待由审批者触发的外部事件(
approval_response)。 - 使用
WhenAny、when_any或anyOf来等待最先完成的任务:审批或超时。 - 根据哪个任务完成来处理结果。
警告
取消不再需要的计时器 。 在 C# 示例中,编排在收到批准时取消超时计时器。
活动
活动提交审批请求并处理响应。
提交审批请求的活动
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;
[DurableTask(nameof(SubmitApprovalRequestActivity))]
public class SubmitApprovalRequestActivity : TaskActivity<ApprovalRequestData, SubmissionResult>
{
private readonly ILogger<SubmitApprovalRequestActivity> _logger;
public SubmitApprovalRequestActivity(ILogger<SubmitApprovalRequestActivity> logger)
{
_logger = logger;
}
public override Task<SubmissionResult> RunAsync(
TaskActivityContext context, ApprovalRequestData input)
{
_logger.LogInformation(
"Submitting approval request {RequestId} from {Requester} for {Item}",
input.RequestId, input.Requester, input.Item);
// In a real system, this would send an email, notification, or update a database
var result = new SubmissionResult
{
RequestId = input.RequestId,
Status = "Pending",
SubmittedAt = DateTime.UtcNow.ToString("o"),
ApprovalUrl = $"http://localhost:8000/api/approvals/{input.RequestId}"
};
return Task.FromResult(result);
}
}
流程审批活动
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;
[DurableTask(nameof(ProcessApprovalActivity))]
public class ProcessApprovalActivity : TaskActivity<ProcessApprovalInput, ApprovalResult>
{
private readonly ILogger<ProcessApprovalActivity> _logger;
public ProcessApprovalActivity(ILogger<ProcessApprovalActivity> logger)
{
_logger = logger;
}
public override Task<ApprovalResult> RunAsync(
TaskActivityContext context, ProcessApprovalInput input)
{
string status = input.IsApproved ? "Approved" : "Rejected";
_logger.LogInformation(
"Processing {Status} request {RequestId} by {Approver}",
status, input.RequestId, input.Approver);
// In a real system, this would update a database, trigger workflows, etc.
var result = new ApprovalResult
{
RequestId = input.RequestId,
Status = status,
ProcessedAt = DateTime.UtcNow.ToString("o"),
Approver = input.Approver
};
return Task.FromResult(result);
}
}
// Data classes
public class ApprovalRequestData
{
public string RequestId { get; set; } = string.Empty;
public string Requester { get; set; } = string.Empty;
public string Item { get; set; } = string.Empty;
public double TimeoutHours { get; set; } = 24.0;
}
public class ApprovalResponseData
{
public bool IsApproved { get; set; }
public string Approver { get; set; } = string.Empty;
}
public class SubmissionResult
{
public string RequestId { get; set; } = string.Empty;
public string Status { get; set; } = string.Empty;
public string SubmittedAt { get; set; } = string.Empty;
public string ApprovalUrl { get; set; } = string.Empty;
}
public class ProcessApprovalInput
{
public string RequestId { get; set; } = string.Empty;
public bool IsApproved { get; set; }
public string Approver { get; set; } = string.Empty;
}
public class ApprovalResult
{
public string RequestId { get; set; } = string.Empty;
public string Status { get; set; } = string.Empty;
public string ProcessedAt { get; set; } = string.Empty;
public string? Approver { get; set; }
}
运行示例
使用示例中的 HTTP 触发函数通过发送以下 HTTP POST 请求来启动业务流程:
POST http://{host}/orchestrators/E4_SmsPhoneVerification
Content-Length: 14
Content-Type: application/json
"+1425XXXXXXX"
HTTP/1.1 202 Accepted
Content-Length: 695
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/741c65651d4c40cea29acdd5bb47baf1?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
{"id":"741c65651d4c40cea29acdd5bb47baf1","statusQueryGetUri":"http://{host}/runtime/webhooks/durabletask/instances/741c65651d4c40cea29acdd5bb47baf1?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}","sendEventPostUri":"http://{host}/runtime/webhooks/durabletask/instances/741c65651d4c40cea29acdd5bb47baf1/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}","terminatePostUri":"http://{host}/runtime/webhooks/durabletask/instances/741c65651d4c40cea29acdd5bb47baf1/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}"}
业务流程协调程序函数接收电话号码,并立即向该号码发送一条短信,其中包含随机生成的 4 位验证码,例如 2168。 然后函数等待 90 秒,获取响应。
若要使用代码进行答复,请在另一个函数中使用 RaiseEventAsync (.NET) 或 raiseEvent (JavaScript 和 TypeScript),或在 202 响应中调用 sendEventPostUri HTTP POST 终结点。 将{eventName}替换为SmsChallengeResponse。
POST http://{host}/runtime/webhooks/durabletask/instances/741c65651d4c40cea29acdd5bb47baf1/raiseEvent/SmsChallengeResponse?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
Content-Length: 4
Content-Type: application/json
2168
如果在计时器过期之前发送事件,业务流程将完成,并且 output 字段设置为 true,这表示验证成功。
GET http://{host}/runtime/webhooks/durabletask/instances/741c65651d4c40cea29acdd5bb47baf1?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 200 OK
Content-Length: 144
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"+1425XXXXXXX","output":true,"createdTime":"2017-06-29T19:10:49Z","lastUpdatedTime":"2017-06-29T19:12:23Z"}
如果计时器过期或输入错误的代码四次,请查询状态以查看 output 设置为 false,这表示电话验证失败。
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 145
{"runtimeStatus":"Completed","input":"+1425XXXXXXX","output":false,"createdTime":"2017-06-29T19:20:49Z","lastUpdatedTime":"2017-06-29T19:22:23Z"}
若要运行该示例:
启动持久任务计划程序模拟器 进行本地开发。
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest启动工作器 来注册协调器和活动。
运行客户端 来计划审批工作流并发送事件。
using System;
using System.Threading.Tasks;
var client = DurableTaskClientBuilder.UseDurableTaskScheduler(connectionString).Build();
// Schedule the approval workflow
var input = new ApprovalRequestData
{
RequestId = "request-" + Guid.NewGuid().ToString(),
Requester = "john.doe@example.com",
Item = "Vacation Request - 5 days",
TimeoutHours = 24
};
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(ApprovalOrchestration), input);
Console.WriteLine($"Started approval workflow: {instanceId}");
// Simulate human approving the request
Console.WriteLine("Simulating approval...");
await Task.Delay(2000);
// Raise the approval event
var approvalResponse = new ApprovalResponseData
{
IsApproved = true,
Approver = "manager@example.com"
};
await client.RaiseEventAsync(instanceId, "approval_response", approvalResponse);
// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<ApprovalResult>().Status}");
后续步骤
此示例演示高级 Durable Functions 功能,包括 WaitForExternalEvent 和 CreateTimer API。 它演示如何将 Task.WhenAny (C#)、context.df.Task.any (JavaScript 和 TypeScript)或 context.task_any (Python) 组合在一起,实现等待人员响应的工作流的可靠超时模式。 在涵盖特定主题的一系列文章中详细了解Durable Functions。
此示例演示如何使用 Durable Task SDK 实现等待人员响应的工作流,并配置了超时。 关键概念:
外部事件:使用
WaitForExternalEvent来等待输入持久计时器:用于
CreateTimer实现超时赛车任务:使用
WhenAny或when_anyanyOf处理首先完成的任务