编排者函数可以等待和侦听外部事件。
Durable Functions 的此功能对于处理人机交互或其他外部触发器通常比较有用。
注意
外部事件是单向的异步操作。 它们不适用于发送事件的客户端需要来自业务流程协调程序函数的同步响应的情况。
编排可以等待并侦听外部事件。 此功能通常用于处理人工交互或其他外部触发器。
注意
外部事件是单向的异步操作。 它们不适用于客户端发送事件时需要来自业务流程的同步响应的情况。
重要
目前,PowerShell Durable Task SDK 不可用。
等待事件
业务流程触发器绑定的“wait-for-external-event”API 使业务流程协调程序函数可异步等待和侦听外部客户端传递的事件。 监听器编排器功能声明了事件的“名称”和它期望收到的“数据结构”。
“wait-for-external-event”API 允许业务流程异步等待和侦听外部客户端传递的事件。 侦听的业务流程声明事件名称和期望接收的数据形状。
独立工作模型
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
public class BudgetApproval
{
private readonly ILogger _logger;
public BudgetApproval(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<BudgetApproval>();
}
[Function("BudgetApproval")]
public async Task Run(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
bool approved = await context.WaitForExternalEventAsync<bool>("Approval");
if (approved)
{
// approval granted - do the approved action
}
else
{
// approval denied - send a notification
}
}
}
进程内模型
[FunctionName("BudgetApproval")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
bool approved = await context.WaitForExternalEvent<bool>("Approval");
if (approved)
{
// approval granted - do the approved action
}
else
{
// approval denied - send a notification
}
}
注意
如果使用 Durable Functions 1.x,请使用 DurableOrchestrationContext 而不是 IDurableOrchestrationContext。 有关版本相关的详细信息,请查看 Durable Functions 版本一文。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const approved = yield context.df.waitForExternalEvent("Approval");
if (approved) {
// approval granted - do the approved action
} else {
// approval denied - send a notification
}
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
approved = yield context.wait_for_external_event('Approval')
if approved:
# approval granted - do the approved action
else:
# approval denied - send a notification
main = df.Orchestrator.create(orchestrator_function)
@FunctionName("WaitForExternalEvent")
public void waitForExternalEvent(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await();
if (approved) {
// approval granted - do the approved action
} else {
// approval denied - send a notification
}
}
param($Context)
$approved = Start-DurableExternalEventListener -EventName "Approval"
if ($approved) {
# approval granted - do the approved action
} else {
# approval denied - send a notification
}
public class BudgetApproval : TaskOrchestrator<object?, bool>
{
public override async Task<bool> RunAsync(TaskOrchestrationContext context, object? input)
{
bool approved = await context.WaitForExternalEvent<bool>("Approval");
if (approved)
{
// approval granted - do the approved action
}
else
{
// approval denied - send a notification
}
return approved;
}
}
from durabletask import task
def budget_approval(ctx: task.OrchestrationContext, _):
approved = yield ctx.wait_for_external_event("Approval")
if approved:
# approval granted - do the approved action
pass
else:
# approval denied - send a notification
pass
return approved
public class BudgetApproval implements TaskOrchestration {
@Override
public void run(TaskOrchestrationContext ctx) {
boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await();
if (approved) {
// approval granted - do the approved action
} else {
// approval denied - send a notification
}
}
}
import { OrchestrationContext, TOrchestrator } from "@microsoft/durabletask-js";
const budgetApproval: TOrchestrator = async function* (ctx: OrchestrationContext): any {
const approved = yield ctx.waitForExternalEvent("Approval");
if (approved) {
// approval granted - do the approved action
} else {
// approval denied - send a notification
}
return approved;
};
前面的示例侦听特定的事件,并在收到事件时执行操作。
可以同时侦听多个事件,例如以下示例会等待三个可能的事件通知中的一个。
独立工作模型
[Function("Select")]
public async Task Run(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
Task<float> event1 = context.WaitForExternalEventAsync<float>("Event1");
Task<bool> event2 = context.WaitForExternalEventAsync<bool>("Event2");
Task<int> event3 = context.WaitForExternalEventAsync<int>("Event3");
Task winner = await Task.WhenAny(event1, event2, event3);
if (winner == event1)
{
// ...
}
else if (winner == event2)
{
// ...
}
else if (winner == event3)
{
// ...
}
}
进程内模型
[FunctionName("Select")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var event1 = context.WaitForExternalEvent<float>("Event1");
var event2 = context.WaitForExternalEvent<bool>("Event2");
var event3 = context.WaitForExternalEvent<int>("Event3");
var winner = await Task.WhenAny(event1, event2, event3);
if (winner == event1)
{
// ...
}
else if (winner == event2)
{
// ...
}
else if (winner == event3)
{
// ...
}
}
注意
使用 Durable Functions 1.x? 请替换 DurableOrchestrationContext,而不是 IDurableOrchestrationContext。 若要了解其他版本差异,请参阅 Durable Functions 版本一文。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const event1 = context.df.waitForExternalEvent("Event1");
const event2 = context.df.waitForExternalEvent("Event2");
const event3 = context.df.waitForExternalEvent("Event3");
const winner = yield context.df.Task.any([event1, event2, event3]);
if (winner === event1) {
// ...
} else if (winner === event2) {
// ...
} else if (winner === event3) {
// ...
}
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
event1 = context.wait_for_external_event('Event1')
event2 = context.wait_for_external_event('Event2')
event3 = context.wait_for_external_event('Event3')
winner = yield context.task_any([event1, event2, event3])
if winner == event1:
# ...
elif winner == event2:
# ...
elif winner == event3:
# ...
main = df.Orchestrator.create(orchestrator_function)
@FunctionName("Select")
public void selectOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
Task<Void> event1 = ctx.waitForExternalEvent("Event1");
Task<Void> event2 = ctx.waitForExternalEvent("Event2");
Task<Void> event3 = ctx.waitForExternalEvent("Event3");
Task<?> winner = ctx.anyOf(event1, event2, event3).await();
if (winner == event1) {
// ...
} else if (winner == event2) {
// ...
} else if (winner == event3) {
// ...
}
}
param($Context)
$event1 = Start-DurableExternalEventListener -EventName "Event1" -NoWait
$event2 = Start-DurableExternalEventListener -EventName "Event2" -NoWait
$event3 = Start-DurableExternalEventListener -EventName "Event3" -NoWait
$winner = Wait-DurableTask -Task @($event1, $event2, $event3) -Any
if ($winner -eq $event1) {
# ...
} else if ($winner -eq $event2) {
# ...
} else if ($winner -eq $event3) {
# ...
}
public class SelectOrchestrator : TaskOrchestrator<object?, object?>
{
public override async Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
{
Task<float> event1 = context.WaitForExternalEvent<float>("Event1");
Task<bool> event2 = context.WaitForExternalEvent<bool>("Event2");
Task<int> event3 = context.WaitForExternalEvent<int>("Event3");
Task winner = await Task.WhenAny(event1, event2, event3);
if (winner == event1)
{
// ...
}
else if (winner == event2)
{
// ...
}
else if (winner == event3)
{
// ...
}
return null;
}
}
from durabletask import task
def select_orchestrator(ctx: task.OrchestrationContext, _):
event1 = ctx.wait_for_external_event("Event1")
event2 = ctx.wait_for_external_event("Event2")
event3 = ctx.wait_for_external_event("Event3")
winner = yield task.when_any([event1, event2, event3])
if winner == event1:
# ...
pass
elif winner == event2:
# ...
pass
elif winner == event3:
# ...
pass
public class SelectOrchestrator implements TaskOrchestration {
@Override
public void run(TaskOrchestrationContext ctx) {
Task<Void> event1 = ctx.waitForExternalEvent("Event1");
Task<Void> event2 = ctx.waitForExternalEvent("Event2");
Task<Void> event3 = ctx.waitForExternalEvent("Event3");
Task<?> winner = ctx.anyOf(event1, event2, event3).await();
if (winner == event1) {
// ...
} else if (winner == event2) {
// ...
} else if (winner == event3) {
// ...
}
}
}
import { OrchestrationContext, TOrchestrator, whenAny } from "@microsoft/durabletask-js";
const selectOrchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
const event1 = ctx.waitForExternalEvent("Event1");
const event2 = ctx.waitForExternalEvent("Event2");
const event3 = ctx.waitForExternalEvent("Event3");
const winner = yield whenAny([event1, event2, event3]);
if (winner === event1) {
// ...
} else if (winner === event2) {
// ...
} else if (winner === event3) {
// ...
}
};
前面的示例侦听多个事件中的任意事件。 您还可以等待 所有 事件。
独立工作模型
[Function("NewBuildingPermit")]
public async Task Run(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
string applicationId = context.GetInput<string>();
Task gate1 = context.WaitForExternalEventAsync<object>("CityPlanningApproval");
Task gate2 = context.WaitForExternalEventAsync<object>("FireDeptApproval");
Task gate3 = context.WaitForExternalEventAsync<object>("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
await Task.WhenAll(gate1, gate2, gate3);
await context.CallActivityAsync("IssueBuildingPermit", applicationId);
}
进程内模型
[FunctionName("NewBuildingPermit")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string applicationId = context.GetInput<string>();
var gate1 = context.WaitForExternalEvent("CityPlanningApproval");
var gate2 = context.WaitForExternalEvent("FireDeptApproval");
var gate3 = context.WaitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
await Task.WhenAll(gate1, gate2, gate3);
await context.CallActivityAsync("IssueBuildingPermit", applicationId);
}
注意
如果要运行 Durable Functions 1.x,请使用 DurableOrchestrationContext 而不是 IDurableOrchestrationContext。 请查看 Durable Functions 版本,获取各个版本之间差异的详细信息。
在 .NET 中,如果事件负载无法转换为预期类型 T,将会引发异常。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const applicationId = context.df.getInput();
const gate1 = context.df.waitForExternalEvent("CityPlanningApproval");
const gate2 = context.df.waitForExternalEvent("FireDeptApproval");
const gate3 = context.df.waitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
yield context.df.Task.all([gate1, gate2, gate3]);
yield context.df.callActivity("IssueBuildingPermit", applicationId);
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
application_id = context.get_input()
gate1 = context.wait_for_external_event('CityPlanningApproval')
gate2 = context.wait_for_external_event('FireDeptApproval')
gate3 = context.wait_for_external_event('BuildingDeptApproval')
yield context.task_all([gate1, gate2, gate3])
yield context.call_activity('IssueBuildingPermit', application_id)
main = df.Orchestrator.create(orchestrator_function)
@FunctionName("NewBuildingPermit")
public void newBuildingPermit(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String applicationId = ctx.getInput(String.class);
Task<Void> gate1 = ctx.waitForExternalEvent("CityPlanningApproval");
Task<Void> gate2 = ctx.waitForExternalEvent("FireDeptApproval");
Task<Void> gate3 = ctx.waitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
ctx.allOf(List.of(gate1, gate2, gate3)).await();
ctx.callActivity("IssueBuildingPermit", applicationId).await();
}
param($Context)
$applicationId = $Context.Input
$gate1 = Start-DurableExternalEventListener -EventName "CityPlanningApproval" -NoWait
$gate2 = Start-DurableExternalEventListener -EventName "FireDeptApproval" -NoWait
$gate3 = Start-DurableExternalEventListener -EventName "BuildingDeptApproval" -NoWait
Wait-DurableTask -Task @($gate1, $gate2, $gate3)
Invoke-ActivityFunction -FunctionName 'IssueBuildingPermit' -Input $applicationId
public class NewBuildingPermit : TaskOrchestrator<string, object?>
{
public override async Task<object?> RunAsync(TaskOrchestrationContext context, string applicationId)
{
Task<object?> gate1 = context.WaitForExternalEvent<object?>("CityPlanningApproval");
Task<object?> gate2 = context.WaitForExternalEvent<object?>("FireDeptApproval");
Task<object?> gate3 = context.WaitForExternalEvent<object?>("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
await Task.WhenAll(gate1, gate2, gate3);
await context.CallActivityAsync("IssueBuildingPermit", applicationId);
return null;
}
}
在 .NET 中,如果事件负载无法转换为预期类型 T,将会引发异常。
from durabletask import task
def issue_building_permit(ctx: task.ActivityContext, application_id: str) -> None:
# Issue the permit
pass
def new_building_permit(ctx: task.OrchestrationContext, application_id: str):
gate1 = ctx.wait_for_external_event("CityPlanningApproval")
gate2 = ctx.wait_for_external_event("FireDeptApproval")
gate3 = ctx.wait_for_external_event("BuildingDeptApproval")
# all three departments must grant approval before a permit can be issued
yield task.when_all([gate1, gate2, gate3])
yield ctx.call_activity(issue_building_permit, input=application_id)
public class NewBuildingPermit implements TaskOrchestration {
@Override
public void run(TaskOrchestrationContext ctx) {
String applicationId = ctx.getInput(String.class);
Task<Void> gate1 = ctx.waitForExternalEvent("CityPlanningApproval");
Task<Void> gate2 = ctx.waitForExternalEvent("FireDeptApproval");
Task<Void> gate3 = ctx.waitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
ctx.allOf(List.of(gate1, gate2, gate3)).await();
ctx.callActivity("IssueBuildingPermit", applicationId).await();
}
}
import { ActivityContext, OrchestrationContext, TOrchestrator, whenAll } from "@microsoft/durabletask-js";
const issueBuildingPermit = async (_: ActivityContext, applicationId: string): Promise<void> => {
// Issue the permit
};
const newBuildingPermit: TOrchestrator = async function* (
ctx: OrchestrationContext,
applicationId: string,
): any {
const gate1 = ctx.waitForExternalEvent("CityPlanningApproval");
const gate2 = ctx.waitForExternalEvent("FireDeptApproval");
const gate3 = ctx.waitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
yield whenAll([gate1, gate2, gate3]);
yield ctx.callActivity(issueBuildingPermit, applicationId);
};
“wait-for-external-event” API 无期限地等待一些输入。 等待时可以安全地卸载函数应用。 如果某个事件到达此业务流程实例,则会自动唤醒该实例,并立即处理该事件。
注意
如果函数应用使用消耗计划,当业务流程协调程序函数等待外部事件任务时,无论它等待多久,都不会产生账单费用。
与活动函数一样,外部事件具有至少一次交付保证。 这意味着,在某些条件下(例如重启、缩放、崩溃等),应用程序可能会收到重复的同一外部事件。 因此,我们建议外部事件包含某种 ID,以便在业务流程协调程序中手动取消其重复。
“wait-for-external-event” API 无期限地等待一些输入。 可以在等待时安全地中止工作线程。 对于此编排实例,当某个事件到达时,会立即被自动唤醒并处理该事件。
外部事件至少提供 一次 传递保证。 这意味着,在某些条件下(例如重启、缩放、崩溃等),应用程序可能会收到重复的同一外部事件。 因此,我们建议外部事件包含某种 ID,允许它们在业务流程中手动取消重复。
发送事件
可使用由编排客户端绑定定义的“raise-event”API将外部事件发送到编排。 还可以使用内置引发事件 HTTP API 将外部事件发送到业务流程。
引发的事件包含 instanceID、eventName 和 eventData 作为参数。 业务流程协调程序函数使用 wait-for-external-event API 处理这些事件。
eventName端和接收端的 必须匹配,事件才能被处理。 事件数据还必须是 JSON 可序列化的。
在内部,“raise-event”机制将消息排入队列,随后会被正在等待的协调器函数选取。 如果实例未等待指定的 事件名称, 则会将事件消息添加到内存中队列。 如果编排实例稍后开始侦听该 事件名称, 它会检查队列中的事件消息。
注意
如果没有具有指定 实例 ID 的业务流程实例,则会丢弃事件消息。
下面是一个队列触发函数的示例,它将“Approval”事件发送到一个协调器函数实例。 编排实例 ID 来自队列消息的正文。
可以使用 Durable Task 客户端上的“raise-event” API 将外部事件发送到协调程序。
引发的事件包括实例 ID、eventName 和 eventData 等参数。 业务流程使用 “wait-for-external-event” API 处理这些事件。 在发送端和接收端,eventName 必须匹配才能处理事件。 事件数据还必须是 JSON 可序列化的。
在内部,“raise-event”机制将消息排入队列,由等待的业务流程提取。 如果实例没有在等待指定的事件名,则将事件消息添加到内存中队列。 如果编排实例稍后开始侦听该事件名称,它将检查队列中的事件消息。
注意
如果没有具有指定instanceID的编排实例,则事件消息会被丢弃。
下面是将“审批”事件发送到业务流程实例的示例。
独立工作模型
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask.Client;
public class ApprovalQueueProcessor
{
[Function("ApprovalQueueProcessor")]
public async Task Run(
[QueueTrigger("approval-queue")] string instanceId,
[DurableClient] DurableTaskClient client)
{
await client.RaiseEventAsync(instanceId, "Approval", true);
}
}
进程内模型
[FunctionName("ApprovalQueueProcessor")]
public static async Task Run(
[QueueTrigger("approval-queue")] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
await client.RaiseEventAsync(instanceId, "Approval", true);
}
注意
对于 Durable Functions 1.x,请改用 OrchestrationClient 属性和 DurableOrchestrationClient 参数类型。 有关所有特定于版本的更改,请查看 Durable Functions 版本一文。
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
await client.raiseEvent(instanceId, "Approval", true);
};
import azure.functions as func
import azure.durable_functions as df
async def main(instance_id:str, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
await client.raise_event(instance_id, 'Approval', True)
@FunctionName("ApprovalQueueProcessor")
public void approvalQueueProcessor(
@QueueTrigger(name = "instanceID", queueName = "approval-queue") String instanceID,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
durableContext.getClient().raiseEvent(instanceID, "Approval", true);
}
param($instanceId)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "Approval"
在内部,“raise-event”API 会将消息排入队列,供在等待的协调函数处理。 如果实例未等待指定的 事件名称, 则会将事件消息添加到内存中缓冲区。 如果编排实例稍后开始侦听该事件名称,它会检查缓冲区中的事件消息,并触发正在等待事件的任务。
注意
如果没有具有指定实例 ID 的业务流程实例,则丢弃事件消息。
await client.RaiseEventAsync(instanceId, "Approval", true);
client.raise_orchestration_event(instance_id, "Approval", data=True)
client.raiseEvent(instanceId, "Approval", true);
await client.raiseOrchestrationEvent(instanceId, "Approval", true);
内部,“raise-event”API 会将一条消息加入队列,该消息会被等待的业务流程获取。 如果实例没有在等待指定的事件名,则将事件消息添加到内存缓冲区。 如果编排实例稍后开始侦听“事件名称”,它将检查缓冲区中的事件消息,并触发正在等待的任务。
注意
如果没有具有指定实例 ID 的业务流程实例,则丢弃事件消息。
HTTP
下面是向编排实例引发 Approval 事件的 HTTP 请求的示例。
POST /runtime/webhooks/durabletask/instances/MyInstanceId/raiseEvent/Approval&code=XXX
Content-Type: application/json
"true"
在本例中,实例 ID 硬编码为 MyInstanceId。
外部事件的最佳做法
使用外部事件时,请记住以下最佳做法:
使用唯一的事件名称进行重复数据删除
外部事件至少提供 一次 传递保证。 在某些罕见情况下(这些情况可能发生在重启、扩展或崩溃期间),您的应用程序可能会收到同一外部事件的重复项。 建议外部事件包含唯一 ID,允许在业务流程协调程序中手动删除重复数据。
后续步骤